Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions architecture/gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ health, metrics, or tunnel routes. The plaintext service router also rejects
browser requests whose Fetch Metadata, Origin, or Referer headers indicate a
cross-origin or sibling-subdomain request.

Dedicated health listeners expose `/healthz` (process liveness only) and
`/readyz` (dependency-aware readiness). Readiness reflects the latest result
of an in-process background task that pings the persistence layer on a
fixed cadence; the handler reads a cached state, so responses are
sub-millisecond and never race the kubelet probe timeout.

Supported auth modes:

| Mode | Use |
Expand Down
1 change: 1 addition & 0 deletions crates/openshell-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ x509-parser = "0.16"

[features]
dev-settings = ["openshell-core/dev-settings"]
test-support = []

[dev-dependencies]
hyper-rustls = { version = "0.27", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "ring", "webpki-tokio"] }
Expand Down
285 changes: 271 additions & 14 deletions crates/openshell-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@
// SPDX-License-Identifier: Apache-2.0

//! HTTP health endpoints using Axum.
//!
//! Three endpoints with distinct semantics:
//! - `/healthz` — Kubernetes liveness probe. Returns `200 OK` whenever the
//! process is responsive. Intentionally does NOT depend on the database
//! so a transient outage does not cascade into a `CrashLoopBackOff`.
//! - `/readyz` — Kubernetes readiness probe. Reads the cached state
//! published by [`crate::readiness::DatabaseHealthMonitor`] and returns
//! `503 Service Unavailable` when the latest background check failed.
//! Handler latency is sub-millisecond: the database is never pinged from
//! inside the request path, so the response cannot race the kubelet's
//! probe timeout.
//! - `/health` — Alias of `/readyz` for external monitors
//! that conventionally probe `/health`.

