Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions agentflow/graph/utils/stream_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ async def _check_interrupted(
logger.info(
"Resuming from interrupted state at node '%s'", state.execution_meta.current_node
)
# Save the interrupted node info before clearing so we don't re-interrupt
config["_skip_interrupt_at"] = {
"node": state.execution_meta.interrupted_node,
"status": state.execution_meta.status,
}
Comment on lines +103 to +106
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On resume, _skip_interrupt_at is set to execution_meta.interrupted_node unconditionally. For INTERRUPTED_AFTER the state often advances current_node to the next node before pausing (see _execute_graph), so this flag may never be consumed/cleared and can later suppress a legitimate interrupt if the graph cycles back to the old node. Consider only setting this skip flag when interrupted_node == current_node (i.e., when resuming will immediately re-check the same node), or otherwise clearing/avoiding the flag for cases where current_node has already moved on.

Suggested change
config["_skip_interrupt_at"] = {
"node": state.execution_meta.interrupted_node,
"status": state.execution_meta.status,
}
interrupted_node = state.execution_meta.interrupted_node
if interrupted_node == state.execution_meta.current_node:
config["_skip_interrupt_at"] = {
"node": interrupted_node,
"status": state.execution_meta.status,
}
else:
# If we've already advanced to a different node, don't carry a stale skip flag
config.pop("_skip_interrupt_at", None)

Copilot uses AI. Check for mistakes.
# This is a resume case - clear interrupt and merge input data
if input_data:
config["resume_data"] = input_data
Expand Down Expand Up @@ -143,6 +148,24 @@ async def _check_and_handle_interrupt(
self.interrupt_before if interrupt_type == "before" else self.interrupt_after
) or []

# Check if we just resumed from an interrupt at this node with this type
skip_info = config.get("_skip_interrupt_at", {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This portion not required

if skip_info.get("node") == current_node:
expected_status = (
ExecutionStatus.INTERRUPTED_BEFORE
if interrupt_type == "before"
else ExecutionStatus.INTERRUPTED_AFTER
)
if skip_info.get("status") == expected_status:
logger.debug(
"Skipping %s interrupt check for node '%s' - just resumed from it",
interrupt_type,
current_node,
)
# Clear the flag after using it once
config.pop("_skip_interrupt_at", None)
return False
Comment on lines +151 to +167
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change alters streaming interrupt/resume behavior, but there doesn’t appear to be an integration test that resumes a streaming run (via CompiledGraph.astream) with interrupt_before/after configured to ensure it doesn’t immediately re-interrupt after resume. Adding a focused test would help prevent regressions between InvokeHandler and StreamHandler.

Copilot uses AI. Check for mistakes.

if current_node in interrupt_nodes:
status = (
ExecutionStatus.INTERRUPTED_BEFORE
Expand Down