From de6921a5555ac6526ea7dd63d41e23529c24026e Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Sat, 29 Nov 2025 22:10:34 +0600 Subject: [PATCH 1/3] feat: Implement selective GZip middleware to exclude streaming endpoints from compression fix: Update media type and headers for StreamingResponse in graph router refactor: Change return type of graph streaming method to AsyncIterable[str] and ensure newline delimiters chore: Simulate network delay in weather response function --- .../src/app/core/config/setup_middleware.py | 34 +++++++++++++++++-- agentflow_cli/src/app/routers/graph/router.py | 5 +-- .../routers/graph/services/graph_service.py | 10 +++--- graph/react.py | 2 ++ 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/agentflow_cli/src/app/core/config/setup_middleware.py b/agentflow_cli/src/app/core/config/setup_middleware.py index 57ebd03..2ac6d2b 100644 --- a/agentflow_cli/src/app/core/config/setup_middleware.py +++ b/agentflow_cli/src/app/core/config/setup_middleware.py @@ -7,11 +7,38 @@ from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request +from starlette.types import ASGIApp, Receive, Scope, Send from .sentry_config import init_sentry from .settings import get_settings, logger +# Paths that should be excluded from GZip compression (streaming endpoints) +GZIP_EXCLUDED_PATHS = frozenset({"/v1/graph/stream"}) + + +class SelectiveGZipMiddleware: + """ + GZip middleware that excludes certain paths from compression. + + This is necessary because streaming endpoints need to send data + immediately without buffering, but GZipMiddleware buffers the + entire response before compressing. + """ + + def __init__(self, app: ASGIApp, minimum_size: int = 1000): + self.app = app + self.gzip_app = GZipMiddleware(app, minimum_size=minimum_size) + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] == "http" and scope["path"] in GZIP_EXCLUDED_PATHS: + # Skip GZip for excluded paths - pass through directly + await self.app(scope, receive, send) + else: + # Apply GZip for all other paths + await self.gzip_app(scope, receive, send) + + class RequestIDMiddleware(BaseHTTPMiddleware): """ Middleware to add a unique request ID and timestamp to each request and response. @@ -74,7 +101,7 @@ def setup_middleware(app: FastAPI): Middleware: - CORS: Configured based on settings.ORIGINS. - TrustedHost: Configured with allowed hosts from settings.ALLOWED_HOST. - - GZip: Applied with a minimum size of 1000 bytes. + - GZip: Applied with a minimum size of 1000 bytes (excludes streaming endpoints). """ settings = get_settings() # init cors @@ -90,8 +117,9 @@ def setup_middleware(app: FastAPI): app.add_middleware(RequestIDMiddleware) - # Note: If you need streaming responses, you should not use GZipMiddleware. - app.add_middleware(GZipMiddleware, minimum_size=1000) + # Use SelectiveGZipMiddleware to exclude streaming endpoints from compression + # Streaming endpoints need immediate data transmission without buffering + app.add_middleware(SelectiveGZipMiddleware, minimum_size=1000) logger.debug("Middleware set up") # Initialize Sentry diff --git a/agentflow_cli/src/app/routers/graph/router.py b/agentflow_cli/src/app/routers/graph/router.py index 147de5a..b22e72b 100644 --- a/agentflow_cli/src/app/routers/graph/router.py +++ b/agentflow_cli/src/app/routers/graph/router.py @@ -81,11 +81,12 @@ async def stream_graph( return StreamingResponse( result, - media_type="text/plain", + media_type="text/event-stream", headers={ - "Cache-Control": "no-cache", + "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", "X-Accel-Buffering": "no", # Disable nginx buffering + "Content-Encoding": "identity", # Disable any content encoding (bypasses GZip) }, ) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 0920dfb..42649da 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -10,8 +10,6 @@ from fastapi import HTTPException from injectq import InjectQ, inject, singleton from pydantic import BaseModel -from starlette.responses import Content - from agentflow_cli.src.app.core import logger from agentflow_cli.src.app.core.config.graph_config import GraphConfig from agentflow_cli.src.app.routers.graph.schemas.graph_schemas import ( @@ -261,7 +259,7 @@ async def stream_graph( self, graph_input: GraphInputSchema, user: dict[str, Any], - ) -> AsyncIterable[Content]: + ) -> AsyncIterable[str]: """ Streams the graph execution with the provided input. @@ -270,7 +268,7 @@ async def stream_graph( stream_mode (str): The stream mode ("values", "updates", "messages", etc.). Yields: - GraphStreamChunkSchema: Individual chunks from graph execution. + str: Individual JSON chunks from graph execution with newline delimiters. Raises: HTTPException: If graph streaming fails. @@ -295,7 +293,7 @@ async def stream_graph( mt = chunk.metadata or {} mt.update(meta) chunk.metadata = mt - yield chunk.model_dump_json(serialize_as_any=True) + yield chunk.model_dump_json(serialize_as_any=True) + "\n" if ( self.config.thread_name_generator_path and meta["is_new_thread"] @@ -317,7 +315,7 @@ async def stream_graph( event=StreamEvent.UPDATES, data={"status": "completed"}, metadata=meta, - ).model_dump_json(serialize_as_any=True) + ).model_dump_json(serialize_as_any=True) + "\n" except Exception as e: logger.error(f"Graph streaming failed: {e}") diff --git a/graph/react.py b/graph/react.py index 0639ec1..2e74e84 100644 --- a/graph/react.py +++ b/graph/react.py @@ -1,4 +1,5 @@ import logging +from time import sleep from typing import Any from agentflow.adapters.llm.model_response_converter import ModelResponseConverter @@ -51,6 +52,7 @@ def get_weather( logger.debug("Number of messages in context: %d", len(state.context)) # Mock weather response - in production, this would call a real weather API + sleep(1) # Simulate network delay return f"The weather in {location} is sunny" From bd7719a806fab3e24e806a612d4ac58c512400f8 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Sat, 29 Nov 2025 22:10:40 +0600 Subject: [PATCH 2/3] fix: Refactor yield statement in graph streaming to improve readability --- .../src/app/routers/graph/services/graph_service.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 42649da..1f0a207 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -311,11 +311,14 @@ async def stream_graph( ) meta["thread_name"] = thread_name - yield StreamChunk( - event=StreamEvent.UPDATES, - data={"status": "completed"}, - metadata=meta, - ).model_dump_json(serialize_as_any=True) + "\n" + yield ( + StreamChunk( + event=StreamEvent.UPDATES, + data={"status": "completed"}, + metadata=meta, + ).model_dump_json(serialize_as_any=True) + + "\n" + ) except Exception as e: logger.error(f"Graph streaming failed: {e}") From 77322342bce0f7f514761463eb0045e015451b53 Mon Sep 17 00:00:00 2001 From: Shudipto Trafder Date: Sat, 29 Nov 2025 22:13:09 +0600 Subject: [PATCH 3/3] fix: Add missing import statement for improved functionality --- agentflow_cli/src/app/routers/graph/services/graph_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/agentflow_cli/src/app/routers/graph/services/graph_service.py b/agentflow_cli/src/app/routers/graph/services/graph_service.py index 1f0a207..2b4f3a7 100644 --- a/agentflow_cli/src/app/routers/graph/services/graph_service.py +++ b/agentflow_cli/src/app/routers/graph/services/graph_service.py @@ -10,6 +10,7 @@ from fastapi import HTTPException from injectq import InjectQ, inject, singleton from pydantic import BaseModel + from agentflow_cli.src.app.core import logger from agentflow_cli.src.app.core.config.graph_config import GraphConfig from agentflow_cli.src.app.routers.graph.schemas.graph_schemas import (