From a09251c75ca3f57f6f59100fcdf75c4bd5cc3dee Mon Sep 17 00:00:00 2001 From: rUv Date: Mon, 30 Mar 2026 11:59:35 +0000 Subject: [PATCH] fix(brain): SSE connection limiter, pipeline rate limit, Firestore pagination fallback (ADR-130) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes for recurring pi.ruv.io outages: 1. SSE connection limiter (max 50) — prevents MCP reconnect storms from exhausting Cloud Run concurrency slots. Tracks active count with AtomicUsize, rejects excess with 429. 2. Pipeline optimize rate limiter — max 1 concurrent request with 30s cooldown. Prevents scheduler thundering herd from CPU-saturating the instance. 3. Firestore pagination offset fallback — when page tokens go stale after OOM restart (400 Bad Request), switches to offset-based pagination to load all documents instead of stopping at first batch. Also adds /v1/ready lightweight probe (zero-cost, no state access) for Cloud Run health checks. ADR-130 documents the full decoupling architecture (SSE service split). 
Co-Authored-By: claude-flow --- crates/mcp-brain-server/src/routes.rs | 64 +++- crates/mcp-brain-server/src/store.rs | 41 ++- crates/mcp-brain-server/src/types.rs | 6 + ...-130-mcp-sse-decoupling-midstream-queue.md | 282 ++++++++++++++++++ 4 files changed, 385 insertions(+), 8 deletions(-) create mode 100644 docs/adr/ADR-130-mcp-sse-decoupling-midstream-queue.md diff --git a/crates/mcp-brain-server/src/routes.rs b/crates/mcp-brain-server/src/routes.rs index a2d01b71e..dd15323e8 100644 --- a/crates/mcp-brain-server/src/routes.rs +++ b/crates/mcp-brain-server/src/routes.rs @@ -276,6 +276,9 @@ pub async fn create_router() -> (Router, AppState) { notifier: crate::notify::ResendNotifier::from_env(), cached_status: Arc::new(parking_lot::RwLock::new(None)), gist_publisher: crate::gist::GistPublisher::from_env().map(Arc::new), + optimize_semaphore: Arc::new(tokio::sync::Semaphore::new(1)), + last_optimize_completed: Arc::new(parking_lot::RwLock::new(None)), + sse_connections: Arc::new(std::sync::atomic::AtomicUsize::new(0)), }; let router = Router::new() @@ -287,6 +290,7 @@ pub async fn create_router() -> (Router, AppState) { .route("/.well-known/agent-guide.md", get(agent_guide)) .route("/origin", get(origin_page)) .route("/v1/health", get(health)) + .route("/v1/ready", get(ready)) .route("/v1/challenge", get(issue_challenge)) .route("/v1/memories", post(share_memory)) .route("/v1/memories/search", get(search_memories)) @@ -1139,6 +1143,16 @@ async fn health(State(state): State) -> Json { }) } +/// GET /v1/ready — lightweight readiness probe (ADR-130). +/// Returns 200 immediately. No computation, no state access. +async fn ready() -> StatusCode { + StatusCode::OK +} + +/// Maximum concurrent SSE connections per instance (ADR-130 Phase 1). +/// Prevents reconnect storms from exhausting Cloud Run concurrency slots. +const MAX_SSE_CONNECTIONS: usize = 50; + /// Issue a challenge nonce for replay protection. /// Clients must include this nonce in write requests. 
/// Nonces are single-use and expire after 5 minutes. @@ -3378,6 +3392,9 @@ async fn pipeline_metrics_handler( } /// POST /v1/pipeline/optimize — trigger optimization actions +/// +/// Rate-limited: max 1 concurrent, 30s cooldown between runs. +/// Prevents scheduler thundering herd from saturating the instance. async fn pipeline_optimize( State(state): State, _contributor: AuthenticatedContributor, @@ -3385,6 +3402,27 @@ async fn pipeline_optimize( ) -> Result, (StatusCode, String)> { check_read_only(&state)?; + // Enforce 30-second cooldown between optimize runs + { + let last = state.last_optimize_completed.read(); + if let Some(ts) = *last { + if ts.elapsed() < std::time::Duration::from_secs(30) { + let wait = 30 - ts.elapsed().as_secs(); + return Err(( + StatusCode::TOO_MANY_REQUESTS, + format!("Pipeline optimize cooldown: retry in {wait}s"), + )); + } + } + } + + // Only 1 concurrent optimize — reject others immediately + let _permit = state.optimize_semaphore.try_acquire() + .map_err(|_| ( + StatusCode::TOO_MANY_REQUESTS, + "Pipeline optimize already in progress".to_string(), + ))?; + let all_actions = vec![ "train", "drift_check", "transfer_all", "rebuild_graph", "cleanup", "attractor_analysis", ]; @@ -3492,6 +3530,7 @@ async fn pipeline_optimize( } state.pipeline_metrics.optimization_cycles.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + *state.last_optimize_completed.write() = Some(std::time::Instant::now()); Ok(Json(OptimizeResponse { results, @@ -4566,19 +4605,32 @@ async fn origin_page() -> ( /// SSE handler — client connects here, receives event stream async fn sse_handler( State(state): State, -) -> Sse>> { +) -> Result>>, (StatusCode, String)> { + // ADR-130 Phase 1: reject new SSE connections when at capacity + let current = state.sse_connections.load(Ordering::Relaxed); + if current >= MAX_SSE_CONNECTIONS { + tracing::warn!("SSE connection limit reached ({}/{}), rejecting", current, MAX_SSE_CONNECTIONS); + return Err(( + 
StatusCode::TOO_MANY_REQUESTS, + format!("SSE connection limit reached ({MAX_SSE_CONNECTIONS}). Retry-After: 10"), + )); + } + state.sse_connections.fetch_add(1, Ordering::Relaxed); + let session_id = Uuid::new_v4().to_string(); let (tx, rx) = tokio::sync::mpsc::channel::(64); // Store sender for this session state.sessions.insert(session_id.clone(), tx); - tracing::info!("SSE session started: {}", session_id); + tracing::info!("SSE session started: {} (active: {})", session_id, + state.sse_connections.load(Ordering::Relaxed)); // Build SSE stream: first event is the endpoint, then stream messages let initial_event = format!("/messages?sessionId={session_id}"); let session_id_cleanup = session_id.clone(); let sessions_cleanup = state.sessions.clone(); + let sse_counter = state.sse_connections.clone(); let stream = async_stream::stream! { // Send endpoint event first @@ -4590,9 +4642,13 @@ async fn sse_handler( yield Ok(Event::default().event("message").data(msg)); } + // Decrement connection counter on disconnect + sse_counter.fetch_sub(1, Ordering::Relaxed); + // Clean up session on disconnect — grace period lets clients reconnect // without losing the session (e.g. 
MCP SDK's EventSource polyfill) - tracing::info!("SSE stream closed for session: {}, starting 30s grace period", session_id_cleanup); + tracing::info!("SSE stream closed for session: {}, starting 30s grace period (active: {})", + session_id_cleanup, sse_counter.load(Ordering::Relaxed)); tokio::spawn({ let sessions = sessions_cleanup.clone(); let sid = session_id_cleanup.clone(); @@ -4609,7 +4665,7 @@ async fn sse_handler( }); }; - Sse::new(stream).keep_alive(KeepAlive::default()) + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) } /// Query params for /messages endpoint diff --git a/crates/mcp-brain-server/src/store.rs b/crates/mcp-brain-server/src/store.rs index 17a52552d..a9a4ac352 100644 --- a/crates/mcp-brain-server/src/store.rs +++ b/crates/mcp-brain-server/src/store.rs @@ -314,10 +314,19 @@ impl FirestoreClient { let mut all_docs = Vec::new(); let mut page_token: Option = None; let mut consecutive_errors: usize = 0; + // Track whether we're using offset-based fallback (after stale page token) + let mut use_offset_fallback = false; loop { let mut url = format!("{base}/{collection}?pageSize=300"); - if let Some(ref token) = page_token { + if use_offset_fallback { + // Stale page token fallback: use offset to skip already-loaded docs + url.push_str(&format!("&offset={}", all_docs.len())); + tracing::info!( + "Firestore LIST {collection}: using offset fallback at {} docs", + all_docs.len() + ); + } else if let Some(ref token) = page_token { url.push_str(&format!("&pageToken={}", urlencoding::encode(token))); } @@ -329,6 +338,7 @@ impl FirestoreClient { let resp = match result { Ok(resp) if resp.status().is_success() => { consecutive_errors = 0; + use_offset_fallback = false; // back on happy path resp } Ok(resp) if resp.status().as_u16() == 401 => { @@ -367,11 +377,24 @@ impl FirestoreClient { } } Ok(resp) => { + let status = resp.status().as_u16(); consecutive_errors += 1; tracing::warn!( "Firestore LIST {collection} returned {} (error {}/{})", - 
resp.status(), consecutive_errors, Self::MAX_PAGE_ERRORS + status, consecutive_errors, Self::MAX_PAGE_ERRORS ); + // 400 Bad Request with a page token means the token is stale + // (e.g. after OOM restart). Switch to offset-based pagination. + if status == 400 && page_token.is_some() && !use_offset_fallback { + tracing::warn!( + "Firestore LIST {collection}: stale page token at {} docs, switching to offset fallback", + all_docs.len() + ); + page_token = None; + use_offset_fallback = true; + consecutive_errors = 0; // reset — this is a recovery, not repeated failure + continue; + } if consecutive_errors >= Self::MAX_PAGE_ERRORS { break; } continue; } @@ -418,8 +441,18 @@ impl FirestoreClient { // Check for next page match body.get("nextPageToken").and_then(|t| t.as_str()) { - Some(token) => page_token = Some(token.to_string()), - None => break, + Some(token) if !use_offset_fallback => { + page_token = Some(token.to_string()); + } + _ if use_offset_fallback => { + // In offset mode, check if we got any docs this page + // (no docs = we've exhausted the collection) + if body.get("documents").and_then(|d| d.as_array()).map_or(true, |d| d.is_empty()) { + break; + } + // Otherwise continue with incremented offset (all_docs.len() grows each iteration) + } + _ => break, } } diff --git a/crates/mcp-brain-server/src/types.rs b/crates/mcp-brain-server/src/types.rs index d5a486ea6..04f81aed3 100644 --- a/crates/mcp-brain-server/src/types.rs +++ b/crates/mcp-brain-server/src/types.rs @@ -1341,6 +1341,12 @@ pub struct AppState { pub cached_status: std::sync::Arc>>, /// GitHub Gist publisher for autonomous discoveries — None if GITHUB_GIST_PAT not set pub gist_publisher: Option>, + /// Semaphore to limit concurrent pipeline/optimize requests (prevents scheduler thundering herd) + pub optimize_semaphore: std::sync::Arc, + /// Timestamp of last completed pipeline/optimize run (for cooldown enforcement) + pub last_optimize_completed: std::sync::Arc>>, + /// Active SSE connection 
count (ADR-130 Phase 1 — prevents SSE reconnect storms) + pub sse_connections: std::sync::Arc, } // ────────────────────────────────────────────────────────────────────── diff --git a/docs/adr/ADR-130-mcp-sse-decoupling-midstream-queue.md b/docs/adr/ADR-130-mcp-sse-decoupling-midstream-queue.md new file mode 100644 index 000000000..9e4ec595a --- /dev/null +++ b/docs/adr/ADR-130-mcp-sse-decoupling-midstream-queue.md @@ -0,0 +1,282 @@ +# ADR-130: MCP SSE Decoupling via Midstream Queue Architecture + +## Status + +Proposed + +## Date + +2026-03-29 + +## Context + +pi.ruv.io has experienced three outages in two days, all traced to the same root cause: **MCP SSE transport sharing the Cloud Run concurrency pool with the REST API**. Each SSE connection holds an open HTTP stream on a Cloud Run request slot indefinitely. When MCP clients disconnect and reconnect in loops (IDE restarts, network blips, SSE polyfill reconnects), they create a reconnect storm that exhausts all concurrency slots, returning 429 to every request — including health checks, scheduler jobs, and the REST API. + +### Failure Timeline + +| Date | Incident | Root Cause | +|------|----------|------------| +| 2026-03-28 AM | 504 Gateway Timeout | Scheduler thundering herd saturated CPU | +| 2026-03-28 PM | 503 "Service is disabled" | GFE marked service unavailable after cascading timeouts | +| 2026-03-29 AM | OOM crash → partial data load | 5,898 memories + 12.5M edge graph exceeded 2GB | +| 2026-03-29 PM | 429 Rate Exceeded (all endpoints) | SSE reconnect storm consumed all concurrency slots | + +### Current Architecture (broken) + +``` +MCP Clients ──SSE──┐ +Health Checks ─────┤ +Scheduler Jobs ────┤── Cloud Run (single service, shared concurrency) +REST API ──────────┤ ruvbrain: 2 CPU, 4GB, concurrency=250 +Browser/UI ────────┘ +``` + +All traffic types compete for the same concurrency slots. SSE connections are long-lived (minutes to hours). REST requests are short-lived (milliseconds). 
Mixing them on the same concurrency pool is fundamentally broken at scale. + +## Decision + +### Split into three decoupled services with a Rust midstream queue + +``` + ┌─────────────────────┐ +MCP Clients ──SSE──▶│ ruvbrain-sse │──push──▶┌──────────────┐ + │ (Cloud Run) │ │ │ + │ Concurrency: 500 │◀─poll───│ Midstream │ + │ CPU: 1, Mem: 512MB │ │ Queue │ + └─────────────────────┘ │ (in-process) │ + │ │ +Health/Scheduler ──▶┌─────────────────────┐ │ Ring buffer │ +REST API ──────────▶│ ruvbrain-api │──push──▶│ per session │ +Browser/UI ────────▶│ (Cloud Run) │ │ │ + │ Concurrency: 80 │◀─poll───│ │ + │ CPU: 2, Mem: 4GB │ └──────────────┘ + └─────────────────────┘ + │ + ▼ + ┌─────────────────────┐ +Scheduler Jobs ───▶│ ruvbrain-worker │ + │ (Cloud Run Jobs) │ + │ GPU: L4 (optional) │ + │ Timeout: 1hr │ + └─────────────────────┘ +``` + +### Service Separation + +| Service | Purpose | Concurrency | CPU | Memory | Cost/month | +|---------|---------|-------------|-----|--------|------------| +| `ruvbrain-api` | REST API, health, status | 80 | 2 | 4 GB | ~$30 | +| `ruvbrain-sse` | MCP SSE transport only | 500 | 1 | 512 MB | ~$10 | +| `ruvbrain-worker` | Scheduler jobs (train, drift, transfer) | 1 | 2 | 4 GB | ~$15 (job-based) | + +### Midstream Queue (Rust, in-process) + +The queue bridges SSE and API services. It runs inside `ruvbrain-api` as a Rust module — no external dependencies (no Pub/Sub, no Redis). + +```rust +/// Midstream message queue for SSE decoupling. +/// Each MCP session gets a bounded ring buffer. +/// API writes responses into the buffer; SSE service polls via internal endpoint. 
+pub struct MidstreamQueue { + /// Session ID → bounded ring buffer of JSON-RPC responses + sessions: DashMap<String, SessionBuffer>, + /// Maximum sessions before evicting oldest idle + max_sessions: usize, + /// Maximum messages per session buffer + buffer_capacity: usize, +} + +pub struct SessionBuffer { + messages: VecDeque<String>, + created_at: Instant, + last_poll: Instant, + capacity: usize, +} + +impl MidstreamQueue { + pub fn new(max_sessions: usize, buffer_capacity: usize) -> Self { + Self { + sessions: DashMap::new(), + max_sessions, + buffer_capacity, + } + } + + /// Called by API when processing a JSON-RPC request for a session. + /// Pushes the response into the session's ring buffer. + pub fn push(&self, session_id: &str, message: String) -> Result<(), QueueError> { + let mut entry = self.sessions + .entry(session_id.to_string()) + .or_insert_with(|| SessionBuffer::new(self.buffer_capacity)); + entry.push(message); + Ok(()) + } + + /// Called by SSE service to drain pending messages for a session. + /// Returns all buffered messages and clears the buffer. + pub fn drain(&self, session_id: &str) -> Vec<String> { + if let Some(mut entry) = self.sessions.get_mut(session_id) { + entry.last_poll = Instant::now(); + entry.messages.drain(..).collect() + } else { + Vec::new() + } + } + + /// Evict sessions idle for > timeout (called periodically). + pub fn evict_idle(&self, timeout: Duration) { + let now = Instant::now(); + self.sessions.retain(|_, buf| now.duration_since(buf.last_poll) < timeout); + } +} +``` + +### SSE Service Protocol + +The `ruvbrain-sse` service is a thin proxy: + +1. **Client connects** via `GET /sse` → SSE service creates session, sends `endpoint` event +2. **Client sends** JSON-RPC via `POST /messages?sessionId=X` → SSE service forwards to `ruvbrain-api` internal endpoint +3. **API processes** request, pushes response into midstream queue +4. 
**SSE service polls** `ruvbrain-api` at `/internal/queue/drain?sessionId=X` every 100ms (or, alternatively, subscribes to an internal Server-Sent Events stream from the API) +5. **SSE service streams** response to client + +The SSE service has **no business logic** — it only manages the SSE transport. All brain logic stays in `ruvbrain-api`. + +### Worker Service (Scheduler Isolation) + +Heavy scheduler jobs move to `ruvbrain-worker` as Cloud Run Jobs: + +| Job | Current | New | +|-----|---------|-----| +| `brain-train` | POST to API (every 5m) | Cloud Run Job (every 5m) — reads Firestore directly | +| `brain-transfer` | POST to API (every 30m) | Cloud Run Job (every 30m) — reads Firestore directly | +| `brain-attractor` | POST to API (every 20m) | Cloud Run Job (every 20m) | +| `brain-graph` | POST to API (hourly) | Cloud Run Job (hourly) — rebuilds graph, writes back | +| `brain-drift` | POST to API (every 15m) | Stays on API (lightweight, read-only) | +| `brain-cleanup` | POST to API (daily) | Cloud Run Job (daily) | + +Workers read from and write to Firestore directly. They share the same Rust crates (`mcp-brain-server::store`, `mcp-brain-server::graph`, `sona`) compiled into a separate binary. + +### Internal API Endpoints (not exposed externally) + +Added to `ruvbrain-api` for SSE service communication: + +``` +POST /internal/queue/push — SSE service forwards JSON-RPC here +GET /internal/queue/drain — SSE service polls for responses +POST /internal/session/create — SSE service registers new session +DELETE /internal/session/:id — SSE service cleans up on disconnect +``` + +These are authenticated via an internal service account token (Cloud Run service-to-service auth). 
+ +## Implementation Plan + +### Phase 1: SSE Connection Limiting (immediate, no new services) + +Add server-side SSE connection limits to the existing monolith to stop the bleeding: + +```rust +/// Maximum concurrent SSE connections per instance +const MAX_SSE_CONNECTIONS: usize = 50; +/// SSE idle timeout — disconnect clients that haven't sent a message in 5 minutes +const SSE_IDLE_TIMEOUT: Duration = Duration::from_secs(300); +/// Backoff header sent on 429 to slow reconnect storms +const SSE_RETRY_AFTER: u32 = 10; // seconds +``` + +1. Track active SSE count with `AtomicUsize` +2. Reject new SSE connections with `429 + Retry-After: 10` when at capacity +3. Add idle timeout — disconnect SSE sessions with no activity for 5 minutes +4. Add exponential backoff hint in SSE retry field + +### Phase 2: Midstream Queue Module (week 1) + +Implement `MidstreamQueue` as a module in `crates/mcp-brain-server/src/midstream_queue.rs`: + +1. Ring buffer per session (capacity: 64 messages) +2. Idle session eviction (5 min timeout) +3. Max 200 concurrent sessions +4. Internal drain endpoint for future SSE service + +### Phase 3: Service Split (week 2) + +1. Create `ruvbrain-sse` Dockerfile (thin, no graph/embeddings — just SSE + HTTP proxy) +2. Create `ruvbrain-worker` binary (scheduler jobs, shares store/graph crates) +3. Deploy both alongside existing `ruvbrain-api` +4. Update Cloud Scheduler to target worker jobs instead of API +5. Update DNS/domain mapping + +### Phase 4: Worker Migration (week 3) + +1. Convert `brain-train`, `brain-transfer`, `brain-attractor`, `brain-graph` to direct Firestore workers +2. Remove `/v1/pipeline/optimize` endpoint from API (workers don't need it) +3. 
Add write-back protocol: worker writes results to Firestore, API reads on next request + +## SSE Connection Limiting (Phase 1 — ship immediately) + +This is the minimum fix to stop outages while the full decoupling is built: + +```rust +// In AppState: +pub sse_connections: Arc<AtomicUsize>, + +// In sse_handler: +async fn sse_handler(State(state): State<AppState>) -> Result<Sse<...>, (StatusCode, String)> { + let current = state.sse_connections.load(Ordering::Relaxed); + if current >= MAX_SSE_CONNECTIONS { + return Err(( + StatusCode::TOO_MANY_REQUESTS, + "SSE connection limit reached. Retry-After: 10".into(), + )); + } + state.sse_connections.fetch_add(1, Ordering::Relaxed); + + // ... existing SSE logic ... + + // On stream close: + state.sse_connections.fetch_sub(1, Ordering::Relaxed); +} +``` + +## Alternatives Considered + +1. **Google Cloud Pub/Sub**: External message queue between SSE and API. Adds latency (~50ms), cost ($0.40/million messages), and operational complexity. The in-process ring buffer is simpler and faster for this scale. + +2. **Cloud Run WebSocket support**: Cloud Run supports WebSockets but with the same concurrency model. Doesn't solve the slot exhaustion problem. + +3. **Separate domain for SSE** (`sse.pi.ruv.io`): Routes SSE to a different Cloud Run service but still shares the same codebase/binary. Partial solution — helps with concurrency isolation but doesn't decouple the business logic. + +4. **Cloud Run min-instances scaling**: Set `min-instances=3` to absorb SSE load across more instances. Increases cost 3x without solving the architectural issue. Band-aid. + +5. **Move to GKE**: Full Kubernetes with separate deployments per concern. Correct architecture but massive operational overhead for a single-developer project. + +6. **Redis Pub/Sub**: External broker. Adds a managed service dependency. Overkill for session-scoped message passing where the in-process queue suffices. 
+ +## Cost Impact + +| Component | Current | After Split | +|-----------|---------|-------------| +| API (ruvbrain) | ~$40/mo (2 CPU, 4GB, always-on) | ~$30/mo (same but less load) | +| SSE service | included above | ~$10/mo (1 CPU, 512MB) | +| Worker jobs | included above | ~$15/mo (on-demand, job-based) | +| **Total** | **~$40/mo** | **~$55/mo** | + +$15/month increase for elimination of all concurrency-related outages. + +## Risks + +| Risk | Impact | Mitigation | +|------|--------|------------| +| SSE ↔ API latency | 100ms polling adds latency to MCP responses | Use internal SSE stream instead of polling; fallback to 50ms poll | +| Worker ↔ API cache coherence | Worker writes to Firestore; API has stale in-memory cache | API refreshes from Firestore on cache miss; add TTL to cached data | +| Increased deploy complexity | 3 services instead of 1 | Shared Dockerfile base; single `deploy_all.sh` script | +| SSE service statelessness | Session affinity needed for SSE reconnects | Cloud Run session affinity on SSE service | + +## References + +- [ADR-066: SSE MCP Transport](./ADR-066-sse-mcp-transport.md) +- [ADR-077: Midstream Brain Integration](./ADR-077-midstream-brain-integration.md) +- [Cloud Run concurrency model](https://cloud.google.com/run/docs/about-concurrency) +- [MCP SSE Transport specification](https://modelcontextprotocol.io/docs/concepts/transports#sse)