use axum::{
Json, Router,
Expand All @@ -14,43 +27,147 @@ use axum::{
use metrics_exporter_prometheus::PrometheusHandle;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::watch;

/// Health check response.
use crate::persistence::Store;
use crate::readiness::{DatabaseHealthMonitor, HealthError, HealthState};

const STATUS_HEALTHY: &str = "healthy";
const STATUS_UNHEALTHY: &str = "unhealthy";
const DATABASE_INITIALIZING_ERROR: &str = "readiness monitor still initializing";
const DATABASE_UNAVAILABLE_ERROR: &str = "database unavailable";
const DATABASE_TIMEOUT_ERROR: &str = "database health check timed out";

#[derive(Clone)]
struct HealthRouterState {
health: watch::Receiver<HealthState>,
}

/// Per-dependency check entry exposed under `checks` in the JSON payload.
#[derive(Debug, Serialize)]
pub struct DependencyCheck {
/// `"healthy"` or `"unhealthy"`.
pub status: &'static str,

/// Wall-clock time of the latest background ping, when measurable.
#[serde(skip_serializing_if = "Option::is_none")]
pub latency_ms: Option<u64>,

/// Failure detail. Absent on success.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}

/// Aggregated dependency results.
#[derive(Debug, Serialize)]
pub struct HealthChecks {
pub database: DependencyCheck,
}

/// Readiness response payload.
#[derive(Debug, Serialize)]
pub struct HealthResponse {
/// Service status.
/// Overall status: `"healthy"` if every dependency is healthy.
pub status: &'static str,

/// Service version.
pub version: &'static str,
}

/// Simple health check - returns 200 OK.
async fn health() -> impl IntoResponse {
StatusCode::OK
/// Per-dependency breakdown.
pub checks: HealthChecks,
}

/// Kubernetes liveness probe.
/// Kubernetes liveness probe — process responsiveness only.
async fn healthz() -> impl IntoResponse {
StatusCode::OK
}

/// Kubernetes readiness probe with detailed status.
async fn readyz() -> impl IntoResponse {
/// Kubernetes readiness probe — reflects the cached background DB state.
async fn readyz(State(state): State<Arc<HealthRouterState>>) -> impl IntoResponse {
render_response(&state.health.borrow())
}

/// Convenience alias of [`readyz`] for monitors that probe `/health`.
async fn health(State(state): State<Arc<HealthRouterState>>) -> impl IntoResponse {
render_response(&state.health.borrow())
}

fn render_response(state: &HealthState) -> (StatusCode, Json<HealthResponse>) {
let database = render_database(state);
let healthy = state.is_healthy();
let response = HealthResponse {
status: "healthy",
status: if healthy {
STATUS_HEALTHY
} else {
STATUS_UNHEALTHY
},
version: openshell_core::VERSION,
checks: HealthChecks { database },
};
let code = if healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
(code, Json(response))
}

fn render_database(state: &HealthState) -> DependencyCheck {
match state {
HealthState::Initializing => DependencyCheck {
status: STATUS_UNHEALTHY,
latency_ms: None,
error: Some(DATABASE_INITIALIZING_ERROR.to_string()),
},
HealthState::Healthy { latency_ms } => DependencyCheck {
status: STATUS_HEALTHY,
latency_ms: Some(*latency_ms),
error: None,
},
HealthState::Unhealthy {
reason: HealthError::Unavailable,
latency_ms,
} => DependencyCheck {
status: STATUS_UNHEALTHY,
latency_ms: *latency_ms,
error: Some(DATABASE_UNAVAILABLE_ERROR.to_string()),
},
HealthState::Unhealthy {
reason: HealthError::Timeout,
..
} => DependencyCheck {
status: STATUS_UNHEALTHY,
latency_ms: None,
error: Some(DATABASE_TIMEOUT_ERROR.to_string()),
},
}
}

(StatusCode::OK, Json(response))
/// Build the health router by spawning a background [`DatabaseHealthMonitor`]
/// for `store` and wiring its receiver into the handlers.
///
/// Awaits the monitor's first poll before returning so callers (tests and
/// the server runtime) get a router whose `/readyz` already reflects the
/// real database state by the time it accepts traffic. The background task
/// continues running detached for the remainder of the runtime.
pub async fn health_router(store: Arc<Store>) -> Router {
let mut monitor = DatabaseHealthMonitor::spawn(store);
monitor.wait_until_polled().await;
health_router_from_receiver(monitor.subscribe())
}

/// Create the health router.
pub fn health_router() -> Router {
/// Build the health router from an existing monitor receiver.
///
/// Crate-internal: used by [`health_router`] and by tests that drive the
/// `HealthState` directly without spinning up the polling task.
pub fn health_router_from_receiver(receiver: watch::Receiver<HealthState>) -> Router {
let state = Arc::new(HealthRouterState { health: receiver });

Router::new()
.route("/health", get(health))
.route("/healthz", get(healthz))
.route("/readyz", get(readyz))
.with_state(state)
}

/// Create the metrics router for the dedicated metrics port.
Expand All @@ -64,7 +181,7 @@ async fn render_metrics(State(handle): State<PrometheusHandle>) -> impl IntoResp
handle.render()
}

/// Create the HTTP router.
/// Create the HTTP router served on the multiplexed gateway port.
pub fn http_router(state: Arc<crate::ServerState>) -> Router {
crate::ws_tunnel::router(state.clone())
.merge(crate::auth::router(state.clone()))
Expand Down Expand Up @@ -305,3 +422,143 @@ mod tests {
assert!(!browser_context_allows_plaintext_service_request(&req));
}
}

#[cfg(test)]
mod readiness_tests {
use super::*;
use axum::body::Body;
use http::Request;
use http_body_util::BodyExt;
use tower::ServiceExt;

async fn in_memory_store() -> Arc<Store> {
Arc::new(
Store::connect("sqlite::memory:")
.await
.expect("connect in-memory sqlite store"),
)
}

async fn get(router: Router, path: &str) -> (StatusCode, serde_json::Value) {
let response = router
.oneshot(Request::get(path).body(Body::empty()).unwrap())
.await
.expect("router responds");
let status = response.status();
let bytes = response
.into_body()
.collect()
.await
.expect("collect body")
.to_bytes();
let body = if bytes.is_empty() {
serde_json::Value::Null
} else {
serde_json::from_slice(&bytes).expect("response is valid JSON")
};
(status, body)
}

/// Build a router whose state is driven by a `HealthState` we control,
/// so each handler-shape test can pin the exact mapping under test.
fn router_with_state(state: HealthState) -> Router {
let (_tx, rx) = watch::channel(state);
health_router_from_receiver(rx)
}

#[tokio::test]
async fn healthz_is_minimal_and_does_not_touch_the_database() {
// Liveness must succeed even when the database is unreachable —
// otherwise a transient outage would CrashLoopBackOff the gateway.
let store = in_memory_store().await;
store.close().await;
let (status, body) = get(health_router(store).await, "/healthz").await;
assert_eq!(status, StatusCode::OK);
assert!(body.is_null(), "healthz must return an empty body");
}

#[tokio::test]
async fn readyz_returns_200_with_healthy_payload_when_db_is_reachable() {
let store = in_memory_store().await;
let (status, body) = get(health_router(store).await, "/readyz").await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["status"], "healthy");
assert_eq!(body["checks"]["database"]["status"], "healthy");
assert!(
body["checks"]["database"]["latency_ms"].is_number(),
"expected latency_ms in healthy payload"
);
assert!(
body["checks"]["database"]["error"].is_null(),
"healthy payload must omit the error field"
);
}

#[tokio::test]
async fn health_alias_mirrors_readyz_when_db_is_reachable() {
let store = in_memory_store().await;
let (status, body) = get(health_router(store).await, "/health").await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["status"], "healthy");
assert_eq!(body["checks"]["database"]["status"], "healthy");
}

#[tokio::test]
async fn readyz_returns_503_with_unhealthy_payload_when_db_is_unreachable() {
let store = in_memory_store().await;
store.close().await;
let (status, body) = get(health_router(store).await, "/readyz").await;
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body["status"], "unhealthy");
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
assert_eq!(
body["checks"]["database"]["error"],
DATABASE_UNAVAILABLE_ERROR
);
}

#[tokio::test]
async fn health_alias_returns_503_when_db_is_unreachable() {
let store = in_memory_store().await;
store.close().await;
let (status, body) = get(health_router(store).await, "/health").await;
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body["status"], "unhealthy");
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
}

#[tokio::test]
async fn readyz_reports_initializing_state_as_unhealthy_with_explicit_reason() {
let (status, body) = get(router_with_state(HealthState::Initializing), "/readyz").await;
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body["status"], "unhealthy");
assert_eq!(body["checks"]["database"]["status"], "unhealthy");
assert_eq!(
body["checks"]["database"]["error"],
DATABASE_INITIALIZING_ERROR
);
assert!(
body["checks"]["database"]["latency_ms"].is_null(),
"initializing state has no latency to report yet"
);
}

