
[Bug]: DefaultRequestHandlerV2 ActiveTask producer can remain pending during EventQueue subscriber cleanup #1101

@r4ntix

What happened?

We are using a2a-sdk==1.1.0 on Python 3.12 with the standard DefaultRequestHandler, which resolves to DefaultRequestHandlerV2.

Under concurrent A2A requests, one request was rejected (or otherwise finished) at the application layer and persisted with a terminal task state, but the background producer task created by the SDK never completed. Later, when the event loop / worker was shutting down, asyncio reported:

Task was destroyed but it is pending!

The pending task was the SDK ActiveTask._run_producer() task named producer:<task_id>. The coroutine was stuck in the producer finally block at:

await self._event_queue_subscribers.close(immediate=False)

From reading the SDK code, EventQueueSource.close(immediate=False) waits for already queued events and child sinks to drain. If a subscriber/tapped sink is no longer being consumed, this can leave the producer cleanup pending indefinitely.
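The drain behavior described above can be illustrated without the SDK. The sketch below is only an analogy built on a plain `asyncio.Queue`; it is not the `EventQueueSource` implementation, and the `drain` helper is a hypothetical name. It shows that a graceful "wait until everything queued has been consumed" step finishes only while something is still consuming.

```python
import asyncio
import contextlib


async def drain(with_consumer: bool, timeout: float = 0.1) -> bool:
    """Return True if the graceful drain finished within `timeout` seconds."""
    queue: asyncio.Queue[str] = asyncio.Queue()
    queue.put_nowait("event-1")  # one event is already queued

    async def consume() -> None:
        while True:
            await queue.get()
            queue.task_done()  # marks the event as fully processed

    consumer = asyncio.create_task(consume()) if with_consumer else None
    try:
        # join() returns only after every queued item was marked task_done().
        # Without a consumer that never happens, so only the timeout ends the wait.
        await asyncio.wait_for(queue.join(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        if consumer is not None:
            consumer.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await consumer


if __name__ == "__main__":
    print(asyncio.run(drain(with_consumer=True)))
    print(asyncio.run(drain(with_consumer=False)))
```

In the abandoned-consumer case the wait ends only because of the `wait_for` timeout added here. The producer `finally` block has no such bound, which matches the pending task in the log.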

Expected behavior: DefaultRequestHandlerV2 / ActiveTask should not leave SDK-owned producer tasks pending after the request has already reached a terminal application state. Cleanup of subscriber queues should either complete reliably or be bounded/forced in a way that does not leave a background task alive until event loop destruction.
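One possible shape for the "bounded/forced" cleanup is sketched below. It assumes the queue object accepts `close(immediate=True)` as a forced variant of the `close(immediate=False)` call seen in the producer; that is an assumption, not something verified against the SDK. `close_bounded` and `StuckQueue` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Protocol


class Closable(Protocol):
    async def close(self, immediate: bool = False) -> None: ...


async def close_bounded(queue: Closable, timeout: float) -> bool:
    """Try a graceful close; force it after `timeout` seconds.

    Returns True if the graceful close completed, False if it was forced.
    """
    try:
        await asyncio.wait_for(queue.close(immediate=False), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Assumption: immediate=True drops undelivered events instead of waiting.
        await queue.close(immediate=True)
        return False


class StuckQueue:
    """Hypothetical stand-in whose graceful close never finishes."""

    def __init__(self) -> None:
        self.forced = False

    async def close(self, immediate: bool = False) -> None:
        if immediate:
            self.forced = True
            return
        await asyncio.Event().wait()  # never set: simulates an abandoned subscriber


if __name__ == "__main__":
    stuck = StuckQueue()
    graceful = asyncio.run(close_bounded(stuck, timeout=0.05))
    print(graceful, stuck.forced)
```

The trade-off is that a forced close may discard events a slow but still live subscriber would have received, so the timeout would need to be configurable.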

This also makes the failure hard to trace, because the asyncio warning is emitted later from the event loop, not from the original request context.
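Until the cleanup is fixed, an application could surface stuck producers earlier by listing them from its own shutdown hook. This is a minimal sketch that relies only on the `producer:<task_id>` task naming shown in this report; `pending_producers` and `demo` are hypothetical helpers, not SDK functions.

```python
import asyncio
import contextlib


def pending_producers(prefix: str = "producer:") -> list[str]:
    """Names of unfinished tasks on the running loop whose name starts with `prefix`."""
    return sorted(
        task.get_name()
        for task in asyncio.all_tasks()  # must be called while the loop is running
        if not task.done() and task.get_name().startswith(prefix)
    )


async def demo() -> list[str]:
    # Simulate a producer stuck in cleanup: it waits on an event that is never set.
    stuck = asyncio.create_task(asyncio.Event().wait(), name="producer:task-demo")
    await asyncio.sleep(0)  # let the task start running
    names = pending_producers()
    stuck.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await stuck
    return names


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Logging this list before the loop closes ties the stuck task ID to the application's own shutdown logs instead of to a later `asyncio` warning.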

Relevant log output

level: ERROR
logger_name: asyncio
message: Task was destroyed but it is pending!
task: <Task pending name='producer:task-REDACTED'
  coro=<ActiveTask._run_producer() running at
  /path/to/.venv/lib/python3.12/site-packages/a2a/server/agent_execution/active_task.py:566>
  wait_for=<_GatheringFuture pending cb=[Task.task_wakeup()]>>


Relevant SDK location:


# a2a/server/agent_execution/active_task.py

finally:
    self._request_queue.shutdown(immediate=True)
    await self._event_queue_agent.close(immediate=False)
    await self._event_queue_subscribers.close(immediate=False)
    logger.debug('Producer[%s]: Completed', self._task_id)


Environment:


Python: 3.12
a2a-sdk: 1.1.0
Request handler: a2a.server.request_handlers.default_request_handler_v2.DefaultRequestHandlerV2
Transport: HTTP / streaming A2A endpoint
