Python: add checkpointing support to AgentFrameworkWorkflow.run() in agent-framework-ag-ui#6646
Conversation
…ag-ui The ag-ui AgentFrameworkWorkflow.run() previously accepted only a RunAgentInput payload and exposed no way to use the core workflow's checkpointing/state-persistence, unlike the core agent-framework workflow implementations. This left ag-ui workflows without resumable execution. Add optional checkpoint_storage and checkpoint_id keyword arguments to run(), threaded through run_workflow_stream() into the core Workflow.run(). This delegates to the existing core capability instead of reinventing it and keeps the public surface consistent with Workflow.run(): - checkpoint_storage enables checkpoint creation at each superstep boundary. - checkpoint_id resumes a run from a persisted checkpoint; incoming messages are forwarded only as request-info responses (never as a new start-executor message) to honor the core's message/checkpoint_id mutual exclusivity, and responses + checkpoint_id performs a restore-then-send in one call. Both can also be supplied via the input_data keys __ag_ui_checkpoint_storage and __ag_ui_checkpoint_id so the FastAPI endpoint (which calls run(input_data) positionally) can opt in without changing its call site; explicit keyword arguments take precedence. Checkpoint resume bypasses the AG-UI thread snapshot hydration early-returns so it always reaches the core restore path. Backward compatible: run(input_data) keeps working unchanged, and the non-checkpoint path still calls run_workflow_stream(input_data, workflow) with its original two-argument convention. Adds focused tests covering checkpoint creation, resume-from-checkpoint, input-data-keyed params, and the unchanged default path. Fixes microsoft#6632.
|
@vaibhav-patel please read the following Contributor License Agreement(CLA). If you agree with the CLA, please reply with the following information.
Contributor License AgreementContribution License AgreementThis Contribution License Agreement (“Agreement”) is agreed to by the party signing below (“You”),
|
There was a problem hiding this comment.
Pull request overview
This PR adds workflow checkpointing support to the agent-framework-ag-ui adapter so AgentFrameworkWorkflow.run() can create checkpoints and resume from a persisted checkpoint_id, mirroring the core agent_framework.Workflow.run() API.
Changes:
- Extended
AgentFrameworkWorkflow.run()to acceptcheckpoint_storage/checkpoint_id(and optionally read them frominput_data), ensuring checkpoint resumes bypass snapshot-only hydration paths. - Updated
run_workflow_stream()to forward checkpointing parameters to the core workflow and to avoid pre-run early-returns that would otherwise skip restores. - Added focused tests covering checkpoint creation, resume, input-data-keyed params, and the unchanged default path.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
python/packages/ag-ui/agent_framework_ag_ui/_workflow.py |
Adds checkpoint kwargs to AgentFrameworkWorkflow.run() and plumbs them through to the stream runner, while preserving existing snapshot behavior. |
python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py |
Extends run_workflow_stream() with checkpoint parameters and ensures checkpoint resumes reach workflow.run(checkpoint_id=...). |
python/packages/ag-ui/tests/ag_ui/test_workflow_agent.py |
Adds checkpointing tests for the AG-UI workflow wrapper. |
| from agent_framework import ( | ||
| InMemoryCheckpointStorage, | ||
| Workflow, | ||
| WorkflowBuilder, | ||
| WorkflowContext, | ||
| executor, | ||
| handler, | ||
| ) | ||
| from agent_framework._workflows._executor import Executor | ||
|
|
Summary
Fixes #6632. Adds workflow checkpointing to the ag-ui adapter's
AgentFrameworkWorkflow.run(), bringing it to parity with the coreagent_framework.Workflow.run()checkpointing API.run()now accepts two optional keyword arguments:checkpoint_storage: CheckpointStorage | None— enables checkpoint creation at eachsuperstep (forwarded to the core workflow).
checkpoint_id: str | None— resumes a run from a persisted checkpoint.This delegates to the existing core capability rather than reinventing it, so the public
surface mirrors
Workflow.run(checkpoint_storage=..., checkpoint_id=...).Details
messageis never passed alongsidecheckpoint_id; incoming messages are forwarded only as request-inforesponses(so
responses+checkpoint_idis a restore-then-send).input_datakeys__ag_ui_checkpoint_storage/__ag_ui_checkpoint_id, letting the FastAPI endpoint (which callsrun(input_data)positionally) opt in without a signature change; explicit keyword args take precedence.
bypasses snapshot-hydration early-returns to reach the core restore path.
Backward compatibility
The existing
run(RunAgentInput)/run(input_data)signature is preserved — the newparams are optional and default to
None, and the non-checkpoint path still callsrun_workflow_stream(input_data, workflow)with its original two-argument convention.Tests
New focused tests cover checkpoint creation, resume-from-checkpoint, input-data-keyed
params, and the unchanged default path. Full ag-ui suite green (824 passed); ruff
lint/format clean; no new pyright errors.