-
Notifications
You must be signed in to change notification settings - Fork 8
fix: Enhance interrupt handling in StreamHandler to prevent re-interruption after resume #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,6 +99,11 @@ async def _check_interrupted( | |
| logger.info( | ||
| "Resuming from interrupted state at node '%s'", state.execution_meta.current_node | ||
| ) | ||
| # Save the interrupted node info before clearing so we don't re-interrupt | ||
| config["_skip_interrupt_at"] = { | ||
| "node": state.execution_meta.interrupted_node, | ||
| "status": state.execution_meta.status, | ||
| } | ||
| # This is a resume case - clear interrupt and merge input data | ||
| if input_data: | ||
| config["resume_data"] = input_data | ||
|
|
@@ -143,6 +148,24 @@ async def _check_and_handle_interrupt( | |
| self.interrupt_before if interrupt_type == "before" else self.interrupt_after | ||
| ) or [] | ||
|
|
||
| # Check if we just resumed from an interrupt at this node with this type | ||
| skip_info = config.get("_skip_interrupt_at", {}) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This portion not required |
||
| if skip_info.get("node") == current_node: | ||
| expected_status = ( | ||
| ExecutionStatus.INTERRUPTED_BEFORE | ||
| if interrupt_type == "before" | ||
| else ExecutionStatus.INTERRUPTED_AFTER | ||
| ) | ||
| if skip_info.get("status") == expected_status: | ||
| logger.debug( | ||
| "Skipping %s interrupt check for node '%s' - just resumed from it", | ||
| interrupt_type, | ||
| current_node, | ||
| ) | ||
| # Clear the flag after using it once | ||
| config.pop("_skip_interrupt_at", None) | ||
| return False | ||
|
Comment on lines
+151
to
+167
|
||
|
|
||
| if current_node in interrupt_nodes: | ||
| status = ( | ||
| ExecutionStatus.INTERRUPTED_BEFORE | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On resume,
_skip_interrupt_atis set toexecution_meta.interrupted_nodeunconditionally. ForINTERRUPTED_AFTERthe state often advancescurrent_nodeto the next node before pausing (see_execute_graph), so this flag may never be consumed/cleared and can later suppress a legitimate interrupt if the graph cycles back to the old node. Consider only setting this skip flag wheninterrupted_node == current_node(i.e., when resuming will immediately re-check the same node), or otherwise clearing/avoiding the flag for cases wherecurrent_nodehas already moved on.