Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions agentflow_cli/src/app/core/exceptions/handle_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
GraphRecursionError,
MetricsError,
NodeError,
ResourceNotFoundError,
SchemaVersionError,
SerializationError,
StorageError,
Expand Down Expand Up @@ -40,7 +39,7 @@ def init_errors_handler(app: FastAPI):
ValueError: Handles value errors.
UserAccountError: Handles custom user account errors.
UserPermissionError: Handles custom user permission errors.
ResourceNotFoundError: Handles custom resource not found errors.
APIResourceNotFoundError: Handles custom API resource not found errors.
"""

@app.exception_handler(HTTPException)
Expand Down Expand Up @@ -208,16 +207,3 @@ async def transient_storage_error_exception_handler(
details=getattr(exc, "context", None),
status_code=503,
)

@app.exception_handler(ResourceNotFoundError)
async def resource_not_found_storage_exception_handler(
request: Request, exc: ResourceNotFoundError
):
logger.error(f"ResourceNotFoundError: url: {request.base_url}", exc_info=exc)
return error_response(
request,
error_code=getattr(exc, "error_code", "RESOURCE_NOT_FOUND_000"),
message=getattr(exc, "message", str(exc)),
details=getattr(exc, "context", None),
status_code=404,
)
110 changes: 58 additions & 52 deletions agentflow_cli/src/app/routers/graph/services/graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ async def stream_graph(
logger.info("Graph streaming completed successfully")

if meta["is_new_thread"] and self.config.thread_name_generator_path:
messages_str = [msg.text() for msg in messages_str]
thread_name = await self._save_thread_name(
config, config["thread_id"], messages_str
)
Expand Down Expand Up @@ -344,6 +343,25 @@ async def get_state_schema(self) -> dict:
logger.error(f"Failed to get state schema: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get state schema: {e!s}")

def _has_empty_tool_call(self, msg: Message) -> bool:
"""Return True if any tool call on the message has empty content.

A tool call is considered empty if its ``content`` attribute/key is ``None`` or
an empty string. Tool calls may be dict-like or objects with a ``content`` attribute.
"""
tool_calls = getattr(msg, "tools_calls", None)
if not tool_calls:
return False
for tool_call in tool_calls:
content = (
tool_call.get("content")
if isinstance(tool_call, dict)
else getattr(tool_call, "content", None)
)
if content in (None, ""):
return True
return False

async def fix_graph(
self,
thread_id: str,
Expand Down Expand Up @@ -372,66 +390,54 @@ async def fix_graph(
Raises:
HTTPException: If the operation fails
"""
try:
logger.info(f"Starting fix graph operation for thread: {thread_id}")
logger.debug(f"User info: {user}")

logger.info(f"Starting fix graph operation for thread: {thread_id}")
logger.debug(f"User info: {user}")

fix_config = {
"thread_id": thread_id,
"user": user,
}

# Merge additional config if provided
if config:
fix_config.update(config)

logger.debug("Fetching current state from checkpointer")
state: AgentState | None = await self.checkpointer.aget_state(fix_config)
fix_config = {"thread_id": thread_id, "user": user}
if config:
fix_config.update(config)

logger.debug("Fetching current state from checkpointer")
state: AgentState | None = await self.checkpointer.aget_state(fix_config)
if not state:
logger.warning(f"No state found for thread: {thread_id}")
return {
"success": False,
"message": f"No state found for thread: {thread_id}",
"removed_count": 0,
"state": None,
}

if not state:
logger.warning(f"No state found for thread: {thread_id}")
return {
"success": False,
"message": f"No state found for thread: {thread_id}",
"removed_count": 0,
"state": None,
}
messages: list[Message] = list(state.context or [])
logger.debug(f"Found {len(messages)} messages in state")
if not messages:
return {
"success": True,
"message": "No messages found in state",
"removed_count": 0,
"state": state.model_dump_json(),
}

messages: list[Message] = state.context
logger.debug(f"Found {len(messages)} messages in state")
filtered = [m for m in messages if not self._has_empty_tool_call(m)]
removed_count = len(messages) - len(filtered)

if not messages:
logger.info("No messages found in state, nothing to fix")
return {
"success": True,
"message": "No messages found in state",
"removed_count": 0,
"state": state.model_dump_json(),
}
if removed_count:
state.context = filtered
await self.checkpointer.aput_state(fix_config, state)
message = f"Successfully removed {removed_count} message(s)"
else:
message = "No messages with empty tool calls found"

last_message = messages[-1]
updated_context = []
if last_message.role == "assistant" and last_message.tools_calls:
updated_context = messages[:-1]
state.context = updated_context
await self.checkpointer.aput_state(fix_config, state)
return {
"success": True,
"message": "Removed last assistant message with empty tool calls",
"removed_count": 1,
"message": message,
"removed_count": removed_count,
"state": state.model_dump_json(),
}
else:
logger.warning(
"Last message is not an assistant message with tool calls, skipping it from checks."
)

return {
"success": True,
"message": "No messages with empty tool calls found",
"removed_count": 0,
"state": state.model_dump_json(),
}
except Exception as e:
logger.error(f"Fix graph operation failed: {e}")
raise HTTPException(status_code=500, detail=f"Fix graph operation failed: {e!s}")

async def setup(self, data: GraphSetupSchema) -> dict:
# lets create tools
Expand Down
74 changes: 0 additions & 74 deletions graph/react.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,3 @@
"""
Graph-based React Agent Implementation

This module implements a reactive agent system using PyAgenity's StateGraph.
The agent can interact with tools (like weather checking) and maintain conversation
state through a checkpointer. The graph orchestrates the flow between the main
agent logic and tool execution.

Key Components:
- Weather tool: Demonstrates tool calling with dependency injection
- Main agent: AI-powered assistant that can use tools
- Graph flow: Conditional routing based on tool usage
- Checkpointer: Maintains conversation state across interactions

Architecture:
The system uses a state graph with two main nodes:
1. MAIN: Processes user input and generates AI responses
2. TOOL: Executes tool calls when requested by the AI

The graph conditionally routes between these nodes based on whether
the AI response contains tool calls. Conversation history is maintained
through the checkpointer, allowing for multi-turn conversations.

Tools are defined as functions with JSON schema docstrings that describe
their interface for the AI model. The ToolNode automatically extracts
these schemas for tool selection.

Dependencies:
- PyAgenity: For graph and state management
- LiteLLM: For AI model interactions
- InjectQ: For dependency injection
- Python logging: For debug and info messages
"""

import logging
from typing import Any

Expand Down Expand Up @@ -70,46 +36,6 @@ class MyAgentState(AgentState):
checkpointer = InMemoryCheckpointer[MyAgentState]()


"""
Note: The docstring below will be used as the tool description and it will be
passed to the AI model for tool selection, so keep it relevant and concise.
This function will be converted to a tool with the following schema:
[
{
'type': 'function',
'function': {
'name': 'get_weather',
'description': 'Retrieve current weather information for a specified location.',
'parameters': {
'type': 'object',
'properties': {
'location': {'type': 'string'}
},
'required': ['location']
}
}
}
]

Parameters like tool_call_id, state, and checkpointer are injected automatically
by InjectQ when the tool is called by the agent.
Available injected parameters:
The following parameters are automatically injected by InjectQ when the tool is called,
but need to keep them as same name and type for proper injection:
- tool_call_id: Unique ID for the tool call
- state: Current AgentState containing conversation context
- config: Configuration dictionary passed during graph invocation

Below fields need to be used with Inject[] to get the instances:
- context_manager: ContextManager instance for managing context, like trimming
- publisher: Publisher instance for publishing events and logs
- checkpointer: InMemoryCheckpointer instance for state management
- store: InMemoryStore instance for temporary data storage
- callback: CallbackManager instance for handling callbacks

"""


def get_weather(
location: str,
tool_call_id: str,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ classifiers = [
"Topic :: Internet :: WWW/HTTP :: HTTP Servers",
]
dependencies = [
"10xscale-agentflow>=0.4.0",
"10xscale-agentflow>=0.5.0",
"fastapi",
"gunicorn",
"orjson",
Expand Down