Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
jobs:
# Build and test the project
build-lint-test:
timeout-minutes: 30
timeout-minutes: 35
strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2963,6 +2963,7 @@ async def result(
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
):
print("Getting history event:", event)
if event.HasField("workflow_execution_completed_event_attributes"):
complete_attr = event.workflow_execution_completed_event_attributes
# Follow execution
Expand Down
2 changes: 1 addition & 1 deletion temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig:


LoggingConfig.default = LoggingConfig(
filter=TelemetryFilter(core_level="WARN", other_level="ERROR")
filter=TelemetryFilter(core_level="DEBUG", other_level="ERROR")
)

_module_start_time = time.time()
Expand Down
16 changes: 16 additions & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,8 @@ async def raise_on_shutdown():
f"Beginning worker shutdown, will wait {graceful_timeout} before cancelling activities"
)

print("Initiate bridge worker shutdown")

# Initiate core worker shutdown
self._bridge_worker.initiate_shutdown()

Expand All @@ -844,6 +846,8 @@ async def raise_on_shutdown():
if worker and task.done() and task.exception():
tasks[worker] = asyncio.create_task(worker.drain_poll_queue())

print("Exception tasks replaced")

# Notify shutdown occurring
if self._activity_worker:
self._activity_worker.notify_shutdown()
Expand All @@ -852,23 +856,35 @@ async def raise_on_shutdown():
if self._nexus_worker:
self._nexus_worker.notify_shutdown()

print("Workers notified")

# Wait for all tasks to complete (i.e. for poller loops to stop)
await asyncio.wait(tasks.values())

print("Tasks drained: ", tasks)

# Sometimes both workers throw an exception and since we only take the
# first, Python may complain with "Task exception was never retrieved"
# if we don't get the others. Therefore we call cancel on each task
# which suppresses this.
for task in tasks.values():
task.cancel()

print("Tasks cancelled")

# Let all activity / nexus operations completions finish. We cannot guarantee that
# because poll shutdown completed (which means activities/operations completed)
# that they got flushed to the server.
if self._activity_worker:
await self._activity_worker.wait_all_completed()

print("Activitiy worker waited")

if self._nexus_worker:
await self._nexus_worker.wait_all_completed()

print("Finalize bridge worker shutdown")

# Do final shutdown
try:
await self._bridge_worker.finalize_shutdown()
Expand Down
2 changes: 2 additions & 0 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,10 @@ async def run(
for t in asyncio.all_tasks()
if getattr(t, "__temporal_task_tag", None) is task_tag
]
print("Waiting for tasks to finish: ", our_tasks)
if our_tasks:
await asyncio.wait(our_tasks)
print("Waited for tasks to finish: ", our_tasks)
# Shutdown the thread pool executor if we created it
if not self._workflow_task_executor_user_provided:
self._workflow_task_executor.shutdown()
Expand Down
4 changes: 4 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,15 @@ async def capture_info() -> None:
nonlocal info
info = activity.info()

print("Executing workflow")
result = await _execute_workflow_with_activity(
client,
worker,
capture_info,
start_to_close_timeout_ms=4000,
shared_state_manager=shared_state_manager,
)
print("Executed workflow")

assert info
assert info.activity_id # type:ignore[reportUnreachable]
Expand Down Expand Up @@ -1621,12 +1623,14 @@ async def _execute_workflow_with_activity(
task_queue=worker.task_queue,
result_type=result_type_override,
)
print("Constructing activity result")
return _ActivityResult(
act_task_queue=worker_config["task_queue"],
result=await handle.result(),
handle=handle,
)
finally:
print("On complete.")
if on_complete:
on_complete()

Expand Down
Loading