diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index acf922c..441b04d 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -295,7 +295,8 @@ async def stream_graph( mt = chunk.metadata or {} mt.update(meta) chunk.metadata = mt - yield chunk.model_dump_json() + # Add newline delimiter and encode to bytes for proper chunk-by-chunk streaming + yield (chunk.model_dump_json() + "\n") if ( self.config.thread_name_generator_path and meta["is_new_thread"] @@ -303,22 +304,32 @@ async def stream_graph( and chunk.message and not chunk.message.delta ): - messages_str.append(chunk.message.text()) + # Safely extract text - handle both string and Message object + msg_text = ( + chunk.message.text() + if hasattr(chunk.message, "text") and callable(chunk.message.text) + else str(chunk.message) + ) + messages_str.append(msg_text) logger.info("Graph streaming completed successfully") if meta["is_new_thread"] and self.config.thread_name_generator_path: - messages_str = [msg.text() for msg in messages_str] + # messages_str already contains text strings, no need to call .text() again thread_name = await self._save_thread_name( config, config["thread_id"], messages_str ) meta["thread_name"] = thread_name - yield StreamChunk( - event=StreamEvent.UPDATES, - data={"status": "completed"}, - metadata=meta, - ).model_dump_json() + # Add newline delimiter and encode to bytes for proper chunk-by-chunk streaming + yield ( + StreamChunk( + event=StreamEvent.UPDATES, + data={"status": "completed"}, + metadata=meta, + ).model_dump_json() + + "\n" + ) except Exception as e: logger.error(f"Graph streaming failed: {e}") @@ -421,10 +432,10 @@ async def fix_graph( "removed_count": 1, "state": state.model_dump_json(), } - else: - logger.warning( - "Last message is not an assistant message with tool calls, skipping it from checks." - ) + + logger.warning( + "Last message is not an assistant message with tool calls, skipping it from checks." + ) return { "success": True, diff --git a/graph/react.py b/graph/react.py index 7dd5c38..d84d7a9 100644 --- a/graph/react.py +++ b/graph/react.py @@ -70,44 +70,41 @@ class MyAgentState(AgentState): checkpointer = InMemoryCheckpointer[MyAgentState]() -""" -Note: The docstring below will be used as the tool description and it will be -passed to the AI model for tool selection, so keep it relevant and concise. -This function will be converted to a tool with the following schema: -[ - { - 'type': 'function', - 'function': { - 'name': 'get_weather', - 'description': 'Retrieve current weather information for a specified location.', - 'parameters': { - 'type': 'object', - 'properties': { - 'location': {'type': 'string'} - }, - 'required': ['location'] - } - } - } - ] - -Parameters like tool_call_id, state, and checkpointer are injected automatically -by InjectQ when the tool is called by the agent. -Available injected parameters: -The following parameters are automatically injected by InjectQ when the tool is called, -but need to keep them as same name and type for proper injection: -- tool_call_id: Unique ID for the tool call -- state: Current AgentState containing conversation context -- config: Configuration dictionary passed during graph invocation - -Below fields need to be used with Inject[] to get the instances: -- context_manager: ContextManager instance for managing context, like trimming -- publisher: Publisher instance for publishing events and logs -- checkpointer: InMemoryCheckpointer instance for state management -- store: InMemoryStore instance for temporary data storage -- callback: CallbackManager instance for handling callbacks - -""" +# Note: The docstring below will be used as the tool description and it will be +# passed to the AI model for tool selection, so keep it relevant and concise. +# This function will be converted to a tool with the following schema: +# [ +# { +# 'type': 'function', +# 'function': { +# 'name': 'get_weather', +# 'description': 'Retrieve current weather information for a specified location.', +# 'parameters': { +# 'type': 'object', +# 'properties': { +# 'location': {'type': 'string'} +# }, +# 'required': ['location'] +# } +# } +# } +# ] +# +# Parameters like tool_call_id, state, and checkpointer are injected automatically +# by InjectQ when the tool is called by the agent. +# Available injected parameters: +# The following parameters are automatically injected by InjectQ when the tool is called, +# but need to keep them as same name and type for proper injection: +# - tool_call_id: Unique ID for the tool call +# - state: Current AgentState containing conversation context +# - config: Configuration dictionary passed during graph invocation +# +# Below fields need to be used with Inject[] to get the instances: +# - context_manager: ContextManager instance for managing context, like trimming +# - publisher: Publisher instance for publishing events and logs +# - checkpointer: InMemoryCheckpointer instance for state management +# - store: InMemoryStore instance for temporary data storage +# - callback: CallbackManager instance for handling callbacks def get_weather(