Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions crates/mcp-brain-server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ pub async fn create_router() -> (Router, AppState) {
notifier: crate::notify::ResendNotifier::from_env(),
cached_status: Arc::new(parking_lot::RwLock::new(None)),
gist_publisher: crate::gist::GistPublisher::from_env().map(Arc::new),
optimize_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
last_optimize_completed: Arc::new(parking_lot::RwLock::new(None)),
sse_connections: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};

let router = Router::new()
Expand All @@ -287,6 +290,7 @@ pub async fn create_router() -> (Router, AppState) {
.route("/.well-known/agent-guide.md", get(agent_guide))
.route("/origin", get(origin_page))
.route("/v1/health", get(health))
.route("/v1/ready", get(ready))
.route("/v1/challenge", get(issue_challenge))
.route("/v1/memories", post(share_memory))
.route("/v1/memories/search", get(search_memories))
Expand Down Expand Up @@ -1139,6 +1143,16 @@ async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
})
}

/// GET /v1/ready — readiness probe (ADR-130).
///
/// Unconditionally answers 200 OK. Deliberately touches no application
/// state and performs no work, so orchestrator health checks stay cheap
/// even when the instance is under load.
async fn ready() -> StatusCode {
    StatusCode::OK
}

/// Maximum concurrent SSE connections per instance (ADR-130 Phase 1).
/// Prevents reconnect storms from exhausting Cloud Run concurrency slots.
/// The SSE handler checks the live connection counter against this cap and
/// rejects new connections with 429 Too Many Requests once it is reached.
const MAX_SSE_CONNECTIONS: usize = 50;

/// Issue a challenge nonce for replay protection.
/// Clients must include this nonce in write requests.
/// Nonces are single-use and expire after 5 minutes.
Expand Down Expand Up @@ -3378,13 +3392,37 @@ async fn pipeline_metrics_handler(
}

