diff --git a/agentflow_cli/src/app/core/exceptions/handle_errors.py b/agentflow_cli/src/app/core/exceptions/handle_errors.py index 1cecc49..8fe65cd 100644 --- a/agentflow_cli/src/app/core/exceptions/handle_errors.py +++ b/agentflow_cli/src/app/core/exceptions/handle_errors.py @@ -4,7 +4,6 @@ GraphRecursionError, MetricsError, NodeError, - ResourceNotFoundError, SchemaVersionError, SerializationError, StorageError, @@ -40,7 +39,7 @@ def init_errors_handler(app: FastAPI): ValueError: Handles value errors. UserAccountError: Handles custom user account errors. UserPermissionError: Handles custom user permission errors. - ResourceNotFoundError: Handles custom resource not found errors. + APIResourceNotFoundError: Handles custom API resource not found errors. """ @app.exception_handler(HTTPException) @@ -208,16 +207,3 @@ async def transient_storage_error_exception_handler( details=getattr(exc, "context", None), status_code=503, ) - - @app.exception_handler(ResourceNotFoundError) - async def resource_not_found_storage_exception_handler( - request: Request, exc: ResourceNotFoundError - ): - logger.error(f"ResourceNotFoundError: url: {request.base_url}", exc_info=exc) - return error_response( - request, - error_code=getattr(exc, "error_code", "RESOURCE_NOT_FOUND_000"), - message=getattr(exc, "message", str(exc)), - details=getattr(exc, "context", None), - status_code=404, - ) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index acf922c..b5bb17f 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -308,7 +308,6 @@ async def stream_graph( logger.info("Graph streaming completed successfully") if meta["is_new_thread"] and self.config.thread_name_generator_path: - messages_str = [msg.text() for msg in messages_str] thread_name = await self._save_thread_name( config, config["thread_id"], messages_str ) @@ -344,6 +343,25 @@ async def get_state_schema(self) -> dict: logger.error(f"Failed to get state schema: {e}") raise HTTPException(status_code=500, detail=f"Failed to get state schema: {e!s}") + def _has_empty_tool_call(self, msg: Message) -> bool: + """Return True if any tool call on the message has empty content. + + A tool call is considered empty if its ``content`` attribute/key is ``None`` or + an empty string. Tool calls may be dict-like or objects with a ``content`` attribute. + """ + tool_calls = getattr(msg, "tools_calls", None) + if not tool_calls: + return False + for tool_call in tool_calls: + content = ( + tool_call.get("content") + if isinstance(tool_call, dict) + else getattr(tool_call, "content", None) + ) + if content in (None, ""): + return True + return False + async def fix_graph( self, thread_id: str, @@ -372,66 +390,54 @@ async def fix_graph( Raises: HTTPException: If the operation fails """ + try: + logger.info(f"Starting fix graph operation for thread: {thread_id}") + logger.debug(f"User info: {user}") - logger.info(f"Starting fix graph operation for thread: {thread_id}") - logger.debug(f"User info: {user}") - - fix_config = { - "thread_id": thread_id, - "user": user, - } - - # Merge additional config if provided - if config: - fix_config.update(config) - - logger.debug("Fetching current state from checkpointer") - state: AgentState | None = await self.checkpointer.aget_state(fix_config) + fix_config = {"thread_id": thread_id, "user": user} + if config: + fix_config.update(config) + + logger.debug("Fetching current state from checkpointer") + state: AgentState | None = await self.checkpointer.aget_state(fix_config) + if not state: + logger.warning(f"No state found for thread: {thread_id}") + return { + "success": False, + "message": f"No state found for thread: {thread_id}", + "removed_count": 0, + "state": None, + } - if not state: - logger.warning(f"No state found for thread: {thread_id}") - return { - "success": False, - "message": f"No state found for thread: {thread_id}", - "removed_count": 0, - "state": None, - } + messages: list[Message] = list(state.context or []) + logger.debug(f"Found {len(messages)} messages in state") + if not messages: + return { + "success": True, + "message": "No messages found in state", + "removed_count": 0, + "state": state.model_dump_json(), + } - messages: list[Message] = state.context - logger.debug(f"Found {len(messages)} messages in state") + filtered = [m for m in messages if not self._has_empty_tool_call(m)] + removed_count = len(messages) - len(filtered) - if not messages: - logger.info("No messages found in state, nothing to fix") - return { - "success": True, - "message": "No messages found in state", - "removed_count": 0, - "state": state.model_dump_json(), - } + if removed_count: + state.context = filtered + await self.checkpointer.aput_state(fix_config, state) + message = f"Successfully removed {removed_count} message(s)" + else: + message = "No messages with empty tool calls found" - last_message = messages[-1] - updated_context = [] - if last_message.role == "assistant" and last_message.tools_calls: - updated_context = messages[:-1] - state.context = updated_context - await self.checkpointer.aput_state(fix_config, state) return { "success": True, - "message": "Removed last assistant message with empty tool calls", - "removed_count": 1, + "message": message, + "removed_count": removed_count, "state": state.model_dump_json(), } - else: - logger.warning( - "Last message is not an assistant message with tool calls, skipping it from checks." - ) - - return { - "success": True, - "message": "No messages with empty tool calls found", - "removed_count": 0, - "state": state.model_dump_json(), - } + except Exception as e: + logger.error(f"Fix graph operation failed: {e}") + raise HTTPException(status_code=500, detail=f"Fix graph operation failed: {e!s}") async def setup(self, data: GraphSetupSchema) -> dict: # lets create tools diff --git a/graph/react.py b/graph/react.py index 7dd5c38..0639ec1 100644 --- a/graph/react.py +++ b/graph/react.py @@ -1,37 +1,3 @@ -""" -Graph-based React Agent Implementation - -This module implements a reactive agent system using PyAgenity's StateGraph. -The agent can interact with tools (like weather checking) and maintain conversation -state through a checkpointer. The graph orchestrates the flow between the main -agent logic and tool execution. - -Key Components: -- Weather tool: Demonstrates tool calling with dependency injection -- Main agent: AI-powered assistant that can use tools -- Graph flow: Conditional routing based on tool usage -- Checkpointer: Maintains conversation state across interactions - -Architecture: -The system uses a state graph with two main nodes: -1. MAIN: Processes user input and generates AI responses -2. TOOL: Executes tool calls when requested by the AI - -The graph conditionally routes between these nodes based on whether -the AI response contains tool calls. Conversation history is maintained -through the checkpointer, allowing for multi-turn conversations. - -Tools are defined as functions with JSON schema docstrings that describe -their interface for the AI model. The ToolNode automatically extracts -these schemas for tool selection. - -Dependencies: -- PyAgenity: For graph and state management -- LiteLLM: For AI model interactions -- InjectQ: For dependency injection -- Python logging: For debug and info messages -""" - import logging from typing import Any @@ -70,46 +36,6 @@ class MyAgentState(AgentState): checkpointer = InMemoryCheckpointer[MyAgentState]() -""" -Note: The docstring below will be used as the tool description and it will be -passed to the AI model for tool selection, so keep it relevant and concise. -This function will be converted to a tool with the following schema: -[ - { - 'type': 'function', - 'function': { - 'name': 'get_weather', - 'description': 'Retrieve current weather information for a specified location.', - 'parameters': { - 'type': 'object', - 'properties': { - 'location': {'type': 'string'} - }, - 'required': ['location'] - } - } - } - ] - -Parameters like tool_call_id, state, and checkpointer are injected automatically -by InjectQ when the tool is called by the agent. -Available injected parameters: -The following parameters are automatically injected by InjectQ when the tool is called, -but need to keep them as same name and type for proper injection: -- tool_call_id: Unique ID for the tool call -- state: Current AgentState containing conversation context -- config: Configuration dictionary passed during graph invocation - -Below fields need to be used with Inject[] to get the instances: -- context_manager: ContextManager instance for managing context, like trimming -- publisher: Publisher instance for publishing events and logs -- checkpointer: InMemoryCheckpointer instance for state management -- store: InMemoryStore instance for temporary data storage -- callback: CallbackManager instance for handling callbacks - -""" - - def get_weather( location: str, tool_call_id: str, diff --git a/pyproject.toml b/pyproject.toml index 7213684..8b8373c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ classifiers = [ "Topic :: Internet :: WWW/HTTP :: HTTP Servers", ] dependencies = [ - "10xscale-agentflow>=0.4.0", + "10xscale-agentflow>=0.5.0", "fastapi", "gunicorn", "orjson",