#[tokio::test]
async fn readyz_renders_timeout_state_with_dedicated_error_string() {
let (status, body) = get(
router_with_state(HealthState::Unhealthy {
reason: HealthError::Timeout,
latency_ms: None,
}),
"/readyz",
)
.await;
assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(body["status"], "unhealthy");
assert_eq!(body["checks"]["database"]["error"], DATABASE_TIMEOUT_ERROR);
assert!(
body["checks"]["database"]["latency_ms"].is_null(),
"timeout state has no completed-call latency"
);
}
}
10 changes: 7 additions & 3 deletions crates/openshell-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod multiplex;
mod persistence;
pub(crate) mod policy_store;
mod provider_refresh;
mod readiness;
mod sandbox_index;
mod sandbox_watch;
mod service_routing;
Expand All @@ -57,7 +58,7 @@ pub use grpc::OpenShellService;
pub use http::{health_router, http_router, metrics_router, service_http_router};
pub use multiplex::{MultiplexService, MultiplexedService};
use openshell_driver_kubernetes::KubernetesComputeConfig;
use persistence::Store;
pub use persistence::Store;
use sandbox_index::SandboxIndex;
use sandbox_watch::SandboxWatchBus;
pub use tls::TlsAcceptor;
Expand Down Expand Up @@ -171,6 +172,7 @@ pub async fn run_server(
if database_url.is_empty() {
return Err(Error::config("database_url is required"));
}

let store = Arc::new(Store::connect(database_url).await?);

let oidc_cache = if let Some(ref oidc) = config.oidc {
Expand Down Expand Up @@ -255,9 +257,11 @@ pub async fn run_server(
))
})?;
info!(address = %health_bind_address, "Health server listening");
// `health_router` awaits the readiness monitor's first poll so the
// listener never serves a stale-by-default state on its first probe.
let router = health_router(store.clone()).await;
tokio::spawn(async move {
if let Err(e) = axum::serve(health_listener, health_router().into_make_service()).await
{
if let Err(e) = axum::serve(health_listener, router.into_make_service()).await {
error!("Health server error: {e}");
}
});
Expand Down
Loading
Loading