/// POST /v1/pipeline/optimize — trigger optimization actions
///
/// Rate-limited: max 1 concurrent, 30s cooldown between runs.
/// Prevents scheduler thundering herd from saturating the instance.
async fn pipeline_optimize(
State(state): State<AppState>,
_contributor: AuthenticatedContributor,
Json(req): Json<OptimizeRequest>,
) -> Result<Json<OptimizeResponse>, (StatusCode, String)> {
check_read_only(&state)?;

// Enforce 30-second cooldown between optimize runs
{
let last = state.last_optimize_completed.read();
if let Some(ts) = *last {
if ts.elapsed() < std::time::Duration::from_secs(30) {
let wait = 30 - ts.elapsed().as_secs();
return Err((
StatusCode::TOO_MANY_REQUESTS,
format!("Pipeline optimize cooldown: retry in {wait}s"),
));
}
}
}

// Only 1 concurrent optimize — reject others immediately
let _permit = state.optimize_semaphore.try_acquire()
.map_err(|_| (
StatusCode::TOO_MANY_REQUESTS,
"Pipeline optimize already in progress".to_string(),
))?;

let all_actions = vec![
"train", "drift_check", "transfer_all", "rebuild_graph", "cleanup", "attractor_analysis",
];
Expand Down Expand Up @@ -3492,6 +3530,7 @@ async fn pipeline_optimize(
}

state.pipeline_metrics.optimization_cycles.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
*state.last_optimize_completed.write() = Some(std::time::Instant::now());

Ok(Json(OptimizeResponse {
results,
Expand Down Expand Up @@ -4566,19 +4605,32 @@ async fn origin_page() -> (
/// SSE handler — client connects here, receives event stream
async fn sse_handler(
State(state): State<AppState>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
) -> Result<Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>>, (StatusCode, String)> {
// ADR-130 Phase 1: reject new SSE connections when at capacity
let current = state.sse_connections.load(Ordering::Relaxed);
if current >= MAX_SSE_CONNECTIONS {
tracing::warn!("SSE connection limit reached ({}/{}), rejecting", current, MAX_SSE_CONNECTIONS);
return Err((
StatusCode::TOO_MANY_REQUESTS,
format!("SSE connection limit reached ({MAX_SSE_CONNECTIONS}). Retry-After: 10"),
));
}
state.sse_connections.fetch_add(1, Ordering::Relaxed);

let session_id = Uuid::new_v4().to_string();
let (tx, rx) = tokio::sync::mpsc::channel::<String>(64);

// Store sender for this session
state.sessions.insert(session_id.clone(), tx);

tracing::info!("SSE session started: {}", session_id);
tracing::info!("SSE session started: {} (active: {})", session_id,
state.sse_connections.load(Ordering::Relaxed));

// Build SSE stream: first event is the endpoint, then stream messages
let initial_event = format!("/messages?sessionId={session_id}");
let session_id_cleanup = session_id.clone();
let sessions_cleanup = state.sessions.clone();
let sse_counter = state.sse_connections.clone();

let stream = async_stream::stream! {
// Send endpoint event first
Expand All @@ -4590,9 +4642,13 @@ async fn sse_handler(
yield Ok(Event::default().event("message").data(msg));
}

// Decrement connection counter on disconnect
sse_counter.fetch_sub(1, Ordering::Relaxed);

// Clean up session on disconnect — grace period lets clients reconnect
// without losing the session (e.g. MCP SDK's EventSource polyfill)
tracing::info!("SSE stream closed for session: {}, starting 30s grace period", session_id_cleanup);
tracing::info!("SSE stream closed for session: {}, starting 30s grace period (active: {})",
session_id_cleanup, sse_counter.load(Ordering::Relaxed));
tokio::spawn({
let sessions = sessions_cleanup.clone();
let sid = session_id_cleanup.clone();
Expand All @@ -4609,7 +4665,7 @@ async fn sse_handler(
});
};

Sse::new(stream).keep_alive(KeepAlive::default())
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}

/// Query params for /messages endpoint
Expand Down
41 changes: 37 additions & 4 deletions crates/mcp-brain-server/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,19 @@ impl FirestoreClient {
let mut all_docs = Vec::new();
let mut page_token: Option<String> = None;
let mut consecutive_errors: usize = 0;
// Track whether we're using offset-based fallback (after stale page token)
let mut use_offset_fallback = false;

loop {
let mut url = format!("{base}/{collection}?pageSize=300");
if let Some(ref token) = page_token {
if use_offset_fallback {
// Stale page token fallback: use offset to skip already-loaded docs
url.push_str(&format!("&offset={}", all_docs.len()));
tracing::info!(
"Firestore LIST {collection}: using offset fallback at {} docs",
all_docs.len()
);
} else if let Some(ref token) = page_token {
url.push_str(&format!("&pageToken={}", urlencoding::encode(token)));
}

Expand All @@ -329,6 +338,7 @@ impl FirestoreClient {
let resp = match result {
Ok(resp) if resp.status().is_success() => {
consecutive_errors = 0;
use_offset_fallback = false; // back on happy path
resp
}
Ok(resp) if resp.status().as_u16() == 401 => {
Expand Down Expand Up @@ -367,11 +377,24 @@ impl FirestoreClient {
}
}
Ok(resp) => {
let status = resp.status().as_u16();
consecutive_errors += 1;
tracing::warn!(
"Firestore LIST {collection} returned {} (error {}/{})",
resp.status(), consecutive_errors, Self::MAX_PAGE_ERRORS
status, consecutive_errors, Self::MAX_PAGE_ERRORS
);
// 400 Bad Request with a page token means the token is stale
// (e.g. after OOM restart). Switch to offset-based pagination.
if status == 400 && page_token.is_some() && !use_offset_fallback {
tracing::warn!(
"Firestore LIST {collection}: stale page token at {} docs, switching to offset fallback",
all_docs.len()
);
page_token = None;
use_offset_fallback = true;
consecutive_errors = 0; // reset — this is a recovery, not repeated failure
continue;
}
if consecutive_errors >= Self::MAX_PAGE_ERRORS { break; }
continue;
}
Expand Down Expand Up @@ -418,8 +441,18 @@ impl FirestoreClient {

// Check for next page
match body.get("nextPageToken").and_then(|t| t.as_str()) {
Some(token) => page_token = Some(token.to_string()),
None => break,
Some(token) if !use_offset_fallback => {
page_token = Some(token.to_string());
}
_ if use_offset_fallback => {
// In offset mode, check if we got any docs this page
// (no docs = we've exhausted the collection)
if body.get("documents").and_then(|d| d.as_array()).map_or(true, |d| d.is_empty()) {
break;
}
// Otherwise continue with incremented offset (all_docs.len() grows each iteration)
}
_ => break,
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/mcp-brain-server/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,12 @@ pub struct AppState {
pub cached_status: std::sync::Arc<parking_lot::RwLock<Option<(std::time::Instant, StatusResponse)>>>,
/// GitHub Gist publisher for autonomous discoveries — None if GITHUB_GIST_PAT not set
pub gist_publisher: Option<std::sync::Arc<crate::gist::GistPublisher>>,
/// Semaphore to limit concurrent pipeline/optimize requests (prevents scheduler thundering herd)
pub optimize_semaphore: std::sync::Arc<tokio::sync::Semaphore>,
/// Timestamp of last completed pipeline/optimize run (for cooldown enforcement)
pub last_optimize_completed: std::sync::Arc<parking_lot::RwLock<Option<std::time::Instant>>>,
/// Active SSE connection count (ADR-130 Phase 1 — prevents SSE reconnect storms)
pub sse_connections: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

// ──────────────────────────────────────────────────────────────────────
Expand Down
Loading
Loading