From 66323d423921871c70887ff250e83364d3430f82 Mon Sep 17 00:00:00 2001 From: sarthak422004 Date: Wed, 18 Feb 2026 19:21:33 +0530 Subject: [PATCH] fix: Enhance interrupt handling in StreamHandler to prevent re-interruption after resume --- agentflow/graph/utils/stream_handler.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/agentflow/graph/utils/stream_handler.py b/agentflow/graph/utils/stream_handler.py index ef2f734..4332574 100644 --- a/agentflow/graph/utils/stream_handler.py +++ b/agentflow/graph/utils/stream_handler.py @@ -99,6 +99,11 @@ async def _check_interrupted( logger.info( "Resuming from interrupted state at node '%s'", state.execution_meta.current_node ) + # Save the interrupted node info before clearing so we don't re-interrupt + config["_skip_interrupt_at"] = { + "node": state.execution_meta.interrupted_node, + "status": state.execution_meta.status, + } # This is a resume case - clear interrupt and merge input data if input_data: config["resume_data"] = input_data @@ -143,6 +148,24 @@ async def _check_and_handle_interrupt( self.interrupt_before if interrupt_type == "before" else self.interrupt_after ) or [] + # Check if we just resumed from an interrupt at this node with this type + skip_info = config.get("_skip_interrupt_at", {}) + if skip_info.get("node") == current_node: + expected_status = ( + ExecutionStatus.INTERRUPTED_BEFORE + if interrupt_type == "before" + else ExecutionStatus.INTERRUPTED_AFTER + ) + if skip_info.get("status") == expected_status: + logger.debug( + "Skipping %s interrupt check for node '%s' - just resumed from it", + interrupt_type, + current_node, + ) + # Clear the flag after using it once + config.pop("_skip_interrupt_at", None) + return False + if current_node in interrupt_nodes: status = ( ExecutionStatus.INTERRUPTED_BEFORE