fix: Enhance interrupt handling in StreamHandler to prevent re-interruption after resume#49
Conversation
…uption after resume
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Updates streaming graph execution to avoid immediately re-triggering configured interrupts when resuming from an interrupted state, improving pause/resume behavior in the StreamHandler used by CompiledGraph.astream().
Changes:
- Persist a one-time “skip interrupt” marker in
configduring resume to prevent re-interrupting at the same node. - Add logic in
_check_and_handle_interrupt()to bypass a single interrupt check after resume and then clear the marker.
| # Check if we just resumed from an interrupt at this node with this type | ||
| skip_info = config.get("_skip_interrupt_at", {}) | ||
| if skip_info.get("node") == current_node: | ||
| expected_status = ( | ||
| ExecutionStatus.INTERRUPTED_BEFORE | ||
| if interrupt_type == "before" | ||
| else ExecutionStatus.INTERRUPTED_AFTER | ||
| ) | ||
| if skip_info.get("status") == expected_status: | ||
| logger.debug( | ||
| "Skipping %s interrupt check for node '%s' - just resumed from it", | ||
| interrupt_type, | ||
| current_node, | ||
| ) | ||
| # Clear the flag after using it once | ||
| config.pop("_skip_interrupt_at", None) | ||
| return False |
There was a problem hiding this comment.
This change alters streaming interrupt/resume behavior, but there doesn’t appear to be an integration test that resumes a streaming run (via CompiledGraph.astream) with interrupt_before/after configured to ensure it doesn’t immediately re-interrupt after resume. Adding a focused test would help prevent regressions between InvokeHandler and StreamHandler.
| config["_skip_interrupt_at"] = { | ||
| "node": state.execution_meta.interrupted_node, | ||
| "status": state.execution_meta.status, | ||
| } |
There was a problem hiding this comment.
On resume, _skip_interrupt_at is set to execution_meta.interrupted_node unconditionally. For INTERRUPTED_AFTER the state often advances current_node to the next node before pausing (see _execute_graph), so this flag may never be consumed/cleared and can later suppress a legitimate interrupt if the graph cycles back to the old node. Consider only setting this skip flag when interrupted_node == current_node (i.e., when resuming will immediately re-check the same node), or otherwise clearing/avoiding the flag for cases where current_node has already moved on.
| config["_skip_interrupt_at"] = { | |
| "node": state.execution_meta.interrupted_node, | |
| "status": state.execution_meta.status, | |
| } | |
| interrupted_node = state.execution_meta.interrupted_node | |
| if interrupted_node == state.execution_meta.current_node: | |
| config["_skip_interrupt_at"] = { | |
| "node": interrupted_node, | |
| "status": state.execution_meta.status, | |
| } | |
| else: | |
| # If we've already advanced to a different node, don't carry a stale skip flag | |
| config.pop("_skip_interrupt_at", None) |
| ) or [] | ||
|
|
||
| # Check if we just resumed from an interrupt at this node with this type | ||
| skip_info = config.get("_skip_interrupt_at", {}) |
|
Not required, already solved: 0.6.0 |
No description provided.