Python: Add SSE keepalive interval to AG-UI FastAPI endpoint#6647
Python: Add SSE keepalive interval to AG-UI FastAPI endpoint#6647vaibhav-patel wants to merge 1 commit into
Conversation
The AG-UI FastAPI endpoint served the event stream as a bare
StreamingResponse with no application-level heartbeat. During long silent
gaps between agent events (slow server-side tools such as OCR or retrieval),
idle-timeout proxies in front of the endpoint (Azure ingress, nginx,
serverless front doors) drop the otherwise-healthy connection and the client
sees a spurious HTTP 500.
Wrap the event-stream generator so that when no upstream event is produced
within a configurable interval, a transport-level SSE keepalive comment
(": keepalive") is written to the wire. SSE comment lines are a protocol
no-op that clients and parsers ignore, so real events still flush immediately
and in order while only idle gaps trigger a ping.
The interval is exposed via the new keepalive_interval_seconds parameter on
add_agent_framework_fastapi_endpoint (default 15.0; pass None to disable).
The upstream generator is drained by a single dedicated task so its entire
lifecycle stays in one contextvars context, which the agent run/telemetry
pipeline requires for its ContextVar cleanup hooks.
Fixes microsoft#6611.
|
@vaibhav-patel please read the following Contributor License Agreement(CLA). If you agree with the CLA, please reply with the following information.
Contributor License AgreementContribution License AgreementThis Contribution License Agreement (“Agreement”) is agreed to by the party signing below (“You”),
|
There was a problem hiding this comment.
Pull request overview
This PR adds an application-level SSE keepalive mechanism to the Python AG-UI FastAPI endpoint so that long idle gaps in the event stream don’t get dropped by idle-timeout proxies (e.g., Azure ingress/nginx), while preserving event order and immediate flushing for real events.
Changes:
- Introduces an SSE keepalive wrapper (
_with_sse_keepalive) that injects: keepalivecomment frames during idle periods. - Adds a new
keepalive_interval_secondsparameter toadd_agent_framework_fastapi_endpoint(default15.0,Nonedisables; non-positive rejected). - Adds unit/acceptance tests covering wrapper behavior and endpoint configuration/validation.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/packages/ag-ui/agent_framework_ag_ui/_endpoint.py |
Adds keepalive wrapper and wires it into the StreamingResponse pipeline via a new endpoint parameter. |
python/packages/ag-ui/tests/ag_ui/test_endpoint.py |
Adds tests for keepalive wrapper behavior plus endpoint-level enable/disable/validation checks. |
| them. Racing each individual pull with ``asyncio.wait_for`` would instead scatter the pulls | ||
| across contexts and break that cleanup. | ||
| """ | ||
| queue: asyncio.Queue[str | type[_StreamEnd] | Exception] = asyncio.Queue() |
| # Use a tiny interval so the idle gap reliably trips several keepalives without slow tests. | ||
| wrapped = _with_sse_keepalive(upstream(), 0.01) | ||
|
|
Fixes #6611.
Problem
add_agent_framework_fastapi_endpointserves the AG-UI event stream as a bareStreamingResponse(media_type="text/event-stream")with no application-level heartbeat.During long silent gaps between agent events (slow server-side tools like OCR or
retrieval), idle-timeout proxies in front of the endpoint (Azure ingress, nginx,
serverless front doors) drop the healthy connection and the client sees a spurious
HTTP 500. The
Connection: keep-aliveheader is TCP-level only and emits no applicationdata during an idle stream.
Fix
Wrap the event-stream generator so that when no upstream event is produced within a
configurable interval, a transport-level SSE keepalive comment (
: keepalive) is writtento the wire. SSE comment lines are a protocol no-op that clients and parsers ignore, so
real events still flush immediately and in order while only idle gaps trigger a ping.
keepalive_interval_secondsonadd_agent_framework_fastapi_endpoint(default
15.0; passNoneto disable). Non-positive values raiseValueError.asyncio.Queue,keeping its entire lifecycle in one
contextvarscontext. This is required by the agentrun/telemetry pipeline, whose streaming cleanup resets
ContextVartokens that must bereset in the context that created them (racing individual pulls with
asyncio.wait_forbreaks that and was observed to raise "Token was created in a different Context"). Client
disconnects cancel the producer so the upstream generator closes cleanly.
StreamingResponse; did not pull insse-starlette).Tests
Added focused async tests in
tests/ag_ui/test_endpoint.py: a keepalive is emitted duringan idle gap while real events still pass through in order; no keepalive when events flow
back-to-back; empty stream and upstream-error paths; and endpoint-level acceptance,
disable (
None), and validation. Fulltest_endpoint.py(59) and the HTTP round-tripsuites pass;
ruffandmypyare clean.Note
The keepalive defaults on (15s) and emits only benign SSE comment lines during idle gaps.
If you'd prefer it opt-in, defaulting
keepalive_interval_secondstoNoneis a one-linechange.