diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1223cf44..c9d000d20 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ env: jobs: # Build and test the project build-lint-test: - timeout-minutes: 30 + timeout-minutes: 35 strategy: fail-fast: false matrix: diff --git a/temporalio/client.py b/temporalio/client.py index cc2750ec6..40a5a023c 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2963,6 +2963,7 @@ async def result( rpc_metadata=rpc_metadata, rpc_timeout=rpc_timeout, ): + print("Getting history event:", event) if event.HasField("workflow_execution_completed_event_attributes"): complete_attr = event.workflow_execution_completed_event_attributes # Follow execution diff --git a/temporalio/runtime.py b/temporalio/runtime.py index b151f96f0..c6f3e6aac 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -217,7 +217,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig: LoggingConfig.default = LoggingConfig( - filter=TelemetryFilter(core_level="WARN", other_level="ERROR") + filter=TelemetryFilter(core_level="DEBUG", other_level="ERROR") ) _module_start_time = time.time() diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 332e2ead7..91c79b76b 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -836,6 +836,8 @@ async def raise_on_shutdown(): f"Beginning worker shutdown, will wait {graceful_timeout} before cancelling activities" ) + print("Initiate bridge worker shutdown") + # Initiate core worker shutdown self._bridge_worker.initiate_shutdown() @@ -844,6 +846,8 @@ async def raise_on_shutdown(): if worker and task.done() and task.exception(): tasks[worker] = asyncio.create_task(worker.drain_poll_queue()) + print("Exception tasks replaced") + # Notify shutdown occurring if self._activity_worker: self._activity_worker.notify_shutdown() @@ -852,8 +856,13 @@ async def raise_on_shutdown(): if 
self._nexus_worker: self._nexus_worker.notify_shutdown() + print("Workers notified") + # Wait for all tasks to complete (i.e. for poller loops to stop) await asyncio.wait(tasks.values()) + + print("Tasks drained: ", tasks) + # Sometimes both workers throw an exception and since we only take the # first, Python may complain with "Task exception was never retrieved" # if we don't get the others. Therefore we call cancel on each task @@ -861,14 +870,21 @@ async def raise_on_shutdown(): for task in tasks.values(): task.cancel() + print("Tasks cancelled") + # Let all activity / nexus operations completions finish. We cannot guarantee that # because poll shutdown completed (which means activities/operations completed) # that they got flushed to the server. if self._activity_worker: await self._activity_worker.wait_all_completed() + + print("Activity worker waited") + if self._nexus_worker: await self._nexus_worker.wait_all_completed() + print("Finalize bridge worker shutdown") + # Do final shutdown try: await self._bridge_worker.finalize_shutdown() diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index fb104b414..efc251dc3 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -210,8 +210,10 @@ async def run( for t in asyncio.all_tasks() if getattr(t, "__temporal_task_tag", None) is task_tag ] + print("Waiting for tasks to finish: ", our_tasks) if our_tasks: await asyncio.wait(our_tasks) + print("Waited for tasks to finish: ", our_tasks) # Shutdown the thread pool executor if we created it if not self._workflow_task_executor_user_provided: self._workflow_task_executor.shutdown() diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index 99efdf30f..80b5a2987 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -194,6 +194,7 @@ async def capture_info() -> None: nonlocal info info = activity.info() + print("Executing workflow") result = await 
_execute_workflow_with_activity( client, worker, @@ -201,6 +202,7 @@ async def capture_info() -> None: start_to_close_timeout_ms=4000, shared_state_manager=shared_state_manager, ) + print("Executed workflow") assert info assert info.activity_id # type:ignore[reportUnreachable] @@ -1621,12 +1623,14 @@ async def _execute_workflow_with_activity( task_queue=worker.task_queue, result_type=result_type_override, ) + print("Constructing activity result") return _ActivityResult( act_task_queue=worker_config["task_queue"], result=await handle.result(), handle=handle, ) finally: + print("On complete.") if on_complete: on_complete()