Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions agentflow_cli/src/app/core/config/setup_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,38 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp, Receive, Scope, Send

from .sentry_config import init_sentry
from .settings import get_settings, logger


# Paths that should be excluded from GZip compression (streaming endpoints).
# GZipMiddleware buffers the entire response body before compressing, which
# would stall endpoints that must flush chunks to the client immediately.
GZIP_EXCLUDED_PATHS = frozenset({"/v1/graph/stream"})


class SelectiveGZipMiddleware:
    """
    ASGI middleware that applies GZip compression selectively by path.

    Requests whose path is listed in ``GZIP_EXCLUDED_PATHS`` are forwarded
    straight to the wrapped app, bypassing compression entirely. This is
    necessary because streaming endpoints need to send data immediately
    without buffering, while GZipMiddleware buffers the entire response
    before compressing.
    """

    def __init__(self, app: ASGIApp, minimum_size: int = 1000):
        # Keep both the raw app (for excluded paths) and a gzip-wrapped
        # variant (for everything else); routing happens per-request.
        self.app = app
        self.gzip_app = GZipMiddleware(app, minimum_size=minimum_size)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Only HTTP scopes carry a compressible body; check the path against
        # the exclusion set and pick the matching downstream app.
        bypass_gzip = (
            scope["type"] == "http" and scope["path"] in GZIP_EXCLUDED_PATHS
        )
        target = self.app if bypass_gzip else self.gzip_app
        await target(scope, receive, send)


class RequestIDMiddleware(BaseHTTPMiddleware):
"""
Middleware to add a unique request ID and timestamp to each request and response.
Expand Down Expand Up @@ -74,7 +101,7 @@ def setup_middleware(app: FastAPI):
Middleware:
- CORS: Configured based on settings.ORIGINS.
- TrustedHost: Configured with allowed hosts from settings.ALLOWED_HOST.
- GZip: Applied with a minimum size of 1000 bytes.
- GZip: Applied with a minimum size of 1000 bytes (excludes streaming endpoints).
"""
settings = get_settings()
# init cors
Expand All @@ -90,8 +117,9 @@ def setup_middleware(app: FastAPI):

app.add_middleware(RequestIDMiddleware)

# Note: If you need streaming responses, you should not use GZipMiddleware.
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Use SelectiveGZipMiddleware to exclude streaming endpoints from compression
# Streaming endpoints need immediate data transmission without buffering
app.add_middleware(SelectiveGZipMiddleware, minimum_size=1000)
logger.debug("Middleware set up")

# Initialize Sentry
Expand Down
5 changes: 3 additions & 2 deletions agentflow_cli/src/app/routers/graph/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ async def stream_graph(

return StreamingResponse(
result,
media_type="text/plain",
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Cache-Control": "no-cache, no-transform",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"Content-Encoding": "identity", # Disable any content encoding (bypasses GZip)
},
)

Expand Down
20 changes: 11 additions & 9 deletions agentflow_cli/src/app/routers/graph/services/graph_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from fastapi import HTTPException
from injectq import InjectQ, inject, singleton
from pydantic import BaseModel
from starlette.responses import Content

from agentflow_cli.src.app.core import logger
from agentflow_cli.src.app.core.config.graph_config import GraphConfig
Expand Down Expand Up @@ -261,7 +260,7 @@ async def stream_graph(
self,
graph_input: GraphInputSchema,
user: dict[str, Any],
) -> AsyncIterable[Content]:
) -> AsyncIterable[str]:
"""
Streams the graph execution with the provided input.

Expand All @@ -270,7 +269,7 @@ async def stream_graph(
stream_mode (str): The stream mode ("values", "updates", "messages", etc.).

Yields:
GraphStreamChunkSchema: Individual chunks from graph execution.
str: Individual JSON chunks from graph execution with newline delimiters.

Raises:
HTTPException: If graph streaming fails.
Expand All @@ -295,7 +294,7 @@ async def stream_graph(
mt = chunk.metadata or {}
mt.update(meta)
chunk.metadata = mt
yield chunk.model_dump_json(serialize_as_any=True)
yield chunk.model_dump_json(serialize_as_any=True) + "\n"
if (
self.config.thread_name_generator_path
and meta["is_new_thread"]
Expand All @@ -313,11 +312,14 @@ async def stream_graph(
)
meta["thread_name"] = thread_name

yield StreamChunk(
event=StreamEvent.UPDATES,
data={"status": "completed"},
metadata=meta,
).model_dump_json(serialize_as_any=True)
yield (
StreamChunk(
event=StreamEvent.UPDATES,
data={"status": "completed"},
metadata=meta,
).model_dump_json(serialize_as_any=True)
+ "\n"
)

except Exception as e:
logger.error(f"Graph streaming failed: {e}")
Expand Down
2 changes: 2 additions & 0 deletions graph/react.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from time import sleep
from typing import Any

from agentflow.adapters.llm.model_response_converter import ModelResponseConverter
Expand Down Expand Up @@ -51,6 +52,7 @@ def get_weather(
logger.debug("Number of messages in context: %d", len(state.context))

# Mock weather response - in production, this would call a real weather API
sleep(1) # Simulate network delay
return f"The weather in {location} is sunny"


Expand Down