From 3c804eae6d3d9a1d768dc835ad6847da19cdb212 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 21 Nov 2025 23:05:24 +0600 Subject: [PATCH 1/5] fix: Remove unnecessary call to msg.text() in graph streaming completion --- agentflow_cli/src/app/routers/graph/services/graph_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index acf922c..12049d0 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -308,7 +308,6 @@ async def stream_graph( logger.info("Graph streaming completed successfully") if meta["is_new_thread"] and self.config.thread_name_generator_path: - messages_str = [msg.text() for msg in messages_str] thread_name = await self._save_thread_name( config, config["thread_id"], messages_str ) From 36b8e9e1ffbbec4584422c7373c08ed35d6e848a Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 21 Nov 2025 23:13:08 +0600 Subject: [PATCH 2/5] fix: Enhance fix_graph operation with improved logging and error handling --- .../routers/graph/services/graph_service.py | 115 ++++++++++-------- graph/react.py | 74 ----------- 2 files changed, 67 insertions(+), 122 deletions(-) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 12049d0..d79b550 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -371,66 +371,85 @@ async def fix_graph( Raises: HTTPException: If the operation fails """ + try: + logger.info(f"Starting fix graph operation for thread: {thread_id}") + logger.debug(f"User info: {user}") - logger.info(f"Starting fix graph operation for thread: {thread_id}") - logger.debug(f"User info: {user}") - - fix_config = { - "thread_id": thread_id, - "user": user, - } + fix_config = { + "thread_id": thread_id, + "user": user, + } - # Merge additional config if provided - if config: - fix_config.update(config) + # Merge additional config if provided + if config: + fix_config.update(config) + + logger.debug("Fetching current state from checkpointer") + state: AgentState | None = await self.checkpointer.aget_state(fix_config) + + if not state: + logger.warning(f"No state found for thread: {thread_id}") + return { + "success": False, + "message": f"No state found for thread: {thread_id}", + "removed_count": 0, + "state": None, + } - logger.debug("Fetching current state from checkpointer") - state: AgentState | None = await self.checkpointer.aget_state(fix_config) + messages: list[Message] = state.context + logger.debug(f"Found {len(messages)} messages in state") - if not state: - logger.warning(f"No state found for thread: {thread_id}") - return { - "success": False, - "message": f"No state found for thread: {thread_id}", - "removed_count": 0, - "state": None, - } + if not messages: + logger.info("No messages found in state, nothing to fix") + return { + "success": True, + "message": "No messages found in state", + "removed_count": 0, + "state": state.model_dump_json(), + } - messages: list[Message] = state.context - logger.debug(f"Found {len(messages)} messages in state") + new_messages = [] + removed_count = 0 + + for msg in messages: + should_remove = False + if msg.tools_calls: + for tool_call in msg.tools_calls: + content = None + if isinstance(tool_call, dict): + content = tool_call.get("content") + else: + content = getattr(tool_call, "content", None) + + if content is None or content == "": + should_remove = True + break + + if should_remove: + removed_count += 1 + else: + new_messages.append(msg) + + if removed_count > 0: + state.context = new_messages + await self.checkpointer.aput_state(fix_config, state) + return { + "success": True, + "message": f"Successfully removed {removed_count} message(s)", + "removed_count": removed_count, + "state": state.model_dump_json(), + } - if not messages: - logger.info("No messages found in state, nothing to fix") return { "success": True, - "message": "No messages found in state", + "message": "No messages with empty tool calls found", "removed_count": 0, "state": state.model_dump_json(), } - last_message = messages[-1] - updated_context = [] - if last_message.role == "assistant" and last_message.tools_calls: - updated_context = messages[:-1] - state.context = updated_context - await self.checkpointer.aput_state(fix_config, state) - return { - "success": True, - "message": "Removed last assistant message with empty tool calls", - "removed_count": 1, - "state": state.model_dump_json(), - } - else: - logger.warning( - "Last message is not an assistant message with tool calls, skipping it from checks." - ) - - return { - "success": True, - "message": "No messages with empty tool calls found", - "removed_count": 0, - "state": state.model_dump_json(), - } + except Exception as e: + logger.error(f"Fix graph operation failed: {e}") + raise HTTPException(status_code=500, detail=f"Fix graph operation failed: {e!s}") async def setup(self, data: GraphSetupSchema) -> dict: # lets create tools diff --git a/graph/react.py b/graph/react.py index 7dd5c38..0639ec1 100644 --- a/graph/react.py +++ b/graph/react.py @@ -1,37 +1,3 @@ -""" -Graph-based React Agent Implementation - -This module implements a reactive agent system using PyAgenity's StateGraph. -The agent can interact with tools (like weather checking) and maintain conversation -state through a checkpointer. The graph orchestrates the flow between the main -agent logic and tool execution. - -Key Components: -- Weather tool: Demonstrates tool calling with dependency injection -- Main agent: AI-powered assistant that can use tools -- Graph flow: Conditional routing based on tool usage -- Checkpointer: Maintains conversation state across interactions - -Architecture: -The system uses a state graph with two main nodes: -1. MAIN: Processes user input and generates AI responses -2. TOOL: Executes tool calls when requested by the AI - -The graph conditionally routes between these nodes based on whether -the AI response contains tool calls. Conversation history is maintained -through the checkpointer, allowing for multi-turn conversations. - -Tools are defined as functions with JSON schema docstrings that describe -their interface for the AI model. The ToolNode automatically extracts -these schemas for tool selection. - -Dependencies: -- PyAgenity: For graph and state management -- LiteLLM: For AI model interactions -- InjectQ: For dependency injection -- Python logging: For debug and info messages -""" - import logging from typing import Any @@ -70,46 +36,6 @@ class MyAgentState(AgentState): checkpointer = InMemoryCheckpointer[MyAgentState]() -""" -Note: The docstring below will be used as the tool description and it will be -passed to the AI model for tool selection, so keep it relevant and concise. -This function will be converted to a tool with the following schema: -[ - { - 'type': 'function', - 'function': { - 'name': 'get_weather', - 'description': 'Retrieve current weather information for a specified location.', - 'parameters': { - 'type': 'object', - 'properties': { - 'location': {'type': 'string'} - }, - 'required': ['location'] - } - } - } - ] - -Parameters like tool_call_id, state, and checkpointer are injected automatically -by InjectQ when the tool is called by the agent. -Available injected parameters: -The following parameters are automatically injected by InjectQ when the tool is called, -but need to keep them as same name and type for proper injection: -- tool_call_id: Unique ID for the tool call -- state: Current AgentState containing conversation context -- config: Configuration dictionary passed during graph invocation - -Below fields need to be used with Inject[] to get the instances: -- context_manager: ContextManager instance for managing context, like trimming -- publisher: Publisher instance for publishing events and logs -- checkpointer: InMemoryCheckpointer instance for state management -- store: InMemoryStore instance for temporary data storage -- callback: CallbackManager instance for handling callbacks - -""" - - def get_weather( location: str, tool_call_id: str, From 096b87685834fb21aeb9c36fddb0ec2b2a70a55e Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 21 Nov 2025 23:16:48 +0600 Subject: [PATCH 3/5] fix: Add method to check for empty tool calls in messages during graph fixing --- .../routers/graph/services/graph_service.py | 72 ++++++++----------- 1 file changed, 30 insertions(+), 42 deletions(-) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index d79b550..b5bb17f 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -343,6 +343,25 @@ async def get_state_schema(self) -> dict: logger.error(f"Failed to get state schema: {e}") raise HTTPException(status_code=500, detail=f"Failed to get state schema: {e!s}") + def _has_empty_tool_call(self, msg: Message) -> bool: + """Return True if any tool call on the message has empty content. + + A tool call is considered empty if its ``content`` attribute/key is ``None`` or + an empty string. Tool calls may be dict-like or objects with a ``content`` attribute. + """ + tool_calls = getattr(msg, "tools_calls", None) + if not tool_calls: + return False + for tool_call in tool_calls: + content = ( + tool_call.get("content") + if isinstance(tool_call, dict) + else getattr(tool_call, "content", None) + ) + if content in (None, ""): + return True + return False + async def fix_graph( self, thread_id: str, @@ -375,18 +394,12 @@ async def fix_graph( logger.info(f"Starting fix graph operation for thread: {thread_id}") logger.debug(f"User info: {user}") - fix_config = { - "thread_id": thread_id, - "user": user, - } - - # Merge additional config if provided + fix_config = {"thread_id": thread_id, "user": user} if config: fix_config.update(config) logger.debug("Fetching current state from checkpointer") state: AgentState | None = await self.checkpointer.aget_state(fix_config) - if not state: logger.warning(f"No state found for thread: {thread_id}") return { @@ -396,11 +409,9 @@ async def fix_graph( "state": None, } - messages: list[Message] = state.context + messages: list[Message] = list(state.context or []) logger.debug(f"Found {len(messages)} messages in state") - if not messages: - logger.info("No messages found in state, nothing to fix") return { "success": True, "message": "No messages found in state", @@ -408,45 +419,22 @@ async def fix_graph( "state": state.model_dump_json(), } - new_messages = [] - removed_count = 0 - - for msg in messages: - should_remove = False - if msg.tools_calls: - for tool_call in msg.tools_calls: - content = None - if isinstance(tool_call, dict): - content = tool_call.get("content") - else: - content = getattr(tool_call, "content", None) - - if content is None or content == "": - should_remove = True - break - - if should_remove: - removed_count += 1 - else: - new_messages.append(msg) + filtered = [m for m in messages if not self._has_empty_tool_call(m)] + removed_count = len(messages) - len(filtered) - if removed_count > 0: - state.context = new_messages + if removed_count: + state.context = filtered await self.checkpointer.aput_state(fix_config, state) - return { - "success": True, - "message": f"Successfully removed {removed_count} message(s)", - "removed_count": removed_count, - "state": state.model_dump_json(), - } + message = f"Successfully removed {removed_count} message(s)" + else: + message = "No messages with empty tool calls found" return { "success": True, - "message": "No messages with empty tool calls found", - "removed_count": 0, + "message": message, + "removed_count": removed_count, "state": state.model_dump_json(), } - except Exception as e: logger.error(f"Fix graph operation failed: {e}") raise HTTPException(status_code=500, detail=f"Fix graph operation failed: {e!s}") From f22ba7cab8a995d2a90c75f94b8f61301b9ebc23 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 21 Nov 2025 23:26:20 +0600 Subject: [PATCH 4/5] fix: Update error handling to replace ResourceNotFoundError with APIResourceNotFoundError --- .../src/app/core/exceptions/handle_errors.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/agentflow_cli/src/app/core/exceptions/handle_errors.py b/agentflow_cli/src/app/core/exceptions/handle_errors.py index 1cecc49..8fe65cd 100644 --- a/agentflow_cli/src/app/core/exceptions/handle_errors.py +++ b/agentflow_cli/src/app/core/exceptions/handle_errors.py @@ -4,7 +4,6 @@ GraphRecursionError, MetricsError, NodeError, - ResourceNotFoundError, SchemaVersionError, SerializationError, StorageError, @@ -40,7 +39,7 @@ def init_errors_handler(app: FastAPI): ValueError: Handles value errors. UserAccountError: Handles custom user account errors. UserPermissionError: Handles custom user permission errors. - ResourceNotFoundError: Handles custom resource not found errors. + APIResourceNotFoundError: Handles custom API resource not found errors. """ @app.exception_handler(HTTPException) @@ -208,16 +207,3 @@ async def transient_storage_error_exception_handler( details=getattr(exc, "context", None), status_code=503, ) - - @app.exception_handler(ResourceNotFoundError) - async def resource_not_found_storage_exception_handler( - request: Request, exc: ResourceNotFoundError - ): - logger.error(f"ResourceNotFoundError: url: {request.base_url}", exc_info=exc) - return error_response( - request, - error_code=getattr(exc, "error_code", "RESOURCE_NOT_FOUND_000"), - message=getattr(exc, "message", str(exc)), - details=getattr(exc, "context", None), - status_code=404, - ) From 9eb721c0afd150e3a957b393355904ab11a06d94 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Fri, 21 Nov 2025 23:28:39 +0600 Subject: [PATCH 5/5] fix: Update dependency version for 10xscale-agentflow to 0.5.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7213684..8b8373c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ classifiers = [ "Topic :: Internet :: WWW/HTTP :: HTTP Servers", ] dependencies = [ - "10xscale-agentflow>=0.4.0", + "10xscale-agentflow>=0.5.0", "fastapi", "gunicorn", "orjson",