File tree Expand file tree Collapse file tree 5 files changed +39
-9
lines changed
Expand file tree Collapse file tree 5 files changed +39
-9
lines changed Original file line number Diff line number Diff line change @@ -16,10 +16,11 @@ class ComposeGreetingInput:
1616@activity .defn
1717async def compose_greeting (input : ComposeGreetingInput ) -> str :
1818 test_service = TestService ()
19+ attempt = 1
1920 while True :
2021 try :
2122 try :
22- result = await test_service .get_service_result (input )
23+ result = await test_service .get_service_result (input , attempt )
2324 activity .logger .info (f"Exiting activity ${ result } " )
2425 return result
2526 except Exception as e :
@@ -28,6 +29,7 @@ async def compose_greeting(input: ComposeGreetingInput) -> str:
2829
2930 activity .heartbeat ("Invoking activity" )
3031 await asyncio .sleep (1 )
32+ attempt += 1
3133 except asyncio .CancelledError :
3234 # activity was either cancelled or workflow was completed or worker shut down
3335 # if you need to clean up you can catch this.
Original file line number Diff line number Diff line change @@ -16,4 +16,4 @@ async def compose_greeting(input: ComposeGreetingInput) -> str:
1616 test_service = TestService ()
1717 # If this raises an exception because it's not done yet, the activity will
1818 # continually be scheduled for retry
19- return await test_service .get_service_result (input )
19+ return await test_service .get_service_result (input , activity . info (). attempt )
Original file line number Diff line number Diff line change @@ -14,9 +14,9 @@ async def run(self, name: str) -> str:
1414 return await workflow .execute_activity (
1515 compose_greeting ,
1616 ComposeGreetingInput ("Hello" , name ),
17- start_to_close_timeout = timedelta (seconds = 2 ),
17+ start_to_close_timeout = timedelta (seconds = 5 ),
1818 retry_policy = RetryPolicy (
1919 backoff_coefficient = 1.0 ,
20- initial_interval = timedelta (seconds = 60 ),
20+ initial_interval = timedelta (seconds = 30 ),
2121 ),
2222 )
Original file line number Diff line number Diff line change 11class TestService :
22 def __init__ (self ):
3- self .try_attempts = 0
43 self .error_attempts = 5
54
6- async def get_service_result (self , input ):
5+ async def get_service_result (self , input , attempt : int ):
76 print (
8- f"Attempt { self . try_attempts } "
7+ f"Attempt { attempt } "
98 f" of { self .error_attempts } to invoke service"
109 )
11- self .try_attempts += 1
12- if self .try_attempts % self .error_attempts == 0 :
10+ if attempt % self .error_attempts == 0 :
1311 return f"{ input .greeting } , { input .name } !"
1412 raise Exception ("service is down" )
Original file line number Diff line number Diff line change 1+ import uuid
2+ import pytest
3+ from polling .infrequent .activities import compose_greeting
4+ from polling .infrequent .workflows import GreetingWorkflow
5+ from temporalio .client import Client
6+ from temporalio .testing import WorkflowEnvironment
7+ from temporalio .worker import Worker
8+
9+
10+ async def test_infrequent_polling_workflow (client : Client , env : WorkflowEnvironment ):
11+ if not env .supports_time_skipping :
12+ pytest .skip (
13+ "Too slow to test with time-skipping disabled" )
14+
15+ # Start a worker that hosts the workflow and activity implementations.
16+ task_queue = f"tq-{ uuid .uuid4 ()} "
17+ async with Worker (
18+ client ,
19+ task_queue = task_queue ,
20+ workflows = [GreetingWorkflow ],
21+ activities = [compose_greeting ],
22+ ):
23+ handle = await client .start_workflow (
24+ GreetingWorkflow .run ,
25+ "Temporal" ,
26+ id = f"infrequent-polling-{ uuid .uuid4 ()} " ,
27+ task_queue = task_queue ,
28+ )
29+ result = await handle .result ()
30+ assert result == "Hello, Temporal!"
You can’t perform that action at this time.
0 commit comments