diff --git a/architecture/gateway.md b/architecture/gateway.md index 04e64a73f..ead164bd4 100644 --- a/architecture/gateway.md +++ b/architecture/gateway.md @@ -33,6 +33,12 @@ health, metrics, or tunnel routes. The plaintext service router also rejects browser requests whose Fetch Metadata, Origin, or Referer headers indicate a cross-origin or sibling-subdomain request. +Dedicated health listeners expose `/healthz` (process liveness only) and +`/readyz` (dependency-aware readiness). Readiness reflects the latest result +of an in-process background task that pings the persistence layer on a +fixed cadence; the handler reads a cached state, so responses are +sub-millisecond and never race the kubelet probe timeout. + Supported auth modes: | Mode | Use | diff --git a/crates/openshell-server/Cargo.toml b/crates/openshell-server/Cargo.toml index 4bbfe24fc..3848cea06 100644 --- a/crates/openshell-server/Cargo.toml +++ b/crates/openshell-server/Cargo.toml @@ -93,6 +93,7 @@ x509-parser = "0.16" [features] dev-settings = ["openshell-core/dev-settings"] +test-support = [] [dev-dependencies] hyper-rustls = { version = "0.27", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "ring", "webpki-tokio"] } diff --git a/crates/openshell-server/src/http.rs b/crates/openshell-server/src/http.rs index 40f0b39d0..63b59edf0 100644 --- a/crates/openshell-server/src/http.rs +++ b/crates/openshell-server/src/http.rs @@ -2,6 +2,19 @@ // SPDX-License-Identifier: Apache-2.0 //! HTTP health endpoints using Axum. +//! +//! Three endpoints with distinct semantics: +//! - `/healthz` — Kubernetes liveness probe. Returns `200 OK` whenever the +//! process is responsive. Intentionally does NOT depend on the database +//! so a transient outage does not cascade into a `CrashLoopBackOff`. +//! - `/readyz` — Kubernetes readiness probe. Reads the cached state +//! published by [`crate::readiness::DatabaseHealthMonitor`] and returns +//! `503 Service Unavailable` when the latest background check failed. +//! Handler latency is sub-millisecond: the database is never pinged from +//! inside the request path, so the response cannot race the kubelet's +//! probe timeout. +//! - `/health` — Alias of `/readyz` for external monitors +//! that conventionally probe `/health`. use axum::{ Json, Router, @@ -14,43 +27,143 @@ use axum::{ use metrics_exporter_prometheus::PrometheusHandle; use serde::Serialize; use std::sync::Arc; +use tokio::sync::watch; -/// Health check response. +use crate::persistence::Store; +use crate::readiness::{DatabaseHealthMonitor, HealthError, HealthState}; + +const STATUS_HEALTHY: &str = "healthy"; +const STATUS_UNHEALTHY: &str = "unhealthy"; +const DATABASE_INITIALIZING_ERROR: &str = "readiness monitor still initializing"; +const DATABASE_UNAVAILABLE_ERROR: &str = "database unavailable"; +const DATABASE_TIMEOUT_ERROR: &str = "database health check timed out"; + +#[derive(Clone)] +struct HealthRouterState { + health: watch::Receiver, +} + +/// Per-dependency check entry exposed under `checks` in the JSON payload. +#[derive(Debug, Serialize)] +pub struct DependencyCheck { + /// `"healthy"` or `"unhealthy"`. + pub status: &'static str, + + /// Wall-clock time of the latest background ping, when measurable. + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option, + + /// Failure detail. Absent on success. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Aggregated dependency results. +#[derive(Debug, Serialize)] +pub struct HealthChecks { + pub database: DependencyCheck, +} + +/// Readiness response payload. #[derive(Debug, Serialize)] pub struct HealthResponse { - /// Service status. + /// Overall status: `"healthy"` if every dependency is healthy. pub status: &'static str, /// Service version. pub version: &'static str, -} -/// Simple health check - returns 200 OK. -async fn health() -> impl IntoResponse { - StatusCode::OK + /// Per-dependency breakdown. + pub checks: HealthChecks, } -/// Kubernetes liveness probe. +/// Kubernetes liveness probe — process responsiveness only. async fn healthz() -> impl IntoResponse { StatusCode::OK } -/// Kubernetes readiness probe with detailed status. -async fn readyz() -> impl IntoResponse { +/// Kubernetes readiness probe — reflects the cached background DB state. +async fn readyz(State(state): State>) -> impl IntoResponse { + render_response(&state.health.borrow()) +} + +/// Convenience alias of [`readyz`] for monitors that probe `/health`. +async fn health(State(state): State>) -> impl IntoResponse { + render_response(&state.health.borrow()) +} + +fn render_response(state: &HealthState) -> (StatusCode, Json) { + let database = render_database(state); + let healthy = state.is_healthy(); let response = HealthResponse { - status: "healthy", + status: if healthy { + STATUS_HEALTHY + } else { + STATUS_UNHEALTHY + }, version: openshell_core::VERSION, + checks: HealthChecks { database }, + }; + let code = if healthy { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE }; + (code, Json(response)) +} + +fn render_database(state: &HealthState) -> DependencyCheck { + match state { + HealthState::Initializing => DependencyCheck { + status: STATUS_UNHEALTHY, + latency_ms: None, + error: Some(DATABASE_INITIALIZING_ERROR.to_string()), + }, + HealthState::Healthy { latency_ms } => DependencyCheck { + status: STATUS_HEALTHY, + latency_ms: Some(*latency_ms), + error: None, + }, + HealthState::Unhealthy(HealthError::Unavailable { latency_ms }) => DependencyCheck { + status: STATUS_UNHEALTHY, + latency_ms: Some(*latency_ms), + error: Some(DATABASE_UNAVAILABLE_ERROR.to_string()), + }, + HealthState::Unhealthy(HealthError::Timeout) => DependencyCheck { + status: STATUS_UNHEALTHY, + latency_ms: None, + error: Some(DATABASE_TIMEOUT_ERROR.to_string()), + }, + } +} - (StatusCode::OK, Json(response)) +/// Build the health router by spawning a background [`DatabaseHealthMonitor`] +/// for `store` and wiring its receiver into the handlers. +/// +/// Returns immediately so the listener is responsive from t=0. The router's +/// initial state is [`HealthState::Initializing`] — `/readyz` and `/health` +/// will return `503` with a structured `{"checks": {"database": {"status": +/// "initializing"}}}` payload until the background monitor publishes its +/// first real probe outcome (within one [`crate::readiness::DEFAULT_CHECK_INTERVAL`]). +/// The background task continues running detached for the remainder of the +/// runtime. +pub fn health_router(store: Arc) -> Router { + let monitor = DatabaseHealthMonitor::spawn(store); + health_router_from_receiver(monitor.subscribe()) } -/// Create the health router. -pub fn health_router() -> Router { +/// Build the health router from an existing monitor receiver. +/// +/// Crate-internal: used by [`health_router`] and by tests that drive the +/// `HealthState` directly without spinning up the polling task. +pub fn health_router_from_receiver(receiver: watch::Receiver) -> Router { + let state = Arc::new(HealthRouterState { health: receiver }); + Router::new() .route("/health", get(health)) .route("/healthz", get(healthz)) .route("/readyz", get(readyz)) + .with_state(state) } /// Create the metrics router for the dedicated metrics port. @@ -64,7 +177,7 @@ async fn render_metrics(State(handle): State) -> impl IntoResp handle.render() } -/// Create the HTTP router. +/// Create the HTTP router served on the multiplexed gateway port. pub fn http_router(state: Arc) -> Router { crate::ws_tunnel::router(state.clone()) .merge(crate::auth::router(state.clone())) @@ -305,3 +418,149 @@ mod tests { assert!(!browser_context_allows_plaintext_service_request(&req)); } } + +#[cfg(test)] +mod readiness_tests { + use super::*; + use axum::body::Body; + use http::Request; + use http_body_util::BodyExt; + use tower::ServiceExt; + + async fn in_memory_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store"), + ) + } + + /// Build a [`health_router`] that has already observed its first probe + /// outcome. Test-only — production code must not block the listener on + /// the first poll (see [`health_router`]). + async fn polled_health_router(store: Arc) -> Router { + let mut monitor = DatabaseHealthMonitor::spawn(store); + monitor.wait_until_polled().await; + health_router_from_receiver(monitor.subscribe()) + } + + async fn get(router: Router, path: &str) -> (StatusCode, serde_json::Value) { + let response = router + .oneshot(Request::get(path).body(Body::empty()).unwrap()) + .await + .expect("router responds"); + let status = response.status(); + let bytes = response + .into_body() + .collect() + .await + .expect("collect body") + .to_bytes(); + let body = if bytes.is_empty() { + serde_json::Value::Null + } else { + serde_json::from_slice(&bytes).expect("response is valid JSON") + }; + (status, body) + } + + /// Build a router whose state is driven by a `HealthState` we control, + /// so each handler-shape test can pin the exact mapping under test. + fn router_with_state(state: HealthState) -> Router { + let (_tx, rx) = watch::channel(state); + health_router_from_receiver(rx) + } + + #[tokio::test] + async fn healthz_is_minimal_and_does_not_touch_the_database() { + // Liveness must succeed even when the database is unreachable — + // otherwise a transient outage would CrashLoopBackOff the gateway. + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(health_router(store), "/healthz").await; + assert_eq!(status, StatusCode::OK); + assert!(body.is_null(), "healthz must return an empty body"); + } + + #[tokio::test] + async fn readyz_returns_200_with_healthy_payload_when_db_is_reachable() { + let store = in_memory_store().await; + let (status, body) = get(polled_health_router(store).await, "/readyz").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + assert!( + body["checks"]["database"]["latency_ms"].is_number(), + "expected latency_ms in healthy payload" + ); + assert!( + body["checks"]["database"]["error"].is_null(), + "healthy payload must omit the error field" + ); + } + + #[tokio::test] + async fn health_alias_mirrors_readyz_when_db_is_reachable() { + let store = in_memory_store().await; + let (status, body) = get(polled_health_router(store).await, "/health").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + } + + #[tokio::test] + async fn readyz_returns_503_with_unhealthy_payload_when_db_is_unreachable() { + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(polled_health_router(store).await, "/readyz").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + assert_eq!( + body["checks"]["database"]["error"], + DATABASE_UNAVAILABLE_ERROR + ); + } + + #[tokio::test] + async fn health_alias_returns_503_when_db_is_unreachable() { + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(polled_health_router(store).await, "/health").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + } + + #[tokio::test] + async fn readyz_reports_initializing_state_as_unhealthy_with_explicit_reason() { + let (status, body) = get(router_with_state(HealthState::Initializing), "/readyz").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + assert_eq!( + body["checks"]["database"]["error"], + DATABASE_INITIALIZING_ERROR + ); + assert!( + body["checks"]["database"]["latency_ms"].is_null(), + "initializing state has no latency to report yet" + ); + } + + #[tokio::test] + async fn readyz_renders_timeout_state_with_dedicated_error_string() { + let (status, body) = get( + router_with_state(HealthState::Unhealthy(HealthError::Timeout)), + "/readyz", + ) + .await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["error"], DATABASE_TIMEOUT_ERROR); + assert!( + body["checks"]["database"]["latency_ms"].is_null(), + "timeout state has no completed-call latency" + ); + } +} diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 220e45026..5df704ff7 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -32,6 +32,7 @@ mod multiplex; mod persistence; pub(crate) mod policy_store; mod provider_refresh; +mod readiness; mod sandbox_index; mod sandbox_watch; mod service_routing; @@ -57,7 +58,7 @@ pub use grpc::OpenShellService; pub use http::{health_router, http_router, metrics_router, service_http_router}; pub use multiplex::{MultiplexService, MultiplexedService}; use openshell_driver_kubernetes::KubernetesComputeConfig; -use persistence::Store; +pub use persistence::Store; use sandbox_index::SandboxIndex; use sandbox_watch::SandboxWatchBus; pub use tls::TlsAcceptor; @@ -171,6 +172,7 @@ pub async fn run_server( if database_url.is_empty() { return Err(Error::config("database_url is required")); } + let store = Arc::new(Store::connect(database_url).await?); let oidc_cache = if let Some(ref oidc) = config.oidc { @@ -255,9 +257,12 @@ pub async fn run_server( )) })?; info!(address = %health_bind_address, "Health server listening"); + // `health_router` returns immediately; the listener serves + // `Initializing → 503` until the background monitor publishes the + // first real probe outcome, so the endpoint is always responsive. + let router = health_router(store.clone()); tokio::spawn(async move { - if let Err(e) = axum::serve(health_listener, health_router().into_make_service()).await - { + if let Err(e) = axum::serve(health_listener, router.into_make_service()).await { error!("Health server error: {e}"); } }); diff --git a/crates/openshell-server/src/multiplex.rs b/crates/openshell-server/src/multiplex.rs index deac9ee78..9a793e82c 100644 --- a/crates/openshell-server/src/multiplex.rs +++ b/crates/openshell-server/src/multiplex.rs @@ -626,11 +626,19 @@ mod tests { assert_ne!(id1.header_value(), id2.header_value()); } + async fn test_health_store() -> Arc { + Arc::new( + crate::Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) + } + async fn start_http_server_with_middleware() -> std::net::SocketAddr { let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - let http_service = crate::http::health_router(); + let http_service = crate::http::health_router(test_health_store().await); let http_service = request_id_middleware!(http_service); let service = MultiplexedService::new(http_service.clone(), http_service); diff --git a/crates/openshell-server/src/persistence/mod.rs b/crates/openshell-server/src/persistence/mod.rs index 32875a9f9..6aa2c3bc7 100644 --- a/crates/openshell-server/src/persistence/mod.rs +++ b/crates/openshell-server/src/persistence/mod.rs @@ -164,6 +164,28 @@ impl Store { } } + /// Verify connectivity to the underlying database. + pub async fn ping(&self) -> PersistenceResult<()> { + match self { + Self::Postgres(store) => store.ping().await, + Self::Sqlite(store) => store.ping().await, + } + } + + /// Test support only: close the underlying connection pool. + /// + /// There is no runtime shutdown path yet. If we add graceful shutdown, + /// this API can be made public for that explicit shutdown flow. + /// + /// Do not call from runtime code today; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + match self { + Self::Postgres(store) => store.close().await, + Self::Sqlite(store) => store.close().await, + } + } + /// Insert or update a generic object with compare-and-swap support. /// /// # Arguments diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs index 8399fd734..529bc38be 100644 --- a/crates/openshell-server/src/persistence/postgres.rs +++ b/crates/openshell-server/src/persistence/postgres.rs @@ -10,7 +10,7 @@ use crate::policy_store::{ policy_record_from_parts, }; use sqlx::postgres::PgPoolOptions; -use sqlx::{PgPool, Row}; +use sqlx::{Connection, PgPool, Row}; static POSTGRES_MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/postgres"); @@ -39,6 +39,21 @@ impl PostgresStore { .map_err(|e| map_migrate_error(&e)) } + /// Verify the database is reachable by acquiring a pooled connection + /// and issuing a protocol-level ping. + pub async fn ping(&self) -> PersistenceResult<()> { + let mut conn = self.pool.acquire().await.map_err(|e| map_db_error(&e))?; + conn.ping().await.map_err(|e| map_db_error(&e)) + } + + /// Test support only: close the underlying connection pool. + /// + /// Do not call from runtime code; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + self.pool.close().await; + } + pub async fn put( &self, object_type: &str, diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs index bdfadc8b0..1958b3232 100644 --- a/crates/openshell-server/src/persistence/sqlite.rs +++ b/crates/openshell-server/src/persistence/sqlite.rs @@ -11,7 +11,7 @@ use crate::policy_store::{ }; use openshell_core::paths::set_file_owner_only; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use sqlx::{Row, SqlitePool}; +use sqlx::{Connection, Row, SqlitePool}; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -65,6 +65,21 @@ impl SqliteStore { .map_err(|e| map_migrate_error(&e)) } + /// Verify the database is reachable by acquiring a pooled connection + /// and issuing a ping. + pub async fn ping(&self) -> PersistenceResult<()> { + let mut conn = self.pool.acquire().await.map_err(|e| map_db_error(&e))?; + conn.ping().await.map_err(|e| map_db_error(&e)) + } + + /// Test support only: close the underlying connection pool. + /// + /// Do not call from runtime code; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + self.pool.close().await; + } + pub async fn put( &self, object_type: &str, diff --git a/crates/openshell-server/src/readiness.rs b/crates/openshell-server/src/readiness.rs new file mode 100644 index 000000000..7ca21605a --- /dev/null +++ b/crates/openshell-server/src/readiness.rs @@ -0,0 +1,339 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Background database health monitor. +//! +//! Decouples the readiness probe from the underlying database call: a +//! background task periodically pings `Store::ping()` and publishes the +//! result to a [`tokio::sync::watch`] channel. The `/readyz` and `/health` +//! handlers read that channel synchronously, so probe responses are +//! sub-millisecond and never race with the kubelet's probe timeout. +//! +//! Both polling parameters are hardcoded by design: +//! +//! - [`DEFAULT_CHECK_INTERVAL`] (5s) bounds how stale the published state +//! can be — once a DB outage occurs, `/readyz` reflects it within at +//! most one interval (plus the next handler read by the kubelet). +//! - [`DEFAULT_CHECK_TIMEOUT`] (2s) bounds a single ping. A `SELECT 1` that +//! takes longer than 2s is itself a symptom of an unhealthy database; +//! the monitor records the iteration as a `Timeout` and the system goes +//! `Unhealthy`. +//! +//! The interval/timeout invariant (`timeout < interval`) is enforced by +//! construction; the only public constructor wires the defaults. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use metrics::{gauge, histogram}; +use tokio::sync::watch; +use tracing::warn; + +use crate::persistence::Store; + +/// How often the background task pings the database. +pub const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_secs(5); + +/// Maximum time a single database ping is allowed to take. +/// +/// Must be strictly less than [`DEFAULT_CHECK_INTERVAL`] so a slow ping +/// cannot push the effective polling cycle past the interval. +pub const DEFAULT_CHECK_TIMEOUT: Duration = Duration::from_secs(2); + +const _: () = assert!( + DEFAULT_CHECK_TIMEOUT.as_secs() < DEFAULT_CHECK_INTERVAL.as_secs(), + "DEFAULT_CHECK_TIMEOUT must be strictly less than DEFAULT_CHECK_INTERVAL" +); + +const METRIC_READINESS_DATABASE_HEALTHY: &str = "openshell_server_readiness_database_healthy"; +const METRIC_READINESS_DATABASE_PROBE_DURATION_SECONDS: &str = + "openshell_server_readiness_database_probe_duration_seconds"; +const METRIC_OUTCOME_LABEL: &str = "outcome"; + +/// Latest published database health state. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum HealthState { + /// The monitor has not yet completed its first ping. + /// + /// Treated as unhealthy by the handler: probes that arrive before the + /// first iteration see `503` and the pod stays `NotReady` until the + /// monitor settles. Resolves within one interval at startup. + Initializing, + + /// Latest ping succeeded. + Healthy { + /// Measured ping duration in milliseconds. + latency_ms: u64, + }, + + /// Latest ping failed or timed out. + Unhealthy(HealthError), +} + +impl HealthState { + /// Returns `true` when the latest published state is `Healthy`. + #[must_use] + pub const fn is_healthy(&self) -> bool { + matches!(self, Self::Healthy { .. }) + } +} + +/// Reason the latest iteration failed. +/// +/// Latency is carried per-variant because the invariant differs: an +/// `Unavailable` outcome always has a measured duration (the call returned +/// before the timeout fired), while `Timeout` never does (the call never +/// returned). Encoding this in the type prevents the call site from having +/// to invent a placeholder value for the timeout case. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum HealthError { + /// The persistence layer returned an error before the timeout fired. + Unavailable { + /// Measured duration of the failed ping, in milliseconds. + latency_ms: u64, + }, + /// The persistence layer did not respond within [`DEFAULT_CHECK_TIMEOUT`]. + Timeout, +} + +/// Background monitor that polls [`Store::ping`] on a fixed cadence and +/// publishes the latest result to a [`watch::Receiver`]. +/// +/// Construction spawns a Tokio task that lives until the runtime is +/// dropped. The task only holds an `Arc` clone and the +/// [`watch::Sender`], so dropping the monitor wrapper does not stop the +/// background polling — that is intentional, the gateway runtime owns the +/// monitor for the lifetime of the process. Tests rely on the same +/// "task lives until runtime exits" semantics since each test gets its own +/// Tokio runtime. +pub struct DatabaseHealthMonitor { + receiver: watch::Receiver, +} + +impl DatabaseHealthMonitor { + /// Spawn a monitor with the production defaults. + #[must_use] + pub fn spawn(store: Arc) -> Self { + Self::spawn_with(store, DEFAULT_CHECK_INTERVAL, DEFAULT_CHECK_TIMEOUT) + } + + /// Spawn a monitor with custom polling parameters. + /// + /// Intended for tests that want fast iteration. Production paths should + /// use [`spawn`] so the polling cadence stays consistent across + /// deployments. + #[must_use] + pub fn spawn_with(store: Arc, interval: Duration, timeout: Duration) -> Self { + let (tx, rx) = watch::channel(HealthState::Initializing); + tokio::spawn(monitor_loop(store, tx, interval, timeout)); + Self { receiver: rx } + } + + /// Subscribe to state updates. + /// + /// Returned receivers always observe the latest value with no lock + /// contention (`tokio::sync::watch` semantics). + #[must_use] + pub fn subscribe(&self) -> watch::Receiver { + self.receiver.clone() + } + + /// Wait until the monitor publishes its first non-`Initializing` state. + /// + /// Test-only: production builds intentionally do not block on the first + /// poll so the health listener is responsive from t=0 (probes during + /// the warmup window see a structured `Initializing → 503` instead of a + /// TCP-level hang). Tests that need a deterministic post-warmup state + /// call this before constructing the router. + #[cfg(test)] + pub(crate) async fn wait_until_polled(&mut self) { + while matches!(*self.receiver.borrow(), HealthState::Initializing) { + if self.receiver.changed().await.is_err() { + return; + } + } + } +} + +async fn monitor_loop( + store: Arc, + tx: watch::Sender, + interval: Duration, + timeout: Duration, +) { + let mut ticker = tokio::time::interval(interval); + // `Skip` keeps the schedule when a tick is missed (e.g. because a ping + // approached the timeout). `Burst` (the default) would fire back-to-back + // catch-up pings, defeating the bounded-cadence guarantee. + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + ticker.tick().await; + let state = run_check(store.as_ref(), timeout).await; + record_metrics(&state); + // Send errors only happen when every receiver is dropped, at which + // point the monitor is shutting down — exit cleanly. + if tx.send(state).is_err() { + break; + } + } +} + +async fn run_check(store: &Store, timeout: Duration) -> HealthState { + let started = Instant::now(); + match tokio::time::timeout(timeout, store.ping()).await { + Ok(Ok(())) => HealthState::Healthy { + latency_ms: elapsed_ms(started.elapsed()), + }, + Ok(Err(err)) => { + let latency_ms = elapsed_ms(started.elapsed()); + warn!(error = %err, latency_ms, "database health check failed"); + HealthState::Unhealthy(HealthError::Unavailable { latency_ms }) + } + Err(_) => { + warn!( + timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX), + "database health check timed out" + ); + HealthState::Unhealthy(HealthError::Timeout) + } + } +} + +fn record_metrics(state: &HealthState) { + let (healthy, outcome_label, latency_seconds) = match state { + HealthState::Initializing => return, + HealthState::Healthy { latency_ms } => { + (true, "success", duration_seconds_from_ms(*latency_ms)) + } + HealthState::Unhealthy(HealthError::Unavailable { latency_ms }) => { + (false, "db_error", duration_seconds_from_ms(*latency_ms)) + } + HealthState::Unhealthy(HealthError::Timeout) => { + (false, "timeout", DEFAULT_CHECK_TIMEOUT.as_secs_f64()) + } + }; + + gauge!(METRIC_READINESS_DATABASE_HEALTHY).set(if healthy { 1.0 } else { 0.0 }); + histogram!( + METRIC_READINESS_DATABASE_PROBE_DURATION_SECONDS, + METRIC_OUTCOME_LABEL => outcome_label + ) + .record(latency_seconds); +} + +fn duration_seconds_from_ms(ms: u64) -> f64 { + #[allow(clippy::cast_precision_loss)] + { + ms as f64 / 1000.0 + } +} + +fn elapsed_ms(elapsed: Duration) -> u64 { + u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + async fn fresh_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store"), + ) + } + + #[tokio::test] + async fn first_state_is_initializing_then_transitions_to_healthy() { + let store = fresh_store().await; + let mut monitor = DatabaseHealthMonitor::spawn_with( + store, + Duration::from_millis(10), + Duration::from_secs(1), + ); + + assert_eq!(*monitor.subscribe().borrow(), HealthState::Initializing); + + monitor.wait_until_polled().await; + let state = monitor.subscribe().borrow().clone(); + assert!( + matches!(state, HealthState::Healthy { .. }), + "expected Healthy, got {state:?}" + ); + } + + #[cfg(feature = "test-support")] + #[tokio::test] + async fn detects_database_outage_within_one_interval() { + let store = fresh_store().await; + let mut monitor = DatabaseHealthMonitor::spawn_with( + store.clone(), + Duration::from_millis(20), + Duration::from_secs(1), + ); + monitor.wait_until_polled().await; + assert!(monitor.subscribe().borrow().is_healthy()); + + store.close().await; + + // Wait for the next state change after the close (the polling loop + // will pick it up within the interval). + let mut rx = monitor.subscribe(); + let deadline = Instant::now() + Duration::from_secs(2); + loop { + assert!( + rx.changed().await.is_ok(), + "monitor task ended before reporting outage" + ); + let state = rx.borrow().clone(); + if matches!( + state, + HealthState::Unhealthy(HealthError::Unavailable { .. }) + ) { + break; + } + assert!( + Instant::now() < deadline, + "monitor did not transition to Unhealthy in time" + ); + } + } + + #[tokio::test] + async fn slow_ping_is_recorded_as_timeout() { + // Drive the loop directly so we can hand it a ping future that + // never completes, isolating the timeout path from the live Store. + let (tx, rx) = watch::channel(HealthState::Initializing); + let timeout = Duration::from_millis(10); + + tokio::spawn(async move { + let state = tokio::time::timeout( + timeout, + std::future::pending::>(), + ) + .await; + let outcome = match state { + Ok(_) => unreachable!("pending future cannot resolve"), + Err(_) => HealthState::Unhealthy(HealthError::Timeout), + }; + let _ = tx.send(outcome); + }); + + let mut rx = rx; + rx.changed().await.expect("monitor publishes a state"); + let state = rx.borrow().clone(); + assert!( + matches!(state, HealthState::Unhealthy(HealthError::Timeout)), + "expected Timeout, got {state:?}" + ); + } + + #[test] + fn default_check_timeout_is_strictly_less_than_default_check_interval() { + // Sanity guard duplicated as a runtime test so CI catches any + // regression on the const_assert above. + assert!(DEFAULT_CHECK_TIMEOUT < DEFAULT_CHECK_INTERVAL); + } +} diff --git a/crates/openshell-server/tests/auth_endpoint_integration.rs b/crates/openshell-server/tests/auth_endpoint_integration.rs index c1ea74b9b..b4d1e6436 100644 --- a/crates/openshell-server/tests/auth_endpoint_integration.rs +++ b/crates/openshell-server/tests/auth_endpoint_integration.rs @@ -394,13 +394,19 @@ async fn plaintext_server_accepts_grpc_and_http() { HealthRequest, ServiceStatus, open_shell_client::OpenShellClient, open_shell_server::OpenShellServer, }; - use openshell_server::{MultiplexedService, health_router}; + use openshell_server::{MultiplexedService, Store, health_router}; + use std::sync::Arc; let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(store); let service = MultiplexedService::new(grpc_service, http_service); let server = tokio::spawn(async move { diff --git a/crates/openshell-server/tests/common/mod.rs b/crates/openshell-server/tests/common/mod.rs index 3a8ecb5b3..9e46677e6 100644 --- a/crates/openshell-server/tests/common/mod.rs +++ b/crates/openshell-server/tests/common/mod.rs @@ -26,10 +26,11 @@ use openshell_core::proto::{ UpdateProviderRequest, WatchSandboxRequest, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::{MultiplexedService, TlsAcceptor, health_router}; +use openshell_server::{MultiplexedService, Store, TlsAcceptor, health_router}; use rcgen::{CertificateParams, IsCa, KeyPair}; use std::io::Write; use std::net::SocketAddr; +use std::sync::Arc; use tempfile::tempdir; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -534,7 +535,7 @@ pub async fn start_test_server( let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let handle = tokio::spawn(async move { @@ -557,3 +558,13 @@ pub async fn start_test_server( (addr, handle) } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +pub async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/health_endpoint_integration.rs b/crates/openshell-server/tests/health_endpoint_integration.rs new file mode 100644 index 000000000..45cd50135 --- /dev/null +++ b/crates/openshell-server/tests/health_endpoint_integration.rs @@ -0,0 +1,146 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::{Request, StatusCode}; +use hyper_util::rt::TokioIo; +use openshell_server::{Store, health_router}; +use serde_json::Value; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::TcpListener; + +async fn start_health_server( + store: Arc, +) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind ephemeral health test listener"); + let addr = listener + .local_addr() + .expect("resolve local address for health test listener"); + + let router = health_router(store); + let server = tokio::spawn(async move { + let _ = axum::serve(listener, router.into_make_service()).await; + }); + + (addr, server) +} + +async fn http_get_json(addr: std::net::SocketAddr, path: &str) -> (StatusCode, Value) { + let stream = tokio::net::TcpStream::connect(addr) + .await + .expect("connect test HTTP client"); + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(TokioIo::new(stream)) + .await + .expect("handshake HTTP/1 test client"); + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .method("GET") + .uri(format!("http://{addr}{path}")) + .body(Empty::::new()) + .expect("build HTTP request"); + let resp = sender.send_request(req).await.expect("send HTTP request"); + let status = resp.status(); + let bytes = resp + .into_body() + .collect() + .await + .expect("collect response body") + .to_bytes(); + let body = if bytes.is_empty() { + Value::Null + } else { + serde_json::from_slice(&bytes).expect("response body must be valid JSON") + }; + (status, body) +} + +#[tokio::test] +async fn readyz_reports_healthy_when_database_is_reachable() { + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for health integration test"), + ); + let (addr, server) = start_health_server(store.clone()).await; + + // `health_router` does not block on the first poll, so /readyz starts in + // `Initializing → 503` until the background monitor publishes the first + // healthy state (sub-millisecond for in-memory SQLite, but still a race). + let (status, body) = wait_for_status(addr, StatusCode::OK, Duration::from_secs(2)) + .await + .expect("/readyz did not become healthy within 2s"); + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + + server.abort(); +} + +#[cfg(feature = "test-support")] +#[tokio::test] +async fn readyz_reports_database_health_transition_after_close() { + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for health integration test"), + ); + let (addr, server) = start_health_server(store.clone()).await; + + let (status, body) = wait_for_status(addr, StatusCode::OK, Duration::from_secs(2)) + .await + .expect("/readyz did not become healthy within 2s"); + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + + store.close().await; + + // The handler reads the cached state published by the background + // readiness monitor, so the transition to Unhealthy can only show up + // after the monitor's next tick. With the default 5s interval the + // outage surfaces within ~5s; poll with a generous deadline so the + // assertion never races the polling cycle. + let (status, body) = wait_for_status( + addr, + StatusCode::SERVICE_UNAVAILABLE, + Duration::from_secs(10), + ) + .await + .expect("/readyz did not transition to 503 after store.close() within 10s"); + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["error"], "database unavailable"); + + server.abort(); +} + +/// Poll `/readyz` until it returns `expected`, or give up after `timeout`. +/// +/// Used to bridge the gap between `health_router`'s non-blocking startup +/// and the background monitor publishing its first probe outcome. +async fn wait_for_status( + addr: std::net::SocketAddr, + expected: StatusCode, + timeout: Duration, +) -> Option<(StatusCode, Value)> { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let observation = http_get_json(addr, "/readyz").await; + if observation.0 == expected { + return Some(observation); + } + if tokio::time::Instant::now() >= deadline { + return None; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/crates/openshell-server/tests/multiplex_integration.rs b/crates/openshell-server/tests/multiplex_integration.rs index 9ca1ee3ee..2474ccebf 100644 --- a/crates/openshell-server/tests/multiplex_integration.rs +++ b/crates/openshell-server/tests/multiplex_integration.rs @@ -15,7 +15,8 @@ use openshell_core::proto::{ HealthRequest, ServiceStatus, open_shell_client::OpenShellClient, open_shell_server::OpenShellServer, }; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; +use std::sync::Arc; use tokio::net::TcpListener; #[tokio::test] @@ -24,7 +25,7 @@ async fn serves_grpc_and_http_on_same_port() { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let server = tokio::spawn(async move { @@ -101,7 +102,7 @@ async fn grpc_response_propagates_request_id() { )) .layer(PropagateRequestIdLayer::new(x_request_id)) .service(OpenShellServer::new(TestOpenShell)); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); tokio::spawn(async move { @@ -139,3 +140,13 @@ async fn grpc_response_propagates_request_id() { let echoed = response.metadata().get("x-request-id").unwrap(); assert_eq!(echoed.to_str().unwrap(), "grpc-corr-id"); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/supervisor_relay_integration.rs b/crates/openshell-server/tests/supervisor_relay_integration.rs index aae6d8cf1..1fb98d6f0 100644 --- a/crates/openshell-server/tests/supervisor_relay_integration.rs +++ b/crates/openshell-server/tests/supervisor_relay_integration.rs @@ -28,7 +28,7 @@ use openshell_core::proto::{ open_shell_server::{OpenShell, OpenShellServer}, }; use openshell_server::supervisor_session::SupervisorSessionRegistry; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -405,7 +405,7 @@ async fn spawn_gateway(registry: Arc) -> Channel { let addr = listener.local_addr().unwrap(); let grpc = OpenShellServer::new(RelayGateway { registry }); - let service = MultiplexedService::new(grpc, health_router()); + let service = MultiplexedService::new(grpc, health_router(test_health_store().await)); tokio::spawn(async move { loop { @@ -696,3 +696,13 @@ async fn open_relay_enforces_per_sandbox_cap_under_concurrent_burst() { .await .expect("other sandbox should not be affected by sbx cap"); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/ws_tunnel_integration.rs b/crates/openshell-server/tests/ws_tunnel_integration.rs index ee253e9dd..5e0d58ec9 100644 --- a/crates/openshell-server/tests/ws_tunnel_integration.rs +++ b/crates/openshell-server/tests/ws_tunnel_integration.rs @@ -44,8 +44,9 @@ use openshell_core::proto::{ HealthRequest, ServiceStatus, open_shell_client::OpenShellClient, open_shell_server::OpenShellServer, }; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; use std::net::SocketAddr; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite; @@ -289,7 +290,7 @@ async fn start_grpc_server() -> (SocketAddr, tokio::task::JoinHandle<()>) { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let handle = tokio::spawn(async move { @@ -317,7 +318,7 @@ async fn start_ws_tunnel_server() -> (SocketAddr, tokio::task::JoinHandle<()>) { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let app = test_ws_tunnel_router(MultiplexedService::new(grpc_service, http_service)); let handle = tokio::spawn(async move { @@ -547,3 +548,13 @@ async fn ws_tunnel_graceful_close() { stack.abort(); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/deploy/helm/openshell/README.md b/deploy/helm/openshell/README.md index 390571062..5f00e626a 100644 --- a/deploy/helm/openshell/README.md +++ b/deploy/helm/openshell/README.md @@ -58,6 +58,12 @@ See [`values.yaml`](values.yaml) for source defaults. Selected overlays: - [`ci/values-cert-manager.yaml`](ci/values-cert-manager.yaml) - cert-manager integration - [`ci/values-keycloak.yaml`](ci/values-keycloak.yaml) - Keycloak OIDC integration +`/readyz` reflects a cached database health state refreshed by an in-process +background task (interval and per-check timeout are hardcoded gateway-side). +Probe response latency is sub-millisecond, so no chart-level tuning of the +readiness check is needed - adjust `probes.readiness.*` like any standard +Kubernetes readiness probe. + ## PKI bootstrap By default, a pre-install/pre-upgrade hook Job runs `openshell-gateway generate-certs` @@ -118,7 +124,7 @@ cert-manager alternative. | probes.readiness.failureThreshold | int | `3` | Readiness probe failure threshold before the pod is marked not ready. | | probes.readiness.initialDelaySeconds | int | `1` | Readiness probe initial delay, in seconds. | | probes.readiness.periodSeconds | int | `2` | Readiness probe period, in seconds. | -| probes.readiness.timeoutSeconds | int | `1` | Readiness probe timeout, in seconds. | +| probes.readiness.timeoutSeconds | int | `2` | Readiness probe timeout, in seconds. | | probes.startup.failureThreshold | int | `30` | Startup probe failure threshold before the container is killed. | | probes.startup.periodSeconds | int | `2` | Startup probe period, in seconds. | | probes.startup.timeoutSeconds | int | `1` | Startup probe timeout, in seconds. | diff --git a/deploy/helm/openshell/README.md.gotmpl b/deploy/helm/openshell/README.md.gotmpl index 5068d6848..86f9489a2 100644 --- a/deploy/helm/openshell/README.md.gotmpl +++ b/deploy/helm/openshell/README.md.gotmpl @@ -58,6 +58,12 @@ See [`values.yaml`](values.yaml) for source defaults. Selected overlays: - [`ci/values-cert-manager.yaml`](ci/values-cert-manager.yaml) - cert-manager integration - [`ci/values-keycloak.yaml`](ci/values-keycloak.yaml) - Keycloak OIDC integration +`/readyz` reflects a cached database health state refreshed by an in-process +background task (interval and per-check timeout are hardcoded gateway-side). +Probe response latency is sub-millisecond, so no chart-level tuning of the +readiness check is needed - adjust `probes.readiness.*` like any standard +Kubernetes readiness probe. + ## PKI bootstrap By default, a pre-install/pre-upgrade hook Job runs `openshell-gateway generate-certs` diff --git a/deploy/helm/openshell/values.yaml b/deploy/helm/openshell/values.yaml index 26ba1b5b5..80e7e1f31 100644 --- a/deploy/helm/openshell/values.yaml +++ b/deploy/helm/openshell/values.yaml @@ -107,7 +107,7 @@ probes: # -- Readiness probe period, in seconds. periodSeconds: 2 # -- Readiness probe timeout, in seconds. - timeoutSeconds: 1 + timeoutSeconds: 2 # -- Readiness probe failure threshold before the pod is marked not ready. failureThreshold: 3 diff --git a/docs/kubernetes/setup.mdx b/docs/kubernetes/setup.mdx index bb9997305..8c8d507bb 100644 --- a/docs/kubernetes/setup.mdx +++ b/docs/kubernetes/setup.mdx @@ -149,6 +149,15 @@ helm upgrade --install openshell \ --values my-values.yaml ``` +## Probes + +The gateway exposes `/healthz` for process liveness and `/readyz` for dependency-aware readiness on the health port. The Helm chart wires both into Kubernetes probes: + +- `startupProbe` and `livenessProbe` use `/healthz`. +- `readinessProbe` uses `/readyz`, which reflects the latest result of an in-process background database check. + +Handler responses are sub-millisecond, so the readiness probe never races the kubelet's `timeoutSeconds`. No gateway-side tuning is exposed: the background check interval and per-check timeout are hardcoded to safe defaults. Adjust `probes.readiness.*` like any standard Kubernetes readiness probe. + ## Next Steps - To enable automatic certificate rotation with cert-manager, refer to [Managing Certificates](/kubernetes/managing-certificates). diff --git a/docs/sandboxes/manage-gateways.mdx b/docs/sandboxes/manage-gateways.mdx index 6cfa39121..0cf8837fb 100644 --- a/docs/sandboxes/manage-gateways.mdx +++ b/docs/sandboxes/manage-gateways.mdx @@ -130,6 +130,8 @@ openshell status openshell gateway info ``` +Gateways expose `/healthz` for process liveness and `/readyz` for dependency-aware readiness on the health port. `/readyz` reflects the latest result of a background database health check. + For Docker-backed local gateways, inspect Docker and the gateway process or container started by your local workflow: ```shell diff --git a/e2e/rust/Cargo.lock b/e2e/rust/Cargo.lock index 61f15866b..990aa5c46 100644 --- a/e2e/rust/Cargo.lock +++ b/e2e/rust/Cargo.lock @@ -8,6 +8,12 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "base64" version = "0.22.1" @@ -98,6 +104,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + [[package]] name = "generic-array" version = "0.14.7" @@ -160,6 +181,79 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "hyper" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "id-arena" version = "2.3.0" @@ -245,7 +339,11 @@ name = "openshell-e2e" version = "0.1.0" dependencies = [ "base64", + "bytes", "hex", + "http-body-util", + "hyper", + "hyper-util", "rand", "serde_json", "sha1", @@ -537,6 +635,12 @@ dependencies = [ "syn", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.19.0" @@ -561,6 +665,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/e2e/rust/Cargo.toml b/e2e/rust/Cargo.toml index 7d7f1411c..26957baab 100644 --- a/e2e/rust/Cargo.toml +++ b/e2e/rust/Cargo.toml @@ -56,6 +56,11 @@ name = "vm_gateway_resume" path = "tests/vm_gateway_resume.rs" required-features = ["e2e-vm"] +[[test]] +name = "readyz_health" +path = "tests/readyz_health.rs" +required-features = ["e2e-kubernetes"] + [[test]] name = "websocket_conformance" path = "tests/websocket_conformance.rs" @@ -88,6 +93,10 @@ required-features = ["e2e-gpu"] [dependencies] base64 = "0.22" +bytes = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["client", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } tokio = { version = "1.43", features = ["full"] } tempfile = "3" sha1 = "0.10" diff --git a/e2e/rust/e2e-kubernetes.sh b/e2e/rust/e2e-kubernetes.sh index 4ec0d485d..0644a0618 100755 --- a/e2e/rust/e2e-kubernetes.sh +++ b/e2e/rust/e2e-kubernetes.sh @@ -19,7 +19,7 @@ set -euo pipefail ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" -E2E_FEATURES="${OPENSHELL_E2E_KUBERNETES_FEATURES:-e2e,e2e-host-gateway}" +E2E_FEATURES="${OPENSHELL_E2E_KUBERNETES_FEATURES:-e2e,e2e-host-gateway,e2e-kubernetes}" cargo build -p openshell-cli --features openshell-core/dev-settings diff --git a/e2e/rust/tests/readyz_health.rs b/e2e/rust/tests/readyz_health.rs new file mode 100644 index 000000000..8f093dabe --- /dev/null +++ b/e2e/rust/tests/readyz_health.rs @@ -0,0 +1,95 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg(feature = "e2e-kubernetes")] + +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::Request; +use hyper_util::rt::TokioIo; +use serde_json::Value; +use std::time::{Duration, Instant}; +use tokio::net::TcpStream; + +fn health_port_from_env() -> u16 { + let raw = std::env::var("OPENSHELL_E2E_HEALTH_PORT").unwrap_or_else(|_| { + panic!( + "OPENSHELL_E2E_HEALTH_PORT is not set. The Kubernetes e2e wrapper \ + (e2e/with-kube-gateway.sh) must export this variable so the \ + /readyz test can reach the gateway health listener." + ) + }); + raw.parse::().unwrap_or_else(|err| { + panic!("OPENSHELL_E2E_HEALTH_PORT=\"{raw}\" is not a valid u16 port: {err}") + }) +} + +async fn http_get_json(port: u16, path: &str) -> Result<(u16, Value), String> { + let stream = TcpStream::connect(("127.0.0.1", port)) + .await + .map_err(|err| format!("connect health endpoint :{port}: {err}"))?; + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(TokioIo::new(stream)) + .await + .map_err(|err| format!("handshake health HTTP/1 client :{port}: {err}"))?; + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .method("GET") + .uri(format!("http://127.0.0.1:{port}{path}")) + .body(Empty::::new()) + .map_err(|err| format!("build health request {path}: {err}"))?; + let resp = sender + .send_request(req) + .await + .map_err(|err| format!("send health request {path} to :{port}: {err}"))?; + let status_code = resp.status().as_u16(); + let bytes = resp + .into_body() + .collect() + .await + .map_err(|err| format!("read health response body {path}: {err}"))? + .to_bytes(); + let json = serde_json::from_slice::(&bytes) + .map_err(|err| format!("health endpoint {path} did not return valid JSON: {err}"))?; + + Ok((status_code, json)) +} + +#[tokio::test] +async fn readyz_reports_healthy_database_check() { + let port = health_port_from_env(); + + let deadline = Instant::now() + Duration::from_secs(20); + let timeout_detail = loop { + let observation = match http_get_json(port, "/readyz").await { + Ok((status, payload)) => { + let ready = status == 200 + && payload["status"] == "healthy" + && payload["checks"]["database"]["status"] == "healthy"; + if ready { + assert!( + payload["checks"]["database"]["latency_ms"].is_number(), + "readyz payload should include checks.database.latency_ms: {payload}" + ); + assert!( + payload["checks"]["database"]["error"].is_null(), + "readyz payload should not include checks.database.error when healthy: {payload}" + ); + return; + } + format!("unexpected /readyz response status={status} payload={payload}") + } + Err(err) => err, + }; + + if Instant::now() >= deadline { + break observation; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + }; + panic!("timed out waiting for /readyz healthy response after 20s: {timeout_detail}"); +} diff --git a/e2e/rust/tests/user_namespaces.rs b/e2e/rust/tests/user_namespaces.rs index 9aa714767..a0b9d4590 100644 --- a/e2e/rust/tests/user_namespaces.rs +++ b/e2e/rust/tests/user_namespaces.rs @@ -114,6 +114,22 @@ async fn wait_for_sandbox_pod(name: &str, timeout_secs: u64) -> Result<(), Strin Err(format!("sandbox pod {name} did not appear within {timeout_secs}s")) } +// Disabled by default — not reachable from any project-controlled cluster +// and brittle by design. See the tracking issue for context. Re-enable +// with `cargo test -- --ignored` after the issues below are addressed. +// +// Blocking issues: +// 1. `kubectl` is invoked as `docker exec openshell-cluster-openshell kubectl`. +// No setup in the repo (helm-k3s-local.sh, e2e/with-kube-gateway.sh, the +// CI kind workflow) creates a docker container with that name; only an +// external OpenShift/manual setup matches it. +// 2. The test mutates the gateway StatefulSet via `kubectl set env`, which +// triggers a pod rollout mid-test. The wrapper's `kubectl port-forward` +// to the old pod is disrupted during the rollout, and the test's +// `openshell sandbox create` is spawned without capturing stderr, so +// transient connection failures surface as a generic "sandbox did not +// appear within 60s" with no actionable signal. +#[ignore = "broken: hardcoded docker exec container name + brittle mid-test gateway rollout (see header)"] #[tokio::test] async fn sandbox_pod_spec_has_user_namespace_fields() { // Enable user namespaces on the gateway. diff --git a/e2e/with-kube-gateway.sh b/e2e/with-kube-gateway.sh index 34a081516..848889485 100755 --- a/e2e/with-kube-gateway.sh +++ b/e2e/with-kube-gateway.sh @@ -42,6 +42,8 @@ NAMESPACE="openshell" RELEASE_NAME="openshell" PORTFORWARD_PID="" PORTFORWARD_LOG="${WORKDIR}/portforward.log" +PORTFORWARD_HEALTH_PID="" +PORTFORWARD_HEALTH_LOG="${WORKDIR}/portforward-health.log" HELM_INSTALLED=0 # Isolate CLI/SDK gateway metadata from the developer's real config. @@ -64,6 +66,11 @@ cleanup() { wait "${PORTFORWARD_PID}" >/dev/null 2>&1 || true fi + if [ -n "${PORTFORWARD_HEALTH_PID}" ]; then + kill "${PORTFORWARD_HEALTH_PID}" >/dev/null 2>&1 || true + wait "${PORTFORWARD_HEALTH_PID}" >/dev/null 2>&1 || true + fi + if [ "${exit_code}" -ne 0 ] && [ -n "${KUBE_CONTEXT}" ] && [ -n "${NAMESPACE}" ]; then if command -v kubectl >/dev/null 2>&1 \ && kctl get namespace "${NAMESPACE}" >/dev/null 2>&1; then @@ -83,6 +90,11 @@ cleanup() { cat "${PORTFORWARD_LOG}" || true echo "=== end port-forward log ===" fi + if [ -f "${PORTFORWARD_HEALTH_LOG}" ]; then + echo "=== health port-forward log ===" + cat "${PORTFORWARD_HEALTH_LOG}" || true + echo "=== end health port-forward log ===" + fi fi if [ "${HELM_INSTALLED}" = "1" ] && [ -n "${KUBE_CONTEXT}" ] && [ -n "${NAMESPACE}" ]; then @@ -291,6 +303,40 @@ if [ "${elapsed}" -ge "${timeout}" ]; then exit 1 fi +# Dedicated port-forward to the gateway pod's health listener. The chart's +# Service intentionally exposes only the gRPC and metrics ports — kubelet +# probes the health endpoint directly on the pod IP — so the /readyz e2e +# test reaches it through this separate forward. Target the named `health` +# containerPort declared on the StatefulSet so a future override of +# `service.healthPort` stays compatible without touching this script. +HEALTH_LOCAL_PORT="$(e2e_pick_port)" +echo "Starting kubectl port-forward sts/${RELEASE_NAME} ${HEALTH_LOCAL_PORT}:health..." +kctl -n "${NAMESPACE}" port-forward "sts/${RELEASE_NAME}" \ + "${HEALTH_LOCAL_PORT}:health" >"${PORTFORWARD_HEALTH_LOG}" 2>&1 & +PORTFORWARD_HEALTH_PID=$! + +elapsed=0 +timeout=30 +while [ "${elapsed}" -lt "${timeout}" ]; do + if ! kill -0 "${PORTFORWARD_HEALTH_PID}" 2>/dev/null; then + echo "ERROR: kubectl health port-forward exited before becoming reachable" >&2 + cat "${PORTFORWARD_HEALTH_LOG}" >&2 || true + exit 1 + fi + if curl -s -o /dev/null --connect-timeout 1 "http://127.0.0.1:${HEALTH_LOCAL_PORT}/healthz"; then + break + fi + sleep 1 + elapsed=$((elapsed + 1)) +done +if [ "${elapsed}" -ge "${timeout}" ]; then + echo "ERROR: health port-forward did not accept TCP within ${timeout}s" >&2 + cat "${PORTFORWARD_HEALTH_LOG}" >&2 || true + exit 1 +fi + +export OPENSHELL_E2E_HEALTH_PORT="${HEALTH_LOCAL_PORT}" + GATEWAY_NAME="openshell-e2e-kube-${LOCAL_PORT}" GATEWAY_ENDPOINT="http://127.0.0.1:${LOCAL_PORT}" e2e_register_plaintext_gateway \ diff --git a/tasks/test.toml b/tasks/test.toml index c6ac82180..f86ea7503 100644 --- a/tasks/test.toml +++ b/tasks/test.toml @@ -21,7 +21,12 @@ depends = ["e2e:python:gpu"] ["test:rust"] description = "Run Rust tests" -run = "cargo test --workspace" +run = [ + # Run the workspace once without openshell-server so we can run that crate + # with test-only helpers enabled. + "cargo test --workspace --exclude openshell-server", + "cargo test -p openshell-server --features test-support", +] hide = true ["test:python"]