Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions agentflow_cli/src/app/routers/graph/services/graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,30 +295,41 @@ async def stream_graph(
mt = chunk.metadata or {}
mt.update(meta)
chunk.metadata = mt
yield chunk.model_dump_json()
# Add newline delimiter and encode to bytes for proper chunk-by-chunk streaming
yield (chunk.model_dump_json() + "\n")
if (
self.config.thread_name_generator_path
and meta["is_new_thread"]
and chunk.event == StreamEvent.MESSAGE
and chunk.message
and not chunk.message.delta
):
messages_str.append(chunk.message.text())
# Safely extract text - handle both string and Message object
msg_text = (
chunk.message.text()
if hasattr(chunk.message, "text") and callable(chunk.message.text)
else str(chunk.message)
)
messages_str.append(msg_text)

logger.info("Graph streaming completed successfully")

if meta["is_new_thread"] and self.config.thread_name_generator_path:
messages_str = [msg.text() for msg in messages_str]
# messages_str already contains text strings, no need to call .text() again
thread_name = await self._save_thread_name(
config, config["thread_id"], messages_str
)
meta["thread_name"] = thread_name

yield StreamChunk(
event=StreamEvent.UPDATES,
data={"status": "completed"},
metadata=meta,
).model_dump_json()
# Add newline delimiter and encode to bytes for proper chunk-by-chunk streaming
yield (
StreamChunk(
event=StreamEvent.UPDATES,
data={"status": "completed"},
metadata=meta,
).model_dump_json()
+ "\n"
)

except Exception as e:
logger.error(f"Graph streaming failed: {e}")
Expand Down Expand Up @@ -421,10 +432,10 @@ async def fix_graph(
"removed_count": 1,
"state": state.model_dump_json(),
}
else:
logger.warning(
"Last message is not an assistant message with tool calls, skipping it from checks."
)

logger.warning(
"Last message is not an assistant message with tool calls, skipping it from checks."
)

return {
"success": True,
Expand Down
73 changes: 35 additions & 38 deletions graph/react.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,44 +70,41 @@ class MyAgentState(AgentState):
checkpointer = InMemoryCheckpointer[MyAgentState]()


"""
Note: The docstring below will be used as the tool description and it will be
passed to the AI model for tool selection, so keep it relevant and concise.
This function will be converted to a tool with the following schema:
[
{
'type': 'function',
'function': {
'name': 'get_weather',
'description': 'Retrieve current weather information for a specified location.',
'parameters': {
'type': 'object',
'properties': {
'location': {'type': 'string'}
},
'required': ['location']
}
}
}
]

Parameters like tool_call_id, state, and checkpointer are injected automatically
by InjectQ when the tool is called by the agent.
Available injected parameters:
The following parameters are automatically injected by InjectQ when the tool is called,
but need to keep them as same name and type for proper injection:
- tool_call_id: Unique ID for the tool call
- state: Current AgentState containing conversation context
- config: Configuration dictionary passed during graph invocation

Below fields need to be used with Inject[] to get the instances:
- context_manager: ContextManager instance for managing context, like trimming
- publisher: Publisher instance for publishing events and logs
- checkpointer: InMemoryCheckpointer instance for state management
- store: InMemoryStore instance for temporary data storage
- callback: CallbackManager instance for handling callbacks

"""
# Note: The docstring below will be used as the tool description and it will be
# passed to the AI model for tool selection, so keep it relevant and concise.
# This function will be converted to a tool with the following schema:
# [
# {
# 'type': 'function',
# 'function': {
# 'name': 'get_weather',
# 'description': 'Retrieve current weather information for a specified location.',
# 'parameters': {
# 'type': 'object',
# 'properties': {
# 'location': {'type': 'string'}
# },
# 'required': ['location']
# }
# }
# }
# ]
#
# Parameters like tool_call_id, state, and checkpointer are injected automatically
# by InjectQ when the tool is called by the agent.
# Available injected parameters:
# The following parameters are automatically injected by InjectQ when the tool is called,
# but need to keep them as same name and type for proper injection:
# - tool_call_id: Unique ID for the tool call
# - state: Current AgentState containing conversation context
# - config: Configuration dictionary passed during graph invocation
#
# Below fields need to be used with Inject[] to get the instances:
# - context_manager: ContextManager instance for managing context, like trimming
# - publisher: Publisher instance for publishing events and logs
# - checkpointer: InMemoryCheckpointer instance for state management
# - store: InMemoryStore instance for temporary data storage
# - callback: CallbackManager instance for handling callbacks


def get_weather(
Expand Down
Loading