From 4514ffde5289540cf5b74e885caeba659e2df66a Mon Sep 17 00:00:00 2001 From: OneNoted Date: Wed, 15 Apr 2026 10:05:30 +0200 Subject: [PATCH 01/10] fix: stop inspectors from leaking runtime storage Session deletion kept qemu artifact directories on disk, qemu image prep could leak docker-backed storage, and guest-runtime lifecycle logic had drifted between the library and binary entrypoint. This change centralizes the runtime implementation in the library, reclaims session-owned runtime directories on teardown, adds safe marker-based startup janitoring plus an explicit reclaim endpoint, and makes qemu prep keep its temporary storage inside the tracked workdir so it can be cleaned deterministically. Constraint: Preserve qemu as the product path while keeping xvfb and display sessions honest Constraint: Avoid new dependencies while making cleanup safe enough for app-local storage Rejected: Docker volume prune as the primary fix | too broad and unsafe for unrelated local projects Rejected: Storage hotfix without lifecycle consolidation | likely to regress because lib.rs and main.rs had drifted Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep runtime/cache/exports ownership explicit and do not reintroduce session-owned files outside runtime storage Tested: cargo test -p guest-runtime --quiet; bun test apps/control-plane/src/server.desktop-app.test.ts Not-tested: Full end-to-end qemu product boot against a real docker/qemu environment --- .../src/server.desktop-app.test.ts | 46 + apps/control-plane/src/server.ts | 10 + crates/guest-runtime/src/lib.rs | 489 +++- crates/guest-runtime/src/main.rs | 2142 +---------------- crates/guest-runtime/tests/qemu_bridge.rs | 123 + scripts/qemu_guest_assets.py | 117 +- 6 files changed, 677 insertions(+), 2250 deletions(-) diff --git a/apps/control-plane/src/server.desktop-app.test.ts b/apps/control-plane/src/server.desktop-app.test.ts index 09c61eb..2d528ea 100644 --- 
a/apps/control-plane/src/server.desktop-app.test.ts +++ b/apps/control-plane/src/server.desktop-app.test.ts @@ -54,6 +54,27 @@ async function startGuestServer() { return; } + if (req.method === 'POST' && url.pathname === '/api/storage/reclaim') { + const chunks: Buffer[] = []; + for await (const chunk of req) chunks.push(Buffer.from(chunk)); + const body = JSON.parse(Buffer.concat(chunks).toString('utf8')) as Record; + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ + mode: body.mode ?? 'report', + candidate_count: 1, + candidates: [ + { + path: '/tmp/inspectors/runtime/stale-session', + tier: 'runtime', + kind: 'legacy_runtime', + reason: 'legacy inspectors runtime directory without an active container reference', + }, + ], + reclaimed: body.mode === 'apply' ? ['/tmp/inspectors/runtime/stale-session'] : [], + })); + return; + } + res.writeHead(404, { 'content-type': 'application/json' }); res.end(JSON.stringify({ error: 'not found', path: url.pathname })); }); @@ -152,3 +173,28 @@ test('qemu product session creation preserves desktop user metadata and live vie await stopServers(controlPlane, guest.guestServer); } }); + +test('control-plane proxies storage reclaim requests', async () => { + const guest = await startGuestServer(); + const controlPlane = await startControlPlaneServer(0, guest.baseUrl); + const baseUrl = `http://127.0.0.1:${(controlPlane.server.address() as { port: number }).port}`; + + try { + const response = await fetch(`${baseUrl}/api/storage/reclaim`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ mode: 'apply' }), + }); + assert.equal(response.status, 200); + const payload = await response.json() as { + mode: string; + candidate_count: number; + reclaimed: string[]; + }; + assert.equal(payload.mode, 'apply'); + assert.equal(payload.candidate_count, 1); + assert.deepEqual(payload.reclaimed, ['/tmp/inspectors/runtime/stale-session']); + } finally { + await 
stopServers(controlPlane, guest.guestServer); + } +}); diff --git a/apps/control-plane/src/server.ts b/apps/control-plane/src/server.ts index 47a8497..209fa81 100644 --- a/apps/control-plane/src/server.ts +++ b/apps/control-plane/src/server.ts @@ -659,6 +659,16 @@ export function createRequestHandler(state: ControlPlaneState) { return; } + if (req.method === 'POST' && url.pathname === '/api/storage/reclaim') { + const body = await readJson(req); + const upstream = await guestRequest(state, '/api/storage/reclaim', { + method: 'POST', + body: JSON.stringify(body), + }); + json(res, upstream.status, upstream.payload); + return; + } + if (req.method === 'GET' && url.pathname === '/api/sessions') { const upstream = await guestRequest(state, '/api/sessions'); if (upstream.status === 200 && Array.isArray(upstream.payload?.sessions)) { diff --git a/crates/guest-runtime/src/lib.rs b/crates/guest-runtime/src/lib.rs index 4d6a109..f28de07 100644 --- a/crates/guest-runtime/src/lib.rs +++ b/crates/guest-runtime/src/lib.rs @@ -30,6 +30,9 @@ const SCREENSHOT_POLL_INTERVAL_MS: u64 = 3_000; const QEMU_PRODUCT_DESKTOP_USER: &str = "ubuntu"; const QEMU_PRODUCT_DESKTOP_HOME: &str = "/home/ubuntu"; const QEMU_PRODUCT_RUNTIME_DIR: &str = "/run/user/1000"; +const STORAGE_MARKER_FILE: &str = ".inspectors-storage.json"; +const STORAGE_OWNER: &str = "inspectors"; +const STORAGE_LAYOUT_VERSION: u8 = 1; #[derive(Clone)] struct AppState { @@ -213,6 +216,44 @@ struct QemuContainerSpec<'a> { disable_kvm: bool, } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +struct StorageOwnershipMarker { + version: u8, + owner: String, + tier: String, + kind: String, + created_at: chrono::DateTime, + session_id: Option, + provider: Option, + qemu_profile: Option, + container_name: Option, + process_id: Option, +} + +impl StorageOwnershipMarker { + fn runtime_session( + session_id: &str, + provider: &str, + qemu_profile: Option<&str>, + container_name: Option<&str>, + process_id: Option, + ) -> 
Self { + Self { + version: STORAGE_LAYOUT_VERSION, + owner: STORAGE_OWNER.to_string(), + tier: "runtime".to_string(), + kind: "session".to_string(), + created_at: chrono::Utc::now(), + session_id: Some(session_id.to_string()), + provider: Some(provider.to_string()), + qemu_profile: qemu_profile.map(str::to_string), + container_name: container_name.map(str::to_string), + process_id, + } + } + +} + #[derive(Clone, Debug)] pub struct RuntimeConfig { pub bind_host: String, @@ -307,10 +348,15 @@ pub async fn run(config: RuntimeConfig) { qemu_bridge_probe_interval, }; + if let Err(error) = ensure_storage_roots(&state.artifacts_root).await { + panic!("failed to prepare storage roots: {error}"); + } cleanup_orphaned_qemu_containers().await; + janitor_managed_storage(&state.artifacts_root).await; let app = Router::new() .route("/health", get(health)) + .route("/api/storage/reclaim", axum::routing::post(reclaim_storage)) .route("/api/sessions", get(list_sessions).post(create_session)) .route( "/api/sessions/{id}", @@ -338,28 +384,297 @@ pub async fn run(config: RuntimeConfig) { .expect("serve guest runtime"); } -async fn cleanup_orphaned_qemu_containers() { +fn artifacts_base_root(runtime_root: &Path) -> PathBuf { + runtime_root + .parent() + .filter(|parent| !parent.as_os_str().is_empty()) + .map(Path::to_path_buf) + .unwrap_or_else(|| PathBuf::from(".")) +} + +fn qemu_cache_root(runtime_root: &Path) -> PathBuf { + artifacts_base_root(runtime_root).join("cache").join("qemu") +} + +fn qemu_build_root(runtime_root: &Path) -> PathBuf { + qemu_cache_root(runtime_root).join("_build") +} + +fn exports_root(runtime_root: &Path) -> PathBuf { + artifacts_base_root(runtime_root).join("exports") +} + +fn storage_marker_path(path: &Path) -> PathBuf { + path.join(STORAGE_MARKER_FILE) +} + +async fn ensure_storage_roots(runtime_root: &Path) -> std::io::Result<()> { + tokio::fs::create_dir_all(runtime_root).await?; + tokio::fs::create_dir_all(qemu_cache_root(runtime_root)).await?; + 
tokio::fs::create_dir_all(qemu_build_root(runtime_root)).await?; + tokio::fs::create_dir_all(exports_root(runtime_root)).await?; + Ok(()) +} + +async fn write_storage_marker( + directory: &Path, + marker: &StorageOwnershipMarker, +) -> std::io::Result<()> { + let payload = serde_json::to_vec_pretty(marker) + .map_err(|error| std::io::Error::other(error.to_string()))?; + tokio::fs::write( + storage_marker_path(directory), + payload, + ) + .await +} + +fn read_storage_marker(directory: &Path) -> Option { + let marker_path = storage_marker_path(directory); + let bytes = std::fs::read(marker_path).ok()?; + serde_json::from_slice(&bytes).ok() +} + +fn marker_matches_inspectors_layout(marker: &StorageOwnershipMarker) -> bool { + marker.owner == STORAGE_OWNER && marker.version == STORAGE_LAYOUT_VERSION +} + +fn process_is_live(process_id: u32) -> bool { + Path::new("/proc").join(process_id.to_string()).exists() +} + +async fn docker_container_exists(container_name: &str) -> bool { if !LinuxBackend::tool_exists("docker") { + return false; + } + Command::new("docker") + .args(["inspect", container_name]) + .output() + .await + .map(|output| output.status.success()) + .unwrap_or(false) +} + +async fn can_remove_marker_owned_directory(directory: &Path, marker: &StorageOwnershipMarker) -> bool { + if !marker_matches_inspectors_layout(marker) { + return false; + } + if marker.tier == "cache" || marker.tier == "exports" { + return false; + } + if let Some(container_name) = marker.container_name.as_deref() + && docker_container_exists(container_name).await + { + return false; + } + if let Some(process_id) = marker.process_id + && process_is_live(process_id) + { + return false; + } + directory.exists() +} + +async fn janitor_runtime_directories(runtime_root: &Path) { + let Ok(entries) = std::fs::read_dir(runtime_root) else { return; + }; + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + let Some(marker) = 
read_storage_marker(&path) else { + continue; + }; + if can_remove_marker_owned_directory(&path, &marker).await { + let _ = tokio::fs::remove_dir_all(&path).await; + } } +} - let output = Command::new("docker") - .args(["ps", "--format", "{{.Names}}", "--filter", "name=acu-qemu-"]) - .output() - .await; - let Ok(output) = output else { +async fn janitor_qemu_build_directories(runtime_root: &Path) { + let build_root = qemu_build_root(runtime_root); + let Ok(entries) = std::fs::read_dir(&build_root) else { return; }; - if !output.status.success() { + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + let Some(marker) = read_storage_marker(&path) else { + continue; + }; + if can_remove_marker_owned_directory(&path, &marker).await { + let _ = tokio::fs::remove_dir_all(&path).await; + } + } +} + +async fn janitor_managed_storage(runtime_root: &Path) { + janitor_runtime_directories(runtime_root).await; + janitor_qemu_build_directories(runtime_root).await; +} + +fn looks_like_legacy_runtime_directory(path: &Path) -> bool { + let Some(name) = path.file_name().and_then(|value| value.to_str()) else { + return false; + }; + if Uuid::parse_str(name).is_ok() { + return true; + } + path.join("seed").exists() + || path.join("data.img").exists() + || path.join("product-boot.qcow2").exists() + || path.join("regression-boot.qcow2").exists() +} + +fn looks_like_legacy_build_directory(path: &Path) -> bool { + path.join("boot.qcow2").exists() || path.join("seed.iso").exists() +} + +#[derive(serde::Serialize)] +struct ReclaimCandidate { + path: String, + tier: String, + kind: String, + reason: String, +} + +async fn collect_runtime_reclaim_candidates(runtime_root: &Path) -> Vec { + let Ok(entries) = std::fs::read_dir(runtime_root) else { + return Vec::new(); + }; + let mut candidates = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + if let Some(marker) = 
read_storage_marker(&path) { + if can_remove_marker_owned_directory(&path, &marker).await { + candidates.push(ReclaimCandidate { + path: path.to_string_lossy().to_string(), + tier: marker.tier, + kind: marker.kind, + reason: "marker-owned stale runtime state".to_string(), + }); + } + continue; + } + if !looks_like_legacy_runtime_directory(&path) { + continue; + } + let name = path.file_name().and_then(|value| value.to_str()).unwrap_or_default(); + let derived_container_name = format!("acu-qemu-{}", name.chars().take(12).collect::()); + if docker_container_exists(&derived_container_name).await { + continue; + } + candidates.push(ReclaimCandidate { + path: path.to_string_lossy().to_string(), + tier: "runtime".to_string(), + kind: "legacy_runtime".to_string(), + reason: "legacy inspectors runtime directory without an active container reference".to_string(), + }); + } + candidates +} + +async fn collect_build_reclaim_candidates(runtime_root: &Path) -> Vec { + let build_root = qemu_build_root(runtime_root); + let Ok(entries) = std::fs::read_dir(&build_root) else { + return Vec::new(); + }; + let mut candidates = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + if let Some(marker) = read_storage_marker(&path) { + if can_remove_marker_owned_directory(&path, &marker).await { + candidates.push(ReclaimCandidate { + path: path.to_string_lossy().to_string(), + tier: marker.tier, + kind: marker.kind, + reason: "marker-owned stale qemu prep workdir".to_string(), + }); + } + continue; + } + if looks_like_legacy_build_directory(&path) { + candidates.push(ReclaimCandidate { + path: path.to_string_lossy().to_string(), + tier: "runtime".to_string(), + kind: "legacy_prepare_build".to_string(), + reason: "legacy qemu prep workdir without ownership marker".to_string(), + }); + } + } + candidates +} + +#[derive(serde::Deserialize)] +struct ReclaimStorageRequest { + mode: Option, +} + +async fn reclaim_storage( + 
State(state): State, + Json(request): Json, +) -> Response { + let apply = request.mode.as_deref() == Some("apply"); + let mut candidates = collect_runtime_reclaim_candidates(&state.artifacts_root).await; + candidates.extend(collect_build_reclaim_candidates(&state.artifacts_root).await); + + let mut reclaimed = Vec::new(); + if apply { + for candidate in &candidates { + remove_runtime_directory(Path::new(&candidate.path)).await; + reclaimed.push(candidate.path.clone()); + } + } + + ( + StatusCode::OK, + Json(json!({ + "mode": if apply { "apply" } else { "report" }, + "runtime_root": state.artifacts_root, + "cache_root": qemu_cache_root(&state.artifacts_root), + "exports_root": exports_root(&state.artifacts_root), + "candidate_count": candidates.len(), + "candidates": candidates, + "reclaimed": reclaimed, + })), + ) + .into_response() +} + +async fn cleanup_orphaned_qemu_containers() { + if !LinuxBackend::tool_exists("docker") { return; } - let names = String::from_utf8_lossy(&output.stdout); - for container_name in names.lines().map(str::trim).filter(|line| !line.is_empty()) { - let _ = Command::new("docker") - .args(["rm", "-f", container_name]) + for prefix in ["acu-qemu-", "acu-image-prep-"] { + let output = Command::new("docker") + .args(["ps", "-a", "--format", "{{.Names}}", "--filter", &format!("name={prefix}")]) .output() .await; + let Ok(output) = output else { + continue; + }; + if !output.status.success() { + continue; + } + + let names = String::from_utf8_lossy(&output.stdout); + for container_name in names.lines().map(str::trim).filter(|line| !line.is_empty()) { + let _ = Command::new("docker") + .args(["rm", "-f", "-v", container_name]) + .output() + .await; + } } } @@ -488,6 +803,54 @@ async fn create_session_impl( } } +async fn create_session_artifacts_dir( + state: &AppState, + session_id: &str, +) -> Result { + let artifacts_dir = state.artifacts_root.join(session_id); + tokio::fs::create_dir_all(&artifacts_dir) + .await + .map_err(|error| { + ( + 
StatusCode::INTERNAL_SERVER_ERROR, + json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), + ) + })?; + std::fs::canonicalize(&artifacts_dir).map_err(|error| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "code": "artifacts_dir_canonicalize_failed", "message": error.to_string() }), + ) + }) +} + +async fn mark_runtime_session_directory( + directory: &Path, + session_id: &str, + provider: &str, + qemu_profile: Option<&str>, + container_name: Option<&str>, + process_id: Option, +) -> Result<(), (StatusCode, Value)> { + let marker = StorageOwnershipMarker::runtime_session( + session_id, + provider, + qemu_profile, + container_name, + process_id, + ); + write_storage_marker(directory, &marker).await.map_err(|error| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "code": "artifacts_marker_failed", "message": error.to_string() }), + ) + }) +} + +async fn remove_runtime_directory(path: &Path) { + let _ = tokio::fs::remove_dir_all(path).await; +} + async fn create_xvfb_session( state: &AppState, request: CreateSessionRequest, @@ -500,15 +863,7 @@ async fn create_xvfb_session( } let session_id = Uuid::new_v4().to_string(); - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; + let artifacts_dir = create_session_artifacts_dir(state, &session_id).await?; let screen_geometry = format!("{}x{}x24", request.width, request.height); let mut selected = None; @@ -541,17 +896,30 @@ async fn create_xvfb_session( break; } - let (display, child) = selected.ok_or_else(|| { + let (display, child) = if let Some(selected) = selected { + selected + } else { + remove_runtime_directory(&artifacts_dir).await; let message = if let Some((display, status)) = last_status { format!("Xvfb exited early on {display}: {status}") } else { "Xvfb could not find an 
available display".to_string() }; - ( + return Err(( StatusCode::FAILED_DEPENDENCY, json!({ "code": "xvfb_early_exit", "message": message }), - ) - })?; + )); + }; + let process_id = child.id(); + if let Err(error) = + mark_runtime_session_directory(&artifacts_dir, &session_id, "xvfb", None, None, process_id) + .await + { + let mut child = child; + let _ = child.kill().await; + remove_runtime_directory(&artifacts_dir).await; + return Err(error); + } let backend = LinuxBackend::new(BackendOptions { display: display.clone(), @@ -620,15 +988,9 @@ async fn create_existing_display_session( .desktop_runtime_dir .clone() .or_else(|| std::env::var("XDG_RUNTIME_DIR").ok()); - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; + let artifacts_dir = create_session_artifacts_dir(state, &session_id).await?; + mark_runtime_session_directory(&artifacts_dir, &session_id, "display", None, None, None) + .await?; let backend = LinuxBackend::new(BackendOptions { display: display.clone(), @@ -700,23 +1062,19 @@ async fn create_qemu_session( json!({ "code": "guest_runtime_binary_unavailable", "message": error.to_string() }), ) })?; - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; - let absolute_artifacts_dir = std::fs::canonicalize(&artifacts_dir).map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_canonicalize_failed", "message": error.to_string() }), - ) - })?; + let artifacts_dir = create_session_artifacts_dir(state, &session_id).await?; + let absolute_artifacts_dir = artifacts_dir.clone(); let container_name = 
format!("acu-qemu-{}", &session_id[..12]); + mark_runtime_session_directory( + &absolute_artifacts_dir, + &session_id, + "qemu", + Some(qemu_profile.as_str()), + Some(&container_name), + None, + ) + .await?; let image = request .container_image .clone() @@ -789,6 +1147,7 @@ async fn create_qemu_session( let launch_mode = match launch_qemu_container(&container_spec).await { Ok(mode) => mode, Err(message) => { + remove_runtime_directory(&absolute_artifacts_dir).await; return Err(( StatusCode::FAILED_DEPENDENCY, json!({ @@ -805,7 +1164,8 @@ async fn create_qemu_session( let logs = docker_output(&["logs", &container_name]) .await .unwrap_or_default(); - let _ = docker_output(&["rm", "-f", &container_name]).await; + let _ = docker_output(&["rm", "-f", "-v", &container_name]).await; + remove_runtime_directory(&absolute_artifacts_dir).await; return Err(( StatusCode::FAILED_DEPENDENCY, json!({ @@ -818,16 +1178,21 @@ async fn create_qemu_session( let container_ip = docker_container_ip(&container_name).await?; let viewer_port = docker_mapped_port(&container_name, state.qemu_viewer_port).await?; let runtime_port = docker_mapped_port(&container_name, state.qemu_guest_runtime_port).await?; - let viewer_url = resolve_qemu_endpoint(viewer_port, &container_ip, state.qemu_viewer_port) - .ok_or_else(|| { - ( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_container_ip_missing", - "message": "qemu container started but did not expose a viewer port or bridge-network IP", - }), - ) - })?; + let viewer_url = if let Some(viewer_url) = + resolve_qemu_endpoint(viewer_port, &container_ip, state.qemu_viewer_port) + { + viewer_url + } else { + let _ = docker_output(&["rm", "-f", "-v", &container_name]).await; + remove_runtime_directory(&absolute_artifacts_dir).await; + return Err(( + StatusCode::FAILED_DEPENDENCY, + json!({ + "code": "qemu_container_ip_missing", + "message": "qemu container started but did not expose a viewer port or bridge-network IP", + }), + )); + }; let 
remote_runtime_url = match launch_mode { QemuLaunchMode::PublishedPorts => { runtime_port.map(|port| format!("http://127.0.0.1:{port}")) @@ -1114,7 +1479,7 @@ async fn ensure_qemu_profile_image( state: &AppState, profile: QemuSessionProfile, ) -> Result { - let cache_root = state.artifacts_root.join("_qemu_images"); + let cache_root = qemu_cache_root(&state.artifacts_root); tokio::fs::create_dir_all(&cache_root) .await .map_err(|error| { @@ -1711,6 +2076,7 @@ async fn delete_session(State(state): State, AxumPath(id): AxumPath { + let artifacts_dir = PathBuf::from(&handle.record.artifacts_dir); if let Some(remote_bridge) = handle.remote_bridge.as_ref() { if let Some(remote_session_id) = remote_bridge.session_id.as_ref() { let _ = state @@ -1729,9 +2095,10 @@ async fn delete_session(State(state): State, AxumPath(id): AxumPath {} SessionProviderHandle::QemuDocker { container_name } => { - let _ = docker_output(&["rm", "-f", container_name]).await; + let _ = docker_output(&["rm", "-f", "-v", container_name]).await; } } + remove_runtime_directory(&artifacts_dir).await; (StatusCode::OK, Json(json!({ "ok": true }))).into_response() } None => ( diff --git a/crates/guest-runtime/src/main.rs b/crates/guest-runtime/src/main.rs index 5201c67..9548f46 100644 --- a/crates/guest-runtime/src/main.rs +++ b/crates/guest-runtime/src/main.rs @@ -1,2144 +1,6 @@ -#![allow(clippy::result_large_err)] -use std::collections::{BTreeSet, HashMap}; -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::Duration; - -use axum::{ - Json, Router, - extract::{Path as AxumPath, State}, - http::{StatusCode, header}, - response::{IntoResponse, Response}, - routing::get, -}; -use desktop_core::{ - ActionReceipt, ActionRequest, ArtifactRef, CreateSessionRequest, LiveDesktopView, Observation, - RuntimeCapabilities, SessionRecord, StructuredError, capability_descriptor, -}; -use linux_backend::{BackendOptions, LinuxBackend}; -use reqwest::Client; -use 
serde::de::DeserializeOwned; -use serde_json::{Value, json}; -use tokio::process::{Child, Command}; -use tokio::sync::Mutex; -use tower_http::cors::CorsLayer; -use tower_http::trace::TraceLayer; -use uuid::Uuid; - -const SCREENSHOT_POLL_INTERVAL_MS: u64 = 3_000; -const QEMU_PRODUCT_DESKTOP_USER: &str = "ubuntu"; -const QEMU_PRODUCT_DESKTOP_HOME: &str = "/home/ubuntu"; -const QEMU_PRODUCT_RUNTIME_DIR: &str = "/run/user/1000"; - -#[derive(Clone)] -struct AppState { - sessions: Arc>>, - artifacts_root: PathBuf, - browser_command: String, - runtime_base_url: String, - http_client: Client, - qemu_viewer_port: u16, - qemu_guest_runtime_port: u16, - qemu_guest_display: String, - qemu_bridge_probe_timeout: Duration, - qemu_bridge_probe_interval: Duration, -} - -struct SessionHandle { - record: SessionRecord, - backend: Option, - provider_handle: SessionProviderHandle, - remote_bridge: Option, -} - -enum SessionProviderHandle { - Xvfb { child: Child }, - ExistingDisplay, - QemuDocker { container_name: String }, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum QemuSessionProfile { - Product, - Regression, -} - -impl QemuSessionProfile { - fn from_request(request: &CreateSessionRequest) -> Self { - match request.qemu_profile.as_deref() { - Some("regression") => Self::Regression, - _ => Self::Product, - } - } - - fn as_str(self) -> &'static str { - match self { - Self::Product => "product", - Self::Regression => "regression", - } - } -} - -#[derive(Clone)] -struct RemoteBridgeHandle { - base_url: String, - session_id: Option, -} - -fn derive_live_desktop_view(record: &SessionRecord) -> LiveDesktopView { - match record.provider.as_str() { - "qemu" => { - let debug_url = record.viewer_url.clone(); - match record.qemu_profile.as_deref() { - Some("regression") => LiveDesktopView { - mode: "screenshot_poll".to_string(), - status: if record.bridge_status.as_deref() == Some("runtime_ready") { - "ready".to_string() - } else { - "unavailable".to_string() - }, - 
provider_surface: "guest_xvfb_screenshot".to_string(), - matches_action_plane: true, - canonical_url: None, - debug_url, - reason: Some( - "qemu regression keeps the VM viewer as debug-only because the action plane runs inside guest xvfb" - .to_string(), - ), - refresh_interval_ms: Some(SCREENSHOT_POLL_INTERVAL_MS), - }, - _ => LiveDesktopView { - mode: "stream".to_string(), - status: if record.viewer_url.is_some() { - "ready".to_string() - } else { - "unavailable".to_string() - }, - provider_surface: "qemu_novnc".to_string(), - matches_action_plane: true, - canonical_url: None, - debug_url, - reason: None, - refresh_interval_ms: None, - }, - } - } - "xvfb" => LiveDesktopView { - mode: "screenshot_poll".to_string(), - status: "ready".to_string(), - provider_surface: "guest_xvfb_screenshot".to_string(), - matches_action_plane: true, - canonical_url: None, - debug_url: None, - reason: Some( - "xvfb is an honest local/dev screenshot fallback without a live desktop stream" - .to_string(), - ), - refresh_interval_ms: Some(SCREENSHOT_POLL_INTERVAL_MS), - }, - "display" => LiveDesktopView { - mode: "screenshot_poll".to_string(), - status: "ready".to_string(), - provider_surface: "display_screenshot".to_string(), - matches_action_plane: true, - canonical_url: None, - debug_url: None, - reason: Some("display sessions expose screenshot polling only".to_string()), - refresh_interval_ms: Some(SCREENSHOT_POLL_INTERVAL_MS), - }, - _ => LiveDesktopView { - mode: "unavailable".to_string(), - status: "unavailable".to_string(), - provider_surface: "none".to_string(), - matches_action_plane: false, - canonical_url: None, - debug_url: record.viewer_url.clone(), - reason: Some("live desktop view is unavailable for this provider".to_string()), - refresh_interval_ms: None, - }, - } -} - -fn enrich_session_record(record: &SessionRecord) -> SessionRecord { - let mut enriched = record.clone(); - enriched.live_desktop_view = Some(derive_live_desktop_view(record)); - enriched -} - 
-#[derive(serde::Deserialize)] -struct BridgeSessionResponse { - session: SessionRecord, -} - -#[derive(serde::Deserialize)] -struct EnsuredQemuImage { - image_path: String, -} - -struct QemuBridgeMonitor { - sessions: Arc>>, - http_client: Client, - host_runtime_base_url: String, - guest_display: String, - browser_command: String, - qemu_profile: QemuSessionProfile, - session_id: String, - width: u32, - height: u32, - artifacts_dir: PathBuf, - remote_runtime_url: String, - viewer_url: String, - timeout: Duration, - interval: Duration, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum QemuLaunchMode { - PublishedPorts, - BridgeNetwork, -} - -struct QemuContainerSpec<'a> { - container_name: &'a str, - image: &'a str, - boot: &'a str, - artifacts_dir: &'a Path, - guest_runtime_binary_path: &'a Path, - boot_image_path: Option<&'a Path>, - seed_iso_path: Option<&'a Path>, - shared_host_path: Option<&'a Path>, - viewer_port: u16, - runtime_port: u16, - disable_kvm: bool, -} +use guest_runtime::RuntimeConfig; #[tokio::main] async fn main() { - let bind_host = arg_value("--host") - .or_else(|| std::env::var("ACU_BIND_HOST").ok()) - .unwrap_or_else(|| "127.0.0.1".to_string()); - let port = arg_value("--port") - .and_then(|value| value.parse().ok()) - .unwrap_or(4001); - let artifacts_root = PathBuf::from( - arg_value("--artifacts-dir").unwrap_or_else(|| "artifacts/runtime".to_string()), - ); - let browser_command = arg_value("--browser-command").unwrap_or_else(|| "firefox".to_string()); - let runtime_base_url = format!("http://127.0.0.1:{port}"); - let qemu_viewer_port = std::env::var("ACU_QEMU_VIEWER_PORT") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(8006); - let qemu_guest_runtime_port = std::env::var("ACU_QEMU_GUEST_RUNTIME_PORT") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(4001); - let qemu_guest_display = - std::env::var("ACU_QEMU_GUEST_DISPLAY").unwrap_or_else(|_| ":0".to_string()); - let qemu_bridge_probe_timeout = 
Duration::from_millis( - std::env::var("ACU_QEMU_BRIDGE_TIMEOUT_MS") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(45_000), - ); - let qemu_bridge_probe_interval = Duration::from_millis( - std::env::var("ACU_QEMU_BRIDGE_INTERVAL_MS") - .ok() - .and_then(|value| value.parse().ok()) - .unwrap_or(1_000), - ); - - let state = AppState { - sessions: Arc::new(Mutex::new(HashMap::new())), - artifacts_root, - browser_command, - runtime_base_url, - http_client: Client::new(), - qemu_viewer_port, - qemu_guest_runtime_port, - qemu_guest_display, - qemu_bridge_probe_timeout, - qemu_bridge_probe_interval, - }; - - cleanup_orphaned_qemu_containers().await; - - let app = Router::new() - .route("/health", get(health)) - .route("/api/sessions", get(list_sessions).post(create_session)) - .route( - "/api/sessions/{id}", - get(get_session).delete(delete_session), - ) - .route("/api/sessions/{id}/observation", get(get_observation)) - .route( - "/api/sessions/{id}/actions", - get(get_available_actions).post(perform_action), - ) - .route("/api/sessions/{id}/screenshot", get(get_screenshot)) - .with_state(state) - .layer(CorsLayer::permissive()) - .layer(TraceLayer::new_for_http()); - - let addr: SocketAddr = format!("{bind_host}:{port}") - .parse() - .expect("parse guest runtime bind address"); - let listener = tokio::net::TcpListener::bind(addr) - .await - .expect("bind guest runtime"); - println!("guest-runtime listening on http://{}", addr); - axum::serve(listener, app) - .await - .expect("serve guest runtime"); -} - -async fn cleanup_orphaned_qemu_containers() { - if !LinuxBackend::tool_exists("docker") { - return; - } - - let output = Command::new("docker") - .args(["ps", "--format", "{{.Names}}", "--filter", "name=acu-qemu-"]) - .output() - .await; - let Ok(output) = output else { - return; - }; - if !output.status.success() { - return; - } - - let names = String::from_utf8_lossy(&output.stdout); - for container_name in names.lines().map(str::trim).filter(|line| 
!line.is_empty()) { - let _ = Command::new("docker") - .args(["rm", "-f", container_name]) - .output() - .await; - } -} - -fn display_session_env( - display: &str, - desktop_home: Option<&str>, - desktop_runtime_dir: Option<&str>, -) -> Vec<(String, String)> { - if display != ":0" { - return vec![]; - } - - let mut env = Vec::new(); - let runtime_dir = desktop_runtime_dir - .map(PathBuf::from) - .or_else(|| std::env::var("XDG_RUNTIME_DIR").ok().map(PathBuf::from)) - .or_else(|| { - Path::new(QEMU_PRODUCT_RUNTIME_DIR) - .exists() - .then(|| PathBuf::from(QEMU_PRODUCT_RUNTIME_DIR)) - }); - if let Some(runtime_dir) = runtime_dir.as_ref() - && runtime_dir.exists() - { - env.push(( - "XDG_RUNTIME_DIR".to_string(), - runtime_dir.to_string_lossy().to_string(), - )); - let session_bus = runtime_dir.join("bus"); - if session_bus.exists() { - env.push(( - "DBUS_SESSION_BUS_ADDRESS".to_string(), - format!("unix:path={}", session_bus.to_string_lossy()), - )); - } - } - - if let Ok(xauthority) = std::env::var("XAUTHORITY") - && Path::new(&xauthority).exists() - { - env.push(("XAUTHORITY".to_string(), xauthority)); - return env; - } - - let mut candidates = Vec::new(); - if let Some(runtime_dir) = runtime_dir.as_ref() { - candidates.push(runtime_dir.join("gdm/Xauthority")); - candidates.push(runtime_dir.join("Xauthority")); - } - if let Some(desktop_home) = desktop_home { - candidates.push(Path::new(desktop_home).join(".Xauthority")); - } - - for candidate in candidates { - if candidate.exists() { - env.push(( - "XAUTHORITY".to_string(), - candidate.to_string_lossy().to_string(), - )); - break; - } - } - - env -} - -fn arg_value(flag: &str) -> Option { - let mut iter = std::env::args().skip(1); - while let Some(candidate) = iter.next() { - if candidate == flag { - return iter.next(); - } - } - None -} - -async fn health(State(state): State) -> impl IntoResponse { - let sessions = state.sessions.lock().await; - Json(json!({ - "status": "ok", - "session_count": sessions.len(), - 
"runtime_base_url": state.runtime_base_url, - })) -} - -async fn create_session( - State(state): State, - Json(request): Json, -) -> Response { - match create_session_impl(&state, request).await { - Ok(session) => ( - StatusCode::CREATED, - Json(json!({ "session": enrich_session_record(&session) })), - ) - .into_response(), - Err((status, error)) => (status, Json(json!({ "error": error }))).into_response(), - } -} - -async fn list_sessions(State(state): State) -> Response { - let sessions = state - .sessions - .lock() - .await - .values() - .map(|handle| enrich_session_record(&handle.record)) - .collect::>(); - (StatusCode::OK, Json(json!({ "sessions": sessions }))).into_response() -} - -async fn create_session_impl( - state: &AppState, - request: CreateSessionRequest, -) -> Result { - match request.provider.as_str() { - "xvfb" => create_xvfb_session(state, request).await, - "display" => create_existing_display_session(state, request).await, - "qemu" => create_qemu_session(state, request).await, - other => Err(( - StatusCode::BAD_REQUEST, - json!({ - "code": "unsupported_provider", - "message": format!("Unsupported provider `{other}`"), - }), - )), - } -} - -async fn create_xvfb_session( - state: &AppState, - request: CreateSessionRequest, -) -> Result { - if !LinuxBackend::tool_exists("Xvfb") { - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ "code": "missing_tool", "message": "Xvfb is required for the local sandbox provider" }), - )); - } - - let session_id = Uuid::new_v4().to_string(); - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; - - let screen_geometry = format!("{}x{}x24", request.width, request.height); - let mut selected = None; - let mut last_status = None; - for display in candidate_displays(state).await { - let mut child = 
Command::new("Xvfb") - .arg(&display) - .args(["-screen", "0", &screen_geometry, "-nolisten", "tcp", "-ac"]) - .kill_on_drop(true) - .spawn() - .map_err(|error| { - ( - StatusCode::FAILED_DEPENDENCY, - json!({ "code": "xvfb_spawn_failed", "message": error.to_string() }), - ) - })?; - - tokio::time::sleep(Duration::from_millis(350)).await; - if let Some(status) = child.try_wait().map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "xvfb_status_failed", "message": error.to_string() }), - ) - })? { - last_status = Some((display, status.to_string())); - continue; - } - - selected = Some((display, child)); - break; - } - - let (display, child) = selected.ok_or_else(|| { - let message = if let Some((display, status)) = last_status { - format!("Xvfb exited early on {display}: {status}") - } else { - "Xvfb could not find an available display".to_string() - }; - ( - StatusCode::FAILED_DEPENDENCY, - json!({ "code": "xvfb_early_exit", "message": message }), - ) - })?; - - let backend = LinuxBackend::new(BackendOptions { - display: display.clone(), - artifacts_dir: artifacts_dir.clone(), - browser_command: state.browser_command.clone(), - session_env: vec![], - default_user: None, - default_user_home: None, - }); - let record = SessionRecord { - id: session_id.clone(), - provider: "xvfb".to_string(), - qemu_profile: None, - display: Some(display), - width: request.width, - height: request.height, - state: "running".to_string(), - created_at: chrono::Utc::now(), - artifacts_dir: artifacts_dir.to_string_lossy().to_string(), - capabilities: backend.capabilities(), - browser_command: Some(state.browser_command.clone()), - desktop_user: None, - desktop_home: None, - desktop_runtime_dir: None, - runtime_base_url: Some(state.runtime_base_url.clone()), - viewer_url: None, - live_desktop_view: None, - bridge_status: Some("runtime_ready".to_string()), - readiness_state: Some("runtime_ready".to_string()), - bridge_error: None, - }; - - 
state.sessions.lock().await.insert( - session_id, - SessionHandle { - record: record.clone(), - backend: Some(backend), - provider_handle: SessionProviderHandle::Xvfb { child }, - remote_bridge: None, - }, - ); - - Ok(record) -} - -async fn create_existing_display_session( - state: &AppState, - request: CreateSessionRequest, -) -> Result { - let session_id = Uuid::new_v4().to_string(); - let display = request - .display - .clone() - .or_else(|| std::env::var("DISPLAY").ok()) - .unwrap_or_else(|| ":0".to_string()); - let browser_command = request - .browser_command - .clone() - .unwrap_or_else(|| state.browser_command.clone()); - let default_user = request.desktop_user.clone(); - let desktop_home = request - .desktop_home - .clone() - .or_else(|| std::env::var("HOME").ok()); - let desktop_runtime_dir = request - .desktop_runtime_dir - .clone() - .or_else(|| std::env::var("XDG_RUNTIME_DIR").ok()); - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; - - let backend = LinuxBackend::new(BackendOptions { - display: display.clone(), - artifacts_dir: artifacts_dir.clone(), - browser_command: browser_command.clone(), - session_env: display_session_env( - &display, - desktop_home.as_deref(), - desktop_runtime_dir.as_deref(), - ), - default_user: default_user.clone(), - default_user_home: desktop_home.clone(), - }); - let record = SessionRecord { - id: session_id.clone(), - provider: "display".to_string(), - qemu_profile: None, - display: Some(display), - width: request.width, - height: request.height, - state: "running".to_string(), - created_at: chrono::Utc::now(), - artifacts_dir: artifacts_dir.to_string_lossy().to_string(), - capabilities: backend.capabilities(), - browser_command: Some(browser_command), - desktop_user: request.desktop_user.clone(), - desktop_home, 
- desktop_runtime_dir, - runtime_base_url: Some(state.runtime_base_url.clone()), - viewer_url: None, - live_desktop_view: None, - bridge_status: Some("runtime_ready".to_string()), - readiness_state: Some("runtime_ready".to_string()), - bridge_error: None, - }; - - state.sessions.lock().await.insert( - session_id, - SessionHandle { - record: record.clone(), - backend: Some(backend), - provider_handle: SessionProviderHandle::ExistingDisplay, - remote_bridge: None, - }, - ); - - Ok(record) -} - -async fn create_qemu_session( - state: &AppState, - request: CreateSessionRequest, -) -> Result { - if !LinuxBackend::tool_exists("docker") { - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "missing_tool", - "message": "Docker is required for the qemu container provider in this environment", - }), - )); - } - - let qemu_profile = QemuSessionProfile::from_request(&request); - let session_id = Uuid::new_v4().to_string(); - let guest_runtime_binary_path = std::env::current_exe().map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "guest_runtime_binary_unavailable", "message": error.to_string() }), - ) - })?; - let artifacts_dir = state.artifacts_root.join(&session_id); - tokio::fs::create_dir_all(&artifacts_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_failed", "message": error.to_string() }), - ) - })?; - let absolute_artifacts_dir = std::fs::canonicalize(&artifacts_dir).map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "artifacts_dir_canonicalize_failed", "message": error.to_string() }), - ) - })?; - - let container_name = format!("acu-qemu-{}", &session_id[..12]); - let image = request - .container_image - .clone() - .or_else(|| std::env::var("ACU_QEMU_CONTAINER_IMAGE").ok()) - .unwrap_or_else(|| "qemux/qemu".to_string()); - let mut boot = request - .boot - .clone() - .or_else(|| std::env::var("ACU_QEMU_BOOT").ok()) - .unwrap_or_else(|| 
"alpine".to_string()); - let mut boot_image_path = None; - let mut seed_iso_path = None; - if request.boot.is_none() { - let template_image = ensure_qemu_profile_image(state, qemu_profile).await?; - let session_boot_image = - absolute_artifacts_dir.join(format!("{}-boot.qcow2", qemu_profile.as_str())); - tokio::fs::copy(&template_image, &session_boot_image) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "qemu_image_copy_failed", "message": error.to_string() }), - ) - })?; - boot = "/boot.qcow2".to_string(); - boot_image_path = Some(session_boot_image); - seed_iso_path = Some( - build_qemu_session_seed_iso( - &absolute_artifacts_dir, - &session_id, - qemu_profile, - &image, - state.qemu_guest_runtime_port, - &state.browser_command, - ) - .await?, - ); - } - let shared_host_path = if let Some(shared_host_path) = request.shared_host_path.as_deref() { - Some(std::fs::canonicalize(shared_host_path).map_err(|error| { - ( - StatusCode::BAD_REQUEST, - json!({ - "code": "shared_host_path_invalid", - "message": error.to_string(), - "path": shared_host_path, - }), - ) - })?) 
- } else { - None - }; - let disable_kvm = request - .disable_kvm - .unwrap_or_else(|| !Path::new("/dev/kvm").exists()); - - let container_spec = QemuContainerSpec { - container_name: &container_name, - image: &image, - boot: &boot, - artifacts_dir: &absolute_artifacts_dir, - guest_runtime_binary_path: &guest_runtime_binary_path, - boot_image_path: boot_image_path.as_deref(), - seed_iso_path: seed_iso_path.as_deref(), - shared_host_path: shared_host_path.as_deref(), - viewer_port: state.qemu_viewer_port, - runtime_port: state.qemu_guest_runtime_port, - disable_kvm, - }; - let launch_mode = match launch_qemu_container(&container_spec).await { - Ok(mode) => mode, - Err(message) => { - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_container_launch_failed", - "message": message, - }), - )); - } - }; - - tokio::time::sleep(Duration::from_secs(8)).await; - let running = docker_output(&["inspect", "-f", "{{.State.Running}}", &container_name]).await?; - if running.trim() != "true" { - let logs = docker_output(&["logs", &container_name]) - .await - .unwrap_or_default(); - let _ = docker_output(&["rm", "-f", &container_name]).await; - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_container_not_running", - "message": "qemu container exited before the viewer became available", - "logs": logs, - }), - )); - } - let container_ip = docker_container_ip(&container_name).await?; - let viewer_port = docker_mapped_port(&container_name, state.qemu_viewer_port).await?; - let runtime_port = docker_mapped_port(&container_name, state.qemu_guest_runtime_port).await?; - let viewer_url = resolve_qemu_endpoint(viewer_port, &container_ip, state.qemu_viewer_port) - .ok_or_else(|| { - ( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_container_ip_missing", - "message": "qemu container started but did not expose a viewer port or bridge-network IP", - }), - ) - })?; - let remote_runtime_url = match launch_mode { - 
QemuLaunchMode::PublishedPorts => { - runtime_port.map(|port| format!("http://127.0.0.1:{port}")) - } - QemuLaunchMode::BridgeNetwork => { - resolve_qemu_endpoint(runtime_port, &container_ip, state.qemu_guest_runtime_port) - } - }; - let mut capabilities = vec![ - "vm".to_string(), - "viewer".to_string(), - "qemu_container".to_string(), - format!("qemu_profile:{}", qemu_profile.as_str()), - ]; - if launch_mode == QemuLaunchMode::BridgeNetwork { - capabilities.push("bridge_network_access".to_string()); - } - if remote_runtime_url.is_some() { - capabilities.push("guest_runtime_http".to_string()); - } - let bridge_status = if remote_runtime_url.is_some() { - "bridge_waiting".to_string() - } else { - "viewer_only".to_string() - }; - let desktop_user = (qemu_profile == QemuSessionProfile::Product) - .then(|| QEMU_PRODUCT_DESKTOP_USER.to_string()); - let desktop_home = desktop_user - .as_ref() - .map(|_| QEMU_PRODUCT_DESKTOP_HOME.to_string()); - let desktop_runtime_dir = desktop_user - .as_ref() - .map(|_| QEMU_PRODUCT_RUNTIME_DIR.to_string()); - let record = SessionRecord { - id: session_id.clone(), - provider: "qemu".to_string(), - qemu_profile: Some(qemu_profile.as_str().to_string()), - display: None, - width: request.width, - height: request.height, - state: "running".to_string(), - created_at: chrono::Utc::now(), - artifacts_dir: artifacts_dir.to_string_lossy().to_string(), - capabilities, - browser_command: Some(state.browser_command.clone()), - desktop_user, - desktop_home, - desktop_runtime_dir, - runtime_base_url: None, - viewer_url: Some(viewer_url.clone()), - live_desktop_view: None, - bridge_status: Some(bridge_status), - readiness_state: Some("booting".to_string()), - bridge_error: None, - }; - - state.sessions.lock().await.insert( - session_id.clone(), - SessionHandle { - record: record.clone(), - backend: None, - provider_handle: SessionProviderHandle::QemuDocker { container_name }, - remote_bridge: remote_runtime_url - .as_ref() - .map(|base_url| 
RemoteBridgeHandle { - base_url: base_url.clone(), - session_id: None, - }), - }, - ); - - if let Some(remote_runtime_url) = remote_runtime_url { - let bridge_timeout = match qemu_profile { - QemuSessionProfile::Product => { - std::cmp::max(state.qemu_bridge_probe_timeout, Duration::from_secs(180)) - } - QemuSessionProfile::Regression => { - std::cmp::max(state.qemu_bridge_probe_timeout, Duration::from_secs(90)) - } - }; - tokio::spawn(monitor_qemu_bridge(QemuBridgeMonitor { - sessions: state.sessions.clone(), - http_client: state.http_client.clone(), - host_runtime_base_url: state.runtime_base_url.clone(), - guest_display: state.qemu_guest_display.clone(), - browser_command: state.browser_command.clone(), - qemu_profile, - session_id, - width: request.width, - height: request.height, - artifacts_dir, - remote_runtime_url, - viewer_url, - timeout: bridge_timeout, - interval: state.qemu_bridge_probe_interval, - })); - } - - Ok(record) -} - -async fn monitor_qemu_bridge(monitor: QemuBridgeMonitor) { - let started_at = chrono::Utc::now(); - let deadline = tokio::time::Instant::now() + monitor.timeout; - let mut attempts = 0usize; - let mut last_error = String::new(); - - while tokio::time::Instant::now() < deadline { - attempts += 1; - if monitor.qemu_profile == QemuSessionProfile::Product - && monitor - .http_client - .get(&monitor.viewer_url) - .send() - .await - .map(|response| response.status().is_success()) - .unwrap_or(false) - { - let mut guard = monitor.sessions.lock().await; - if let Some(handle) = guard.get_mut(&monitor.session_id) { - promote_readiness(&mut handle.record, "desktop_ready"); - } - } - { - let mut guard = monitor.sessions.lock().await; - if let Some(handle) = guard.get_mut(&monitor.session_id) { - promote_readiness(&mut handle.record, "bridge_listening"); - } - } - let health = bridge_json::( - &monitor.http_client, - &monitor.remote_runtime_url, - "/health", - None, - ) - .await; - match health { - Ok(_) => { - { - let mut guard = 
monitor.sessions.lock().await; - if let Some(handle) = guard.get_mut(&monitor.session_id) { - promote_readiness(&mut handle.record, "bridge_attached"); - } - } - let create_request = CreateSessionRequest { - provider: match monitor.qemu_profile { - QemuSessionProfile::Product => "display".to_string(), - QemuSessionProfile::Regression => "xvfb".to_string(), - }, - width: monitor.width, - height: monitor.height, - display: (monitor.qemu_profile == QemuSessionProfile::Product) - .then(|| monitor.guest_display.clone()), - browser_command: Some(monitor.browser_command.clone()), - boot: None, - container_image: None, - disable_kvm: None, - qemu_profile: None, - shared_host_path: None, - desktop_user: (monitor.qemu_profile == QemuSessionProfile::Product) - .then(|| QEMU_PRODUCT_DESKTOP_USER.to_string()), - desktop_home: (monitor.qemu_profile == QemuSessionProfile::Product) - .then(|| QEMU_PRODUCT_DESKTOP_HOME.to_string()), - desktop_runtime_dir: (monitor.qemu_profile == QemuSessionProfile::Product) - .then(|| QEMU_PRODUCT_RUNTIME_DIR.to_string()), - }; - match bridge_json::( - &monitor.http_client, - &monitor.remote_runtime_url, - "/api/sessions", - Some(&create_request), - ) - .await - { - Ok(response) => { - let remote_session_id = response.session.id.clone(); - let remote_capabilities = response.session.capabilities.clone(); - let mut guard = monitor.sessions.lock().await; - if let Some(handle) = guard.get_mut(&monitor.session_id) { - if let Some(remote_bridge) = handle.remote_bridge.as_mut() { - remote_bridge.session_id = Some(remote_session_id); - } - handle.record.runtime_base_url = - Some(monitor.host_runtime_base_url.clone()); - handle.record.bridge_status = Some("runtime_ready".to_string()); - promote_readiness(&mut handle.record, "runtime_ready"); - handle.record.bridge_error = None; - handle.record.capabilities = merge_capabilities( - &handle.record.capabilities, - &remote_capabilities, - ); - } - return; - } - Err(error) => { - last_error = error; - } - } - } 
- Err(error) => { - last_error = error; - } - } - tokio::time::sleep(monitor.interval).await; - } - - let diagnostics_path = monitor.artifacts_dir.join("qemu-bridge-diagnostics.json"); - let artifact_path = diagnostics_path.to_string_lossy().to_string(); - let payload = json!({ - "session_id": monitor.session_id, - "bridge_status": "failed", - "remote_runtime_url": monitor.remote_runtime_url, - "attempts": attempts, - "started_at": started_at, - "finished_at": chrono::Utc::now(), - "last_error": last_error, - }); - let _ = tokio::fs::write( - &diagnostics_path, - serde_json::to_vec_pretty(&payload).unwrap_or_default(), - ) - .await; - let bridge_error = StructuredError { - code: "qemu_bridge_attach_failed".to_string(), - message: "QEMU guest runtime bridge did not become ready in time".to_string(), - retryable: false, - category: "provider".to_string(), - details: json!({ - "remote_runtime_url": monitor.remote_runtime_url, - "attempts": attempts, - "last_error": last_error, - }), - artifact_refs: vec![ArtifactRef { - kind: "qemu_bridge_diagnostics".to_string(), - path: artifact_path, - mime_type: Some("application/json".to_string()), - }], - }; - - let mut failed_container = None; - let mut guard = monitor.sessions.lock().await; - if let Some(handle) = guard.get_mut(&monitor.session_id) { - handle.record.bridge_status = Some("failed".to_string()); - promote_readiness(&mut handle.record, "failed"); - handle.record.bridge_error = Some(bridge_error); - handle.record.viewer_url = None; - handle.record.runtime_base_url = None; - if let SessionProviderHandle::QemuDocker { container_name } = &handle.provider_handle { - failed_container = Some(container_name.clone()); - } - } - drop(guard); - if let Some(container_name) = failed_container { - let _ = docker_output(&["rm", "-f", &container_name]).await; - } -} - -async fn docker_output(args: &[&str]) -> Result { - let output = Command::new("docker") - .args(args) - .output() - .await - .map_err(|error| { - ( - 
StatusCode::FAILED_DEPENDENCY, - json!({ "code": "docker_command_failed", "message": error.to_string() }), - ) - })?; - if output.status.success() { - Ok(String::from_utf8_lossy(&output.stdout).into_owned()) - } else { - Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "docker_command_failed", - "args": args, - "stderr": String::from_utf8_lossy(&output.stderr), - }), - )) - } -} - -async fn ensure_qemu_profile_image( - state: &AppState, - profile: QemuSessionProfile, -) -> Result { - let cache_root = state.artifacts_root.join("_qemu_images"); - tokio::fs::create_dir_all(&cache_root) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "qemu_cache_dir_failed", "message": error.to_string() }), - ) - })?; - let script_path = std::env::var("ACU_QEMU_ASSET_SCRIPT") - .map(PathBuf::from) - .unwrap_or_else(|_| { - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("..") - .join("..") - .join("scripts") - .join("qemu_guest_assets.py") - }); - let guest_runtime_binary = std::env::current_exe().map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "guest_runtime_exe_missing", "message": error.to_string() }), - ) - })?; - let qemu_container_image = - std::env::var("ACU_QEMU_CONTAINER_IMAGE").unwrap_or_else(|_| "qemux/qemu".to_string()); - let output = Command::new("python3") - .args([ - script_path.to_string_lossy().as_ref(), - "ensure-image", - "--profile", - profile.as_str(), - "--cache-root", - cache_root.to_string_lossy().as_ref(), - "--guest-runtime-binary", - guest_runtime_binary.to_string_lossy().as_ref(), - "--qemu-image", - &qemu_container_image, - "--browser-command", - &state.browser_command, - ]) - .output() - .await - .map_err(|error| { - ( - StatusCode::FAILED_DEPENDENCY, - json!({ "code": "qemu_asset_builder_failed", "message": error.to_string() }), - ) - })?; - if !output.status.success() { - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_asset_builder_failed", - 
"status": output.status.code(), - "stderr": String::from_utf8_lossy(&output.stderr), - }), - )); - } - let ensured: EnsuredQemuImage = serde_json::from_slice(&output.stdout).map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ - "code": "qemu_asset_builder_decode_failed", - "message": error.to_string(), - "stdout": String::from_utf8_lossy(&output.stdout), - }), - ) - })?; - Ok(PathBuf::from(ensured.image_path)) -} - -async fn build_qemu_session_seed_iso( - artifacts_dir: &Path, - session_id: &str, - profile: QemuSessionProfile, - qemu_image: &str, - runtime_port: u16, - browser_command: &str, -) -> Result { - let seed_dir = artifacts_dir.join("seed"); - tokio::fs::create_dir_all(&seed_dir) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "qemu_seed_dir_failed", "message": error.to_string() }), - ) - })?; - let user_data = match profile { - QemuSessionProfile::Regression => regression_seed_user_data(runtime_port, browser_command), - QemuSessionProfile::Product => product_seed_user_data(runtime_port, browser_command), - }; - let meta_data = format!( - "instance-id: acu-session-{session_id}\nlocal-hostname: acu-{}\n", - profile.as_str() - ); - tokio::fs::write(seed_dir.join("user-data"), user_data) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "qemu_seed_write_failed", "message": error.to_string() }), - ) - })?; - tokio::fs::write(seed_dir.join("meta-data"), meta_data) - .await - .map_err(|error| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - json!({ "code": "qemu_seed_write_failed", "message": error.to_string() }), - ) - })?; - let output = Command::new("docker") - .args([ - "run", - "--rm", - "-v", - &format!("{}:/work", seed_dir.to_string_lossy()), - "--entrypoint", - "sh", - qemu_image, - "-lc", - "cd /work && genisoimage -output /work/seed.iso -volid cidata -joliet -rock user-data meta-data >/dev/null 2>&1", - ]) - .output() - .await - .map_err(|error| { - ( - 
StatusCode::FAILED_DEPENDENCY, - json!({ "code": "qemu_seed_iso_failed", "message": error.to_string() }), - ) - })?; - if !output.status.success() { - return Err(( - StatusCode::FAILED_DEPENDENCY, - json!({ - "code": "qemu_seed_iso_failed", - "stderr": String::from_utf8_lossy(&output.stderr), - }), - )); - } - Ok(seed_dir.join("seed.iso")) -} - -fn regression_seed_user_data(runtime_port: u16, browser_command: &str) -> String { - format!( - r#"#cloud-config -packages: - - xdotool - - x11-utils - - x11-apps - - x11-xserver-utils - - xinit - - xvfb - - imagemagick - - curl -write_files: - - path: /etc/systemd/system/acu-guest-runtime.service - permissions: '0644' - content: | - [Unit] - Description=ACU Guest Runtime - After=network-online.target - Wants=network-online.target - - [Service] - ExecStart=/usr/local/bin/acu-guest-runtime --host 0.0.0.0 --port {runtime_port} --browser-command {browser_command} - Restart=always - RestartSec=2 - StandardOutput=journal+console - StandardError=journal+console - - [Install] - WantedBy=multi-user.target -runcmd: - - [ bash, -lc, 'systemctl disable --now ufw || true' ] - - [ bash, -lc, 'echo regression > /var/lib/acu-session-profile' ] - - [ bash, -lc, 'modprobe 9pnet_virtio || true; modprobe 9p || true; mkdir -p /mnt/shared; mount -t 9p -o trans=virtio shared /mnt/shared || true' ] - - [ bash, -lc, 'install -m 0755 /mnt/shared/guest-runtime /usr/local/bin/acu-guest-runtime' ] - - [ bash, -lc, 'systemctl daemon-reload && systemctl enable acu-guest-runtime.service && systemctl restart acu-guest-runtime.service' ] - "#, - ) -} - -fn product_seed_user_data(runtime_port: u16, browser_command: &str) -> String { - format!( - r#"#cloud-config -write_files: - - path: /etc/gdm3/custom.conf - permissions: '0644' - content: | - [daemon] - WaylandEnable=false - AutomaticLoginEnable=true - AutomaticLogin=ubuntu - - path: /etc/xdg/autostart/acu-guest-runtime.desktop - permissions: '0644' - content: | - [Desktop Entry] - Type=Application - 
Name=ACU Guest Runtime - Exec=/usr/local/bin/acu-guest-runtime --host 0.0.0.0 --port {runtime_port} --browser-command {browser_command} - X-GNOME-Autostart-enabled=true - Terminal=false -runcmd: - - [ bash, -lc, 'systemctl disable --now ufw || true' ] - - [ bash, -lc, 'echo product > /var/lib/acu-session-profile' ] - - [ bash, -lc, 'modprobe 9pnet_virtio || true; modprobe 9p || true; mkdir -p /mnt/shared; mount -t 9p -o trans=virtio shared /mnt/shared || true' ] - - [ bash, -lc, 'install -m 0755 /mnt/shared/guest-runtime /usr/local/bin/acu-guest-runtime' ] - - [ bash, -lc, 'ln -sf /usr/bin/epiphany-browser /usr/local/bin/firefox || true' ] - - [ bash, -lc, 'systemctl set-default graphical.target || true' ] - "#, - ) -} - -async fn launch_qemu_container(spec: &QemuContainerSpec<'_>) -> Result { - let primary = docker_run_qemu_container(spec, QemuLaunchMode::PublishedPorts).await?; - if primary.status.success() { - return Ok(QemuLaunchMode::PublishedPorts); - } - - let primary_stderr = String::from_utf8_lossy(&primary.stderr).into_owned(); - if !should_retry_qemu_without_published_ports(&primary_stderr) { - return Err(primary_stderr); - } - - let fallback = docker_run_qemu_container(spec, QemuLaunchMode::BridgeNetwork).await?; - if fallback.status.success() { - Ok(QemuLaunchMode::BridgeNetwork) - } else { - let fallback_stderr = String::from_utf8_lossy(&fallback.stderr); - Err(format!( - "{}\nbridge-network retry failed:\n{}", - primary_stderr, fallback_stderr - )) - } -} - -async fn docker_run_qemu_container( - spec: &QemuContainerSpec<'_>, - launch_mode: QemuLaunchMode, -) -> Result { - let args = docker_run_args(spec, launch_mode); - Command::new("docker") - .args(&args) - .output() - .await - .map_err(|error| error.to_string()) -} - -fn docker_run_args(spec: &QemuContainerSpec<'_>, launch_mode: QemuLaunchMode) -> Vec { - let boot_value = if spec.boot_image_path.is_some() { - "/boot.qcow2" - } else { - spec.boot - }; - let mut args = vec![ - "run".to_string(), - 
"-d".to_string(), - "--rm".to_string(), - "--name".to_string(), - spec.container_name.to_string(), - ]; - if launch_mode == QemuLaunchMode::PublishedPorts { - args.push("-p".to_string()); - args.push(format!("127.0.0.1::{}", spec.viewer_port)); - args.push("-p".to_string()); - args.push(format!("127.0.0.1::{}", spec.runtime_port)); - } - args.push("-e".to_string()); - args.push(format!("BOOT={boot_value}")); - args.push("-e".to_string()); - args.push(format!("USER_PORTS=22,{}", spec.runtime_port)); - args.push("-v".to_string()); - args.push(format!("{}:/storage", spec.artifacts_dir.to_string_lossy())); - if let Some(boot_image_path) = spec.boot_image_path { - args.push("-v".to_string()); - args.push(format!("{}:/boot.qcow2", boot_image_path.to_string_lossy())); - } - if let Some(seed_iso_path) = spec.seed_iso_path { - args.push("-v".to_string()); - args.push(format!("{}:/seed.iso:ro", seed_iso_path.to_string_lossy())); - args.push("-e".to_string()); - args.push("ARGUMENTS=-drive file=/seed.iso,format=raw,media=cdrom,readonly=on".to_string()); - } - if let Some(shared_host_path) = spec.shared_host_path { - args.push("-v".to_string()); - args.push(format!( - "{}:/shared/hostshare:ro", - shared_host_path.to_string_lossy() - )); - } - args.push("-v".to_string()); - args.push(format!( - "{}:/shared/guest-runtime:ro", - spec.guest_runtime_binary_path.to_string_lossy() - )); - if spec.disable_kvm { - args.push("-e".to_string()); - args.push("KVM=N".to_string()); - } else { - args.push("--device".to_string()); - args.push("/dev/kvm".to_string()); - } - if Path::new("/dev/net/tun").exists() { - args.push("--device".to_string()); - args.push("/dev/net/tun".to_string()); - } - args.push("--cap-add".to_string()); - args.push("NET_ADMIN".to_string()); - args.push(spec.image.to_string()); - args -} - -async fn docker_mapped_port( - container_name: &str, - container_port: u16, -) -> Result, (StatusCode, Value)> { - let output = Command::new("docker") - .args(["port", 
container_name, &format!("{container_port}/tcp")]) - .output() - .await - .map_err(|error| { - ( - StatusCode::FAILED_DEPENDENCY, - json!({ "code": "docker_command_failed", "message": error.to_string() }), - ) - })?; - if !output.status.success() { - return Ok(None); - } - Ok(parse_published_port(&String::from_utf8_lossy( - &output.stdout, - ))) -} - -async fn docker_container_ip(container_name: &str) -> Result { - Ok(docker_output(&[ - "inspect", - "-f", - "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", - container_name, - ]) - .await? - .trim() - .to_string()) -} - -fn parse_published_port(output: &str) -> Option { - output - .lines() - .filter_map(|line| line.trim().rsplit(':').next()) - .find_map(|port| port.parse::().ok()) -} - -fn resolve_qemu_endpoint( - published_port: Option, - container_ip: &str, - container_port: u16, -) -> Option { - if let Some(port) = published_port { - return Some(format!("http://127.0.0.1:{port}")); - } - let trimmed_ip = container_ip.trim(); - if trimmed_ip.is_empty() { - None - } else { - Some(format!("http://{trimmed_ip}:{container_port}")) - } -} - -fn should_retry_qemu_without_published_ports(stderr: &str) -> bool { - let stderr = stderr.to_ascii_lowercase(); - stderr.contains("unable to enable dnat rule") - || stderr.contains("no chain/target/match by that name") - || stderr.contains("driver failed programming external connectivity") -} - -async fn bridge_json( - client: &Client, - base_url: &str, - path: &str, - body: Option<&CreateSessionRequest>, -) -> Result { - let request = if let Some(body) = body { - client.post(format!("{base_url}{path}")).json(body) - } else { - client.get(format!("{base_url}{path}")) - }; - let response = request.send().await.map_err(|error| error.to_string())?; - let status = response.status(); - let text = response.text().await.map_err(|error| error.to_string())?; - if !status.is_success() { - return Err(format!( - "bridge request to {path} failed with {status}: {text}" - )); - } - 
serde_json::from_str(&text).map_err(|error| error.to_string()) -} - -fn ready_remote_bridge(remote_bridge: &RemoteBridgeHandle) -> Option<&RemoteBridgeHandle> { - remote_bridge.session_id.as_ref()?; - Some(remote_bridge) -} - -fn merge_capabilities(left: &[String], right: &[String]) -> Vec { - let mut merged = BTreeSet::new(); - merged.extend(left.iter().cloned()); - merged.extend(right.iter().cloned()); - merged.into_iter().collect() -} - -fn readiness_rank(stage: &str) -> usize { - match stage { - "booting" => 0, - "desktop_ready" => 1, - "bridge_listening" => 2, - "bridge_attached" => 3, - "runtime_ready" => 4, - "failed" => 5, - _ => 0, - } -} - -fn promote_readiness(record: &mut SessionRecord, next_stage: &str) { - let current_rank = record - .readiness_state - .as_deref() - .map(readiness_rank) - .unwrap_or(0); - if readiness_rank(next_stage) >= current_rank { - record.readiness_state = Some(next_stage.to_string()); - } -} - -async fn proxy_bridge_json( - state: &AppState, - remote_bridge: &RemoteBridgeHandle, - endpoint: &str, - action: Option<&ActionRequest>, -) -> Result { - let remote_session_id = remote_bridge - .session_id - .as_ref() - .ok_or_else(|| StructuredError { - code: "provider_bridge_unavailable".to_string(), - message: "remote bridge session is not ready".to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url }), - artifact_refs: vec![], - })?; - let url = format!( - "{}/api/sessions/{}/{}", - remote_bridge.base_url, remote_session_id, endpoint - ); - let request = if let Some(action) = action { - state.http_client.post(url).json(action) - } else { - state.http_client.get(url) - }; - let response = request.send().await.map_err(|error| StructuredError { - code: "remote_bridge_request_failed".to_string(), - message: error.to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url, "endpoint": endpoint }), - 
artifact_refs: vec![], - })?; - let status = response.status(); - let text = response.text().await.map_err(|error| StructuredError { - code: "remote_bridge_response_failed".to_string(), - message: error.to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url, "endpoint": endpoint }), - artifact_refs: vec![], - })?; - if !status.is_success() { - return Err(StructuredError { - code: "remote_bridge_status_failed".to_string(), - message: format!("remote bridge returned {status}"), - retryable: true, - category: "provider".to_string(), - details: json!({ - "base_url": remote_bridge.base_url, - "endpoint": endpoint, - "status": status.as_u16(), - "body": text, - }), - artifact_refs: vec![], - }); - } - serde_json::from_str(&text).map_err(|error| StructuredError { - code: "remote_bridge_decode_failed".to_string(), - message: error.to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url, "endpoint": endpoint, "body": text }), - artifact_refs: vec![], - }) -} - -async fn proxy_bridge_bytes( - state: &AppState, - remote_bridge: &RemoteBridgeHandle, - endpoint: &str, -) -> Result, StructuredError> { - let remote_session_id = remote_bridge - .session_id - .as_ref() - .ok_or_else(|| StructuredError { - code: "provider_bridge_unavailable".to_string(), - message: "remote bridge session is not ready".to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url }), - artifact_refs: vec![], - })?; - let url = format!( - "{}/api/sessions/{}/{}", - remote_bridge.base_url, remote_session_id, endpoint - ); - let response = state - .http_client - .get(url) - .send() - .await - .map_err(|error| StructuredError { - code: "remote_bridge_request_failed".to_string(), - message: error.to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": 
remote_bridge.base_url, "endpoint": endpoint }), - artifact_refs: vec![], - })?; - let status = response.status(); - if !status.is_success() { - let body = response.text().await.unwrap_or_default(); - return Err(StructuredError { - code: "remote_bridge_status_failed".to_string(), - message: format!("remote bridge returned {status}"), - retryable: true, - category: "provider".to_string(), - details: json!({ - "base_url": remote_bridge.base_url, - "endpoint": endpoint, - "status": status.as_u16(), - "body": body, - }), - artifact_refs: vec![], - }); - } - response - .bytes() - .await - .map(|bytes| bytes.to_vec()) - .map_err(|error| StructuredError { - code: "remote_bridge_bytes_failed".to_string(), - message: error.to_string(), - retryable: true, - category: "provider".to_string(), - details: json!({ "base_url": remote_bridge.base_url, "endpoint": endpoint }), - artifact_refs: vec![], - }) -} - -async fn get_session(State(state): State, AxumPath(id): AxumPath) -> Response { - match session_snapshot(&state, &id).await { - Some(session) => ( - StatusCode::OK, - Json(json!({ "session": enrich_session_record(&session) })), - ) - .into_response(), - None => ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(), - } -} - -async fn delete_session(State(state): State, AxumPath(id): AxumPath) -> Response { - let handle = state.sessions.lock().await.remove(&id); - match handle { - Some(mut handle) => { - if let Some(remote_bridge) = handle.remote_bridge.as_ref() { - if let Some(remote_session_id) = remote_bridge.session_id.as_ref() { - let _ = state - .http_client - .delete(format!( - "{}/api/sessions/{}", - remote_bridge.base_url, remote_session_id - )) - .send() - .await; - } - } - match &mut handle.provider_handle { - SessionProviderHandle::Xvfb { child } => { - let _ = child.kill().await; - } - SessionProviderHandle::ExistingDisplay => {} - SessionProviderHandle::QemuDocker { container_name } => { - let _ = 
docker_output(&["rm", "-f", container_name]).await; - } - } - (StatusCode::OK, Json(json!({ "ok": true }))).into_response() - } - None => ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(), - } -} - -async fn get_observation( - State(state): State, - AxumPath(id): AxumPath, -) -> Response { - let session = match session_clone(&state, &id).await { - Some(session) => session, - None => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(); - } - }; - if let Some(backend) = session.backend { - match backend.observation().await { - Ok(observation) => (StatusCode::OK, Json(json!(observation))).into_response(), - Err(error) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": error })), - ) - .into_response(), - } - } else if let Some(remote_bridge) = session.remote_bridge.as_ref().and_then(ready_remote_bridge) - { - match proxy_bridge_json::(&state, remote_bridge, "observation", None).await { - Ok(observation) => (StatusCode::OK, Json(json!(observation))).into_response(), - Err(error) => { - (StatusCode::BAD_GATEWAY, Json(json!({ "error": error }))).into_response() - } - } - } else { - provider_bridge_unavailable_response( - &session.record, - "observation requires a guest runtime bridge inside the VM", - ) - } -} - -async fn get_screenshot(State(state): State, AxumPath(id): AxumPath) -> Response { - let session = match session_clone(&state, &id).await { - Some(session) => session, - None => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(); - } - }; - if let Some(backend) = session.backend { - match backend.screenshot_png().await { - Ok((bytes, _path)) => { - let mut response = Response::new(bytes.into()); - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static("image/png"), - ); - response - } - Err(error) => ( - StatusCode::INTERNAL_SERVER_ERROR, - 
Json(json!({ "error": error })), - ) - .into_response(), - } - } else if let Some(remote_bridge) = session.remote_bridge.as_ref().and_then(ready_remote_bridge) - { - match proxy_bridge_bytes(&state, remote_bridge, "screenshot").await { - Ok(bytes) => { - let mut response = Response::new(bytes.into()); - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static("image/png"), - ); - response - } - Err(error) => { - (StatusCode::BAD_GATEWAY, Json(json!({ "error": error }))).into_response() - } - } - } else { - provider_bridge_unavailable_response( - &session.record, - "screenshot capture requires a guest runtime bridge inside the VM", - ) - } -} - -async fn get_available_actions( - State(state): State, - AxumPath(id): AxumPath, -) -> Response { - let session = match session_clone(&state, &id).await { - Some(session) => session, - None => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(); - } - }; - if let Some(remote_bridge) = session.remote_bridge.as_ref().and_then(ready_remote_bridge) { - match proxy_bridge_json::(&state, remote_bridge, "actions", None).await - { - Ok(mut capabilities) => { - capabilities.provider = session.record.provider.clone(); - capabilities.vm_mode = if session.record.provider == "qemu" { - "qemu".to_string() - } else { - capabilities.vm_mode - }; - capabilities.enrichments = - merge_capabilities(&capabilities.enrichments, &session.record.capabilities); - return (StatusCode::OK, Json(json!(capabilities))).into_response(); - } - Err(error) => { - return (StatusCode::BAD_GATEWAY, Json(json!({ "error": error }))).into_response(); - } - } - } - let mut capabilities = runtime_capabilities( - &session.record.provider, - session.record.capabilities.clone(), - ); - if session.backend.is_none() { - capabilities.actions.clear(); - capabilities.browser_mode = "viewer_only".to_string(); - } - (StatusCode::OK, Json(json!(capabilities))).into_response() -} - -async 
fn perform_action( - State(state): State, - AxumPath(id): AxumPath, - Json(action): Json, -) -> Response { - let session = match session_clone(&state, &id).await { - Some(session) => session, - None => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "session not found" })), - ) - .into_response(); - } - }; - if let Some(backend) = session.backend { - let receipt = backend.perform_action(action).await; - (StatusCode::OK, Json(json!(receipt))).into_response() - } else if let Some(remote_bridge) = session.remote_bridge.as_ref().and_then(ready_remote_bridge) - { - match proxy_bridge_json::(&state, remote_bridge, "actions", Some(&action)) - .await - { - Ok(receipt) => (StatusCode::OK, Json(json!(receipt))).into_response(), - Err(error) => { - (StatusCode::BAD_GATEWAY, Json(json!({ "error": error }))).into_response() - } - } - } else { - provider_bridge_unavailable_response( - &session.record, - "actions require a guest runtime bridge inside the VM", - ) - } -} - -fn provider_bridge_unavailable_response(record: &SessionRecord, reason: &str) -> Response { - let error = StructuredError { - code: "provider_bridge_unavailable".to_string(), - message: reason.to_string(), - retryable: false, - category: "provider".to_string(), - details: json!({ - "provider": record.provider, - "qemu_profile": record.qemu_profile, - "viewer_url": record.viewer_url, - "live_desktop_view": derive_live_desktop_view(record), - "bridge_status": record.bridge_status, - "readiness_state": record.readiness_state, - "bridge_error": record.bridge_error, - }), - artifact_refs: record - .bridge_error - .as_ref() - .map(|error| error.artifact_refs.clone()) - .unwrap_or_default(), - }; - (StatusCode::CONFLICT, Json(json!({ "error": error }))).into_response() -} - -async fn session_snapshot(state: &AppState, id: &str) -> Option { - state - .sessions - .lock() - .await - .get(id) - .map(|handle| handle.record.clone()) -} - -async fn session_clone(state: &AppState, id: &str) -> Option { - state - 
.sessions - .lock() - .await - .get(id) - .map(|handle| SessionHandleClone { - record: handle.record.clone(), - backend: handle.backend.clone(), - remote_bridge: handle.remote_bridge.clone(), - }) -} - -struct SessionHandleClone { - record: SessionRecord, - backend: Option, - remote_bridge: Option, -} - -fn runtime_capabilities(provider: &str, enrichments: Vec) -> RuntimeCapabilities { - capability_descriptor(provider, enrichments) -} - -async fn candidate_displays(state: &AppState) -> Vec { - let sessions = state.sessions.lock().await; - let in_use: BTreeSet = sessions - .values() - .filter_map(|handle| handle.record.display.as_deref()) - .filter_map(|display| display.strip_prefix(':')) - .filter_map(|display| display.parse::().ok()) - .collect(); - - let mut displays = Vec::new(); - for candidate in 90..140 { - if !in_use.contains(&candidate) { - displays.push(format!(":{candidate}")); - } - } - let mut candidate = 140; - while displays.len() < 20 { - if !in_use.contains(&candidate) { - displays.push(format!(":{candidate}")); - } - candidate += 1; - } - displays -} - -#[cfg(test)] -mod tests { - use super::{ - QEMU_PRODUCT_DESKTOP_HOME, QEMU_PRODUCT_DESKTOP_USER, QEMU_PRODUCT_RUNTIME_DIR, - QemuContainerSpec, QemuLaunchMode, SessionRecord, derive_live_desktop_view, - docker_run_args, enrich_session_record, merge_capabilities, parse_published_port, - resolve_qemu_endpoint, should_retry_qemu_without_published_ports, - }; - use chrono::Utc; - use std::path::Path; - - #[test] - fn parses_published_port_from_docker_output() { - assert_eq!(parse_published_port("127.0.0.1:49153\n"), Some(49153)); - assert_eq!( - parse_published_port("0.0.0.0:49153\n:::49153\n"), - Some(49153) - ); - } - - #[test] - fn merges_capabilities_without_duplicates() { - let merged = merge_capabilities( - &["vm".to_string(), "viewer".to_string()], - &["viewer".to_string(), "shell".to_string()], - ); - assert_eq!( - merged, - vec!["shell".to_string(), "viewer".to_string(), "vm".to_string()] - ); 
- } - - #[test] - fn resolves_qemu_endpoint_from_published_port_or_bridge_ip() { - assert_eq!( - resolve_qemu_endpoint(Some(49153), "172.17.0.2", 4001), - Some("http://127.0.0.1:49153".to_string()) - ); - assert_eq!( - resolve_qemu_endpoint(None, "172.17.0.2", 4001), - Some("http://172.17.0.2:4001".to_string()) - ); - assert_eq!(resolve_qemu_endpoint(None, "", 4001), None); - } - - #[test] - fn detects_nat_failures_that_need_bridge_retry() { - assert!(should_retry_qemu_without_published_ports( - "Unable to enable DNAT rule: iptables: No chain/target/match by that name" - )); - assert!(should_retry_qemu_without_published_ports( - "driver failed programming external connectivity on endpoint" - )); - assert!(!should_retry_qemu_without_published_ports( - "manifest for qemux/qemu:missing not found" - )); - } - - #[test] - fn docker_run_args_include_seed_iso_and_shared_path_mounts() { - let spec = QemuContainerSpec { - container_name: "acu-test", - image: "qemux/qemu", - boot: "/boot.qcow2", - artifacts_dir: Path::new("/tmp/artifacts"), - guest_runtime_binary_path: Path::new("/tmp/guest-runtime"), - boot_image_path: Some(Path::new("/tmp/boot.qcow2")), - seed_iso_path: Some(Path::new("/tmp/seed.iso")), - shared_host_path: Some(Path::new("/tmp/shared")), - viewer_port: 8006, - runtime_port: 4001, - disable_kvm: true, - }; - let args = docker_run_args(&spec, QemuLaunchMode::BridgeNetwork); - assert!(args.iter().any(|arg| arg == "BOOT=/boot.qcow2")); - assert!( - args.iter() - .any(|arg| arg.contains("/tmp/boot.qcow2:/boot.qcow2")) - ); - assert!( - args.iter() - .any(|arg| arg.contains("/tmp/seed.iso:/seed.iso:ro")) - ); - assert!(args.iter().any(|arg| { - arg == "ARGUMENTS=-drive file=/seed.iso,format=raw,media=cdrom,readonly=on" - })); - assert!( - args.iter() - .any(|arg| arg.contains("/tmp/shared:/shared/hostshare:ro")) - ); - assert!( - args.iter() - .any(|arg| arg.contains("/tmp/guest-runtime:/shared/guest-runtime:ro")) - ); - } - - #[test] - fn 
product_seed_user_data_reinstalls_guest_runtime_and_enables_gui_bootstrap() { - let user_data = super::product_seed_user_data(4900, "firefox"); - assert!(user_data.contains( - "install -m 0755 /mnt/shared/guest-runtime /usr/local/bin/acu-guest-runtime" - )); - assert!(user_data.contains("/etc/xdg/autostart/acu-guest-runtime.desktop")); - assert!(user_data.contains("AutomaticLoginEnable=true")); - assert!(user_data.contains("systemctl set-default graphical.target")); - assert!(user_data.contains("acu-guest-runtime --host 0.0.0.0 --port 4900")); - } - - #[test] - fn regression_seed_user_data_reinstalls_guest_runtime_and_enables_service() { - let user_data = super::regression_seed_user_data(4900, "firefox"); - assert!(user_data.contains( - "install -m 0755 /mnt/shared/guest-runtime /usr/local/bin/acu-guest-runtime" - )); - assert!(user_data.contains("systemctl enable acu-guest-runtime.service")); - assert!(user_data.contains("echo regression > /var/lib/acu-session-profile")); - assert!(user_data.contains("acu-guest-runtime --host 0.0.0.0 --port 4900")); - } - - #[test] - fn derives_truthful_live_desktop_view_modes() { - let qemu_product = SessionRecord { - id: "qemu-product".to_string(), - provider: "qemu".to_string(), - qemu_profile: Some("product".to_string()), - display: None, - width: 1440, - height: 900, - state: "running".to_string(), - created_at: Utc::now(), - artifacts_dir: "artifacts/runtime/qemu-product".to_string(), - capabilities: vec!["viewer".to_string()], - browser_command: Some("firefox".to_string()), - desktop_user: Some(QEMU_PRODUCT_DESKTOP_USER.to_string()), - desktop_home: Some(QEMU_PRODUCT_DESKTOP_HOME.to_string()), - desktop_runtime_dir: Some(QEMU_PRODUCT_RUNTIME_DIR.to_string()), - runtime_base_url: None, - viewer_url: Some("http://127.0.0.1:32771".to_string()), - live_desktop_view: None, - bridge_status: Some("runtime_ready".to_string()), - readiness_state: Some("runtime_ready".to_string()), - bridge_error: None, - }; - let qemu_regression = 
SessionRecord { - qemu_profile: Some("regression".to_string()), - ..qemu_product.clone() - }; - let xvfb = SessionRecord { - id: "xvfb".to_string(), - provider: "xvfb".to_string(), - qemu_profile: None, - display: Some(":90".to_string()), - viewer_url: None, - ..qemu_product.clone() - }; - - let qemu_product_view = derive_live_desktop_view(&qemu_product); - assert_eq!(qemu_product_view.mode, "stream"); - assert_eq!(qemu_product_view.provider_surface, "qemu_novnc"); - assert!(qemu_product_view.matches_action_plane); - - let qemu_regression_view = derive_live_desktop_view(&qemu_regression); - assert_eq!(qemu_regression_view.mode, "screenshot_poll"); - assert_eq!( - qemu_regression_view.provider_surface, - "guest_xvfb_screenshot" - ); - assert!(qemu_regression_view.debug_url.is_some()); - - let xvfb_view = derive_live_desktop_view(&xvfb); - assert_eq!(xvfb_view.mode, "screenshot_poll"); - assert_eq!(xvfb_view.status, "ready"); - assert!(xvfb_view.reason.is_some()); - } - - #[test] - fn enriches_session_record_with_live_desktop_view() { - let record = SessionRecord { - id: "qemu-product".to_string(), - provider: "qemu".to_string(), - qemu_profile: Some("product".to_string()), - display: None, - width: 1440, - height: 900, - state: "running".to_string(), - created_at: Utc::now(), - artifacts_dir: "artifacts/runtime/qemu-product".to_string(), - capabilities: vec!["viewer".to_string()], - browser_command: Some("firefox".to_string()), - desktop_user: Some(QEMU_PRODUCT_DESKTOP_USER.to_string()), - desktop_home: Some(QEMU_PRODUCT_DESKTOP_HOME.to_string()), - desktop_runtime_dir: Some(QEMU_PRODUCT_RUNTIME_DIR.to_string()), - runtime_base_url: None, - viewer_url: Some("http://127.0.0.1:32771".to_string()), - live_desktop_view: None, - bridge_status: Some("runtime_ready".to_string()), - readiness_state: Some("runtime_ready".to_string()), - bridge_error: None, - }; - - let enriched = enrich_session_record(&record); - assert!(enriched.live_desktop_view.is_some()); - 
assert_eq!(record.live_desktop_view, None); - } + guest_runtime::run(RuntimeConfig::from_env_and_args()).await; } diff --git a/crates/guest-runtime/tests/qemu_bridge.rs b/crates/guest-runtime/tests/qemu_bridge.rs index a4d6362..7aeba21 100644 --- a/crates/guest-runtime/tests/qemu_bridge.rs +++ b/crates/guest-runtime/tests/qemu_bridge.rs @@ -196,6 +196,123 @@ async fn qemu_boot_failures_include_container_logs() { runtime.shutdown().await; } +#[tokio::test] +async fn deleting_qemu_sessions_removes_runtime_artifacts() { + let _guard = test_lock().lock().await; + let fake_bin_dir = TestDir::new("guest-runtime-fake-bin"); + write_fake_docker(fake_bin_dir.path(), DockerMode::HealthyViewerOnly); + let artifacts_dir = TestDir::new("guest-runtime-artifacts"); + let mut runtime = GuestRuntimeHarness::start(fake_bin_dir.path(), artifacts_dir.path()).await; + + let (create_status, create_payload) = runtime.json_request( + "POST", + "/api/sessions", + Some(&json!({ + "provider": "qemu", + "boot": "alpine", + "width": 1280, + "height": 720, + })), + ); + assert_eq!( + create_status, 201, + "unexpected create payload: {create_payload}" + ); + let session = &create_payload["session"]; + let session_id = session["id"].as_str().expect("session id"); + let session_artifacts = PathBuf::from( + session["artifacts_dir"] + .as_str() + .expect("session artifacts dir"), + ); + fs::write(session_artifacts.join("data.img"), b"artifact").expect("seed artifact"); + assert!(session_artifacts.exists()); + + let (delete_status, delete_payload) = + runtime.json_request("DELETE", &format!("/api/sessions/{session_id}"), None); + assert_eq!( + delete_status, 200, + "unexpected delete payload: {delete_payload}" + ); + assert!( + !session_artifacts.exists(), + "session artifacts dir should be removed after delete" + ); + + runtime.shutdown().await; +} + +#[tokio::test] +async fn startup_janitor_reaps_marked_runtime_directories() { + let _guard = test_lock().lock().await; + let fake_bin_dir = 
TestDir::new("guest-runtime-fake-bin"); + write_fake_docker(fake_bin_dir.path(), DockerMode::HealthyViewerOnly); + let artifacts_dir = TestDir::new("guest-runtime-artifacts"); + let stale_dir = artifacts_dir.path().join(Uuid::new_v4().to_string()); + fs::create_dir_all(&stale_dir).expect("create stale dir"); + fs::write(stale_dir.join("data.img"), b"stale").expect("seed stale data"); + fs::write( + stale_dir.join(".inspectors-storage.json"), + serde_json::to_vec_pretty(&json!({ + "version": 1, + "owner": "inspectors", + "tier": "runtime", + "kind": "session", + "created_at": "2026-04-15T08:19:32Z", + "session_id": stale_dir.file_name().and_then(|value| value.to_str()), + "provider": "qemu", + "qemu_profile": "product", + "container_name": null, + "process_id": null, + })) + .expect("serialize marker"), + ) + .expect("write marker"); + + let mut runtime = GuestRuntimeHarness::start(fake_bin_dir.path(), artifacts_dir.path()).await; + assert!( + !stale_dir.exists(), + "startup janitor should remove stale marked runtime dirs" + ); + + runtime.shutdown().await; +} + +#[tokio::test] +async fn reclaim_endpoint_reports_and_reclaims_legacy_runtime_state() { + let _guard = test_lock().lock().await; + let fake_bin_dir = TestDir::new("guest-runtime-fake-bin"); + write_fake_docker(fake_bin_dir.path(), DockerMode::HealthyViewerOnly); + let artifacts_dir = TestDir::new("guest-runtime-artifacts"); + let legacy_dir = artifacts_dir.path().join(Uuid::new_v4().to_string()); + fs::create_dir_all(legacy_dir.join("seed")).expect("create legacy seed dir"); + fs::write(legacy_dir.join("data.img"), b"legacy").expect("write legacy data"); + + let mut runtime = GuestRuntimeHarness::start(fake_bin_dir.path(), artifacts_dir.path()).await; + + let (report_status, report_payload) = runtime.json_request( + "POST", + "/api/storage/reclaim", + Some(&json!({ "mode": "report" })), + ); + assert_eq!(report_status, 200, "unexpected report payload: {report_payload}"); + 
assert_eq!(report_payload["candidate_count"], 1); + assert_eq!(report_payload["candidates"][0]["kind"], "legacy_runtime"); + assert!(legacy_dir.exists()); + + let (apply_status, apply_payload) = runtime.json_request( + "POST", + "/api/storage/reclaim", + Some(&json!({ "mode": "apply" })), + ); + assert_eq!(apply_status, 200, "unexpected apply payload: {apply_payload}"); + assert_eq!(apply_payload["candidate_count"], 1); + assert_eq!(apply_payload["reclaimed"][0], legacy_dir.to_string_lossy().to_string()); + assert!(!legacy_dir.exists(), "legacy runtime dir should be reclaimed"); + + runtime.shutdown().await; +} + #[derive(Clone, Copy)] enum DockerMode { HealthyViewerOnly, @@ -210,6 +327,9 @@ set -eu cmd="${1:-}" shift || true case "$cmd" in + ps) + exit 0 + ;; run) echo "stub-container-id" ;; @@ -242,6 +362,9 @@ set -eu cmd="${1:-}" shift || true case "$cmd" in + ps) + exit 0 + ;; run) echo "stub-container-id" ;; diff --git a/scripts/qemu_guest_assets.py b/scripts/qemu_guest_assets.py index a155553..254b02f 100755 --- a/scripts/qemu_guest_assets.py +++ b/scripts/qemu_guest_assets.py @@ -18,6 +18,7 @@ PROFILE_TIMEOUTS = {"regression": 600, "product": 2400} PROFILE_RAM_MB = {"regression": "4096", "product": "8192"} PROFILE_DISK_GB = {"regression": "20G", "product": "40G"} +STORAGE_MARKER_FILE = ".inspectors-storage.json" def run(cmd: list[str], *, capture: bool = False, check: bool = True, cwd: Path | None = None, env: dict[str, str] | None = None) -> subprocess.CompletedProcess[str]: @@ -36,6 +37,19 @@ def ensure_dir(path: Path) -> Path: return path +def write_storage_marker(path: Path, profile: str) -> None: + payload = { + "version": 1, + "owner": "inspectors", + "tier": "runtime", + "kind": "prepare_build", + "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "provider": "qemu", + "qemu_profile": profile, + } + (path / STORAGE_MARKER_FILE).write_text(json.dumps(payload, indent=2)) + + def http_json(url: str, method: str = "GET", payload: 
dict[str, Any] | None = None) -> Any: data = None if payload is None else json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, method=method, data=data, headers={"Content-Type": "application/json"}) @@ -290,6 +304,7 @@ def ensure_image(profile: str, cache_root: Path, guest_runtime_binary: Path, qem work_dir = Path( tempfile.mkdtemp(prefix=f"acu-qemu-{profile}-", dir=str(ensure_dir(cache_root / "_build"))) ) + write_storage_marker(work_dir, profile) build_image = work_dir / "boot.qcow2" shutil.copy2(base_image, build_image) resize_qcow2(qemu_image, build_image, PROFILE_DISK_GB[profile]) @@ -331,6 +346,8 @@ def ensure_image(profile: str, cache_root: Path, guest_runtime_binary: Path, qem "-e", "ARGUMENTS=-drive file=/seed.iso,format=raw,media=cdrom,readonly=on", "-v", + f"{work_dir}:/storage", + "-v", f"{build_image}:/boot.qcow2", "-v", f"{seed_iso}:/seed.iso:ro", @@ -346,56 +363,58 @@ def ensure_image(profile: str, cache_root: Path, guest_runtime_binary: Path, qem logs_path = profile_dir / "prepare.log" try: - deadline = time.time() + PROFILE_TIMEOUTS[profile] - base_runtime_url = None - while time.time() < deadline: - if not container_exists(container_name): - raise RuntimeError(f"{container_name} exited before guest runtime became reachable") - ip = inspect_container_ip(container_name) - if ip: - base_runtime_url = f"http://{ip}:4001" - try: - health = http_json(f"{base_runtime_url}/health") - if health.get("status") == "ok": - break - except Exception: - pass - time.sleep(5) - else: - raise TimeoutError(f"timed out waiting for {profile} guest runtime health") - - assert base_runtime_url is not None - session_id, display = maybe_attach_runtime(base_runtime_url, profile, browser_command) - graceful_shutdown(base_runtime_url, session_id) - shutdown_deadline = time.time() + 60 - while time.time() < shutdown_deadline: - result = run(["docker", "inspect", "-f", "{{.State.Running}}", container_name], capture=True, check=False) - if result.returncode != 0 
or result.stdout.strip() != "true": - break - time.sleep(2) - except Exception: - ensure_dir(logs_path.parent) - logs_path.write_text(docker_logs(container_name)) - raise + try: + deadline = time.time() + PROFILE_TIMEOUTS[profile] + base_runtime_url = None + while time.time() < deadline: + if not container_exists(container_name): + raise RuntimeError(f"{container_name} exited before guest runtime became reachable") + ip = inspect_container_ip(container_name) + if ip: + base_runtime_url = f"http://{ip}:4001" + try: + health = http_json(f"{base_runtime_url}/health") + if health.get("status") == "ok": + break + except Exception: + pass + time.sleep(5) + else: + raise TimeoutError(f"timed out waiting for {profile} guest runtime health") + + assert base_runtime_url is not None + session_id, display = maybe_attach_runtime(base_runtime_url, profile, browser_command) + graceful_shutdown(base_runtime_url, session_id) + shutdown_deadline = time.time() + 60 + while time.time() < shutdown_deadline: + result = run(["docker", "inspect", "-f", "{{.State.Running}}", container_name], capture=True, check=False) + if result.returncode != 0 or result.stdout.strip() != "true": + break + time.sleep(2) + except Exception: + ensure_dir(logs_path.parent) + logs_path.write_text(docker_logs(container_name)) + raise + finally: + run(["docker", "rm", "-f", "-v", container_name], check=False, capture=True) + + ensure_dir(profile_dir) + shutil.move(str(build_image), template_image) + metadata = { + "profile": profile, + "image_path": str(template_image), + "guest_runtime_binary": str(guest_runtime_binary), + "base_image": str(base_image), + "built_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "browser_command": browser_command, + "logs_path": str(logs_path), + "ssh_private_key": str(private_key), + "ssh_user": "ubuntu", + } + metadata_path.write_text(json.dumps(metadata, indent=2)) + return {"profile": profile, "image_path": str(template_image), "metadata_path": str(metadata_path), 
"created": True} finally: - run(["docker", "rm", "-f", container_name], check=False, capture=True) - - ensure_dir(profile_dir) - shutil.move(str(build_image), template_image) - metadata = { - "profile": profile, - "image_path": str(template_image), - "guest_runtime_binary": str(guest_runtime_binary), - "base_image": str(base_image), - "built_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - "browser_command": browser_command, - "logs_path": str(logs_path), - "ssh_private_key": str(private_key), - "ssh_user": "ubuntu", - } - metadata_path.write_text(json.dumps(metadata, indent=2)) - shutil.rmtree(work_dir, ignore_errors=True) - return {"profile": profile, "image_path": str(template_image), "metadata_path": str(metadata_path), "created": True} + shutil.rmtree(work_dir, ignore_errors=True) def main() -> int: From 548a874f633bf607c40874c1e02abdbbe12613d4 Mon Sep 17 00:00:00 2001 From: OneNoted Date: Wed, 15 Apr 2026 10:45:47 +0200 Subject: [PATCH 02/10] feat: make the default inspectors flow easier for agents to drive The happy path still existed in the backend, but the default surface started from provider/debug knobs instead of the session -> task -> live view -> cleanup loop an agent should infer first. This change promotes the qemu-first start-and-watch workflow, adds explicit delete and stale-storage reclaim actions, wires optional desktop activation for installed-app oversight, updates SDK/docs around the canonical contract, and keeps advanced/debug controls available without making them primary. 
Constraint: Preserve honest xvfb fallback semantics while keeping qemu product as the canonical live path Constraint: Keep the packaged desktop resources in sync with the source web UI and control-plane server Rejected: Full UI redesign before storage and contract ergonomics were in place | too broad for the approved plan Rejected: Desktop activation as a required step in the agent contract | should remain optional additive ergonomics Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep the default path start-and-watch first; new debug controls belong under advanced surfaces, not the main flow Tested: bun run typecheck; bun test apps/control-plane/src/server.desktop-app.test.ts apps/web-ui/public/provider-default.test.js crates/desktop-app/resources/control-plane/ui/provider-default.test.js packages/ts-sdk/src/index.test.ts; bun run --filter @acu/control-plane build; bun run sync:desktop-resources; cargo test -p guest-runtime --quiet; bun run test; bun run lint Not-tested: Manual desktop activation against a real installed desktop bundle outside the test harness --- README.md | 12 ++ .../src/server.desktop-app.test.ts | 45 +++++++- apps/control-plane/src/server.ts | 30 ++++- apps/web-ui/public/app.js | 42 +++++-- apps/web-ui/public/index.html | 103 ++++++++++-------- apps/web-ui/public/provider-default.test.js | 13 +++ apps/web-ui/public/styles/components.css | 7 ++ .../resources/control-plane/dist/server.js | 40 ++++++- .../resources/control-plane/ui/app.js | 42 +++++-- .../resources/control-plane/ui/index.html | 103 ++++++++++-------- .../control-plane/ui/provider-default.test.js | 13 +++ .../control-plane/ui/styles/components.css | 7 ++ crates/guest-runtime/src/lib.rs | 71 ++++++++---- crates/guest-runtime/tests/qemu_bridge.rs | 72 +++++++++++- docs/api-reference.md | 35 ++++++ docs/architecture.md | 7 +- docs/getting-started.md | 20 ++++ packages/ts-sdk/src/index.test.ts | 1 + packages/ts-sdk/src/index.ts | 14 +++ python/sdk/client.py | 3 
+ 20 files changed, 537 insertions(+), 143 deletions(-) diff --git a/README.md b/README.md index f408f2d..ddc13eb 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,18 @@ Agent Computer Use Platform gives agents a real Linux desktop to work in, with s - Gives you one place to work with screenshots, shell commands, files, desktop input, and session state. - Keeps operators in the loop with a live desktop view, structured receipts, and clear fallback modes. +## Default flow + +The default happy path is now: + +1. Start a session with default settings (`qemu` + `product`) +2. Wait for readiness +3. Submit a task +4. Watch `live_desktop_view` or the truthful screenshot fallback +5. Delete the session when done unless you explicitly export evidence + +That is the workflow agents should infer first. Advanced/debug controls still exist, but they are secondary. + ## Quickstart ```bash diff --git a/apps/control-plane/src/server.desktop-app.test.ts b/apps/control-plane/src/server.desktop-app.test.ts index 2d528ea..058e9a2 100644 --- a/apps/control-plane/src/server.desktop-app.test.ts +++ b/apps/control-plane/src/server.desktop-app.test.ts @@ -2,7 +2,7 @@ import test from 'node:test'; import assert from 'node:assert/strict'; import { once } from 'node:events'; import { createServer } from 'node:http'; -import { mkdtempSync, mkdirSync, rmSync, writeFileSync } from 'node:fs'; +import { mkdtempSync, mkdirSync, readFileSync, rmSync, writeFileSync } from 'node:fs'; import { join } from 'node:path'; import { tmpdir } from 'node:os'; @@ -198,3 +198,46 @@ test('control-plane proxies storage reclaim requests', async () => { await stopServers(controlPlane, guest.guestServer); } }); + +test('qemu product session creation can activate the desktop app when configured', async () => { + const activationDir = tempDir('acu-desktop-activate'); + const activationScript = join(activationDir, 'activate.js'); + const activationOutput = join(activationDir, 'activation.json'); + writeFileSync( + 
activationScript, + `const fs = require('node:fs'); fs.writeFileSync(process.argv[2], JSON.stringify(process.argv.slice(3)));`, + ); + + const previousActivateBin = process.env.ACU_DESKTOP_ACTIVATE_BIN; + const previousActivateArgs = process.env.ACU_DESKTOP_ACTIVATE_ARGS_JSON; + process.env.ACU_DESKTOP_ACTIVATE_BIN = process.execPath; + process.env.ACU_DESKTOP_ACTIVATE_ARGS_JSON = JSON.stringify([activationScript, activationOutput]); + + const guest = await startGuestServer(); + const controlPlane = await startControlPlaneServer(0, guest.baseUrl); + const baseUrl = `http://127.0.0.1:${(controlPlane.server.address() as { port: number }).port}`; + + try { + const response = await fetch(`${baseUrl}/api/sessions`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ provider: 'qemu', qemu_profile: 'product' }), + }); + assert.equal(response.status, 201); + const activationArgs = JSON.parse(readFileSync(activationOutput, 'utf8')) as string[]; + assert.deepEqual(activationArgs, ['--activate-desktop', '--session', 'qemu-product']); + } finally { + if (previousActivateBin === undefined) { + delete process.env.ACU_DESKTOP_ACTIVATE_BIN; + } else { + process.env.ACU_DESKTOP_ACTIVATE_BIN = previousActivateBin; + } + if (previousActivateArgs === undefined) { + delete process.env.ACU_DESKTOP_ACTIVATE_ARGS_JSON; + } else { + process.env.ACU_DESKTOP_ACTIVATE_ARGS_JSON = previousActivateArgs; + } + await stopServers(controlPlane, guest.guestServer); + rmSync(activationDir, { recursive: true, force: true }); + } +}); diff --git a/apps/control-plane/src/server.ts b/apps/control-plane/src/server.ts index 209fa81..2aac9cf 100644 --- a/apps/control-plane/src/server.ts +++ b/apps/control-plane/src/server.ts @@ -57,6 +57,10 @@ function resolveArtifactRoot(): string { return resolve(process.env.ACU_ARTIFACT_ROOT ?? 
join(repoRoot, 'artifacts')); } +function resolveExportRoot(artifactRoot: string): string { + return join(artifactRoot, 'exports'); +} + async function loadPlaywrightModule(): Promise { if (!playwrightEnabled) { throw new Error('playwright browser adapter is disabled in this environment'); } @@ -294,6 +298,28 @@ async function fetchBrowserSnapshot(url: string): Promise typeof value === 'string') : []; + } catch { + return []; + } +} + +async function maybeActivateDesktop(session: SessionRecord): Promise<void> { + const activateBin = process.env.ACU_DESKTOP_ACTIVATE_BIN; + if (!activateBin) return; + const liveDesktopView = withLiveDesktopView(session).live_desktop_view; + if (!liveDesktopView || liveDesktopView.mode !== 'stream') return; + if (session.provider !== 'qemu' || session.qemu_profile === 'regression') return; + + const args = [...getDesktopActivateArgs(), '--activate-desktop', '--session', session.id]; + await execFileAsync(activateBin, args); +} + async function guestRequest(state: ControlPlaneState, path: string, init?: RequestInit): Promise<{ status: number; payload: any }> { const response = await fetch(`${state.guestRuntimeUrl}${path}`, { headers: { 'content-type': 'application/json', ...(init?.headers ?? 
{}) }, @@ -587,7 +613,7 @@ async function handleBrowserAction(state: ControlPlaneState, sessionId: string, } break; case 'browser_screenshot': { - const path = join(state.artifactRoot, `${sessionId}-${Date.now()}-browser.png`); + const path = join(resolveExportRoot(state.artifactRoot), `${sessionId}-${Date.now()}-browser.png`); await mkdir(dirname(path), { recursive: true }); await browser.page.screenshot({ path, fullPage: true }); result = { path }; @@ -690,6 +716,7 @@ export function createRequestHandler(state: ControlPlaneState) { }); if (upstream.status === 201 && upstream.payload?.session) { upstream.payload.session = withLiveDesktopView(upstream.payload.session as SessionRecord); + await maybeActivateDesktop(upstream.payload.session as SessionRecord).catch(() => undefined); } json(res, upstream.status, upstream.payload); return; @@ -969,6 +996,7 @@ export async function startControlPlaneServer(port = Number(process.env.PORT ?? const uiRoot = resolveUiRoot(); const artifactRoot = resolveArtifactRoot(); await mkdir(artifactRoot, { recursive: true }); + await mkdir(resolveExportRoot(artifactRoot), { recursive: true }); const state: ControlPlaneState = { guestRuntimeUrl, uiRoot, diff --git a/apps/web-ui/public/app.js b/apps/web-ui/public/app.js index ab5dff6..2299b5a 100644 --- a/apps/web-ui/public/app.js +++ b/apps/web-ui/public/app.js @@ -41,6 +41,19 @@ function updateProviderOptions() { qemuOptions.hidden = !isQemu; } +function clearLocalSelection() { + sessionId = null; + taskId = null; + existingSessionInput.value = ''; + syncSessionLocation(); + sessionSummary.textContent = 'No session'; + sessionMeta.textContent = 'No session'; + observation.textContent = ''; + historyEl.textContent = ''; + tasksEl.textContent = ''; + resetDesktopState(); +} + function syncSessionLocation() { const nextUrl = buildSessionUrl(sessionId); window.history.replaceState({}, '', nextUrl); @@ -211,6 +224,15 @@ document.getElementById('create-session').addEventListener('click', 
async () => }); document.getElementById('refresh-session').addEventListener('click', refresh); +document.getElementById('delete-session').addEventListener('click', async () => { + if (!sessionId) { + sessionSummary.textContent = 'Select a session before deleting it.'; + return; + } + await json(`/api/sessions/${sessionId}`, { method: 'DELETE' }); + clearLocalSelection(); + await refreshSessionPicker(); +}); document.getElementById('attach-session').addEventListener('click', async () => { const nextSessionId = parseSessionReference(existingSessionInput.value); if (!nextSessionId) { @@ -228,17 +250,15 @@ sessionPicker?.addEventListener('change', async () => { existingSessionInput.value = sessionId; await refresh(); }); -document.getElementById('clear-session').addEventListener('click', () => { - sessionId = null; - taskId = null; - existingSessionInput.value = ''; - syncSessionLocation(); - sessionSummary.textContent = 'No session'; - sessionMeta.textContent = 'No session'; - observation.textContent = ''; - historyEl.textContent = ''; - tasksEl.textContent = ''; - resetDesktopState(); +document.getElementById('clear-session').addEventListener('click', clearLocalSelection); +document.getElementById('reclaim-storage')?.addEventListener('click', async () => { + const payload = await json('/api/storage/reclaim', { + method: 'POST', + body: JSON.stringify({ mode: 'apply' }), + }); + tasksEl.textContent = JSON.stringify(payload, null, 2); + sessionSummary.textContent = `reclaim=${payload.reclaimed?.length ?? 0} · candidates=${payload.candidate_count ?? 0}`; + await refreshSessionPicker(); }); providerSelect.addEventListener('change', updateProviderOptions); updateProviderOptions(); diff --git a/apps/web-ui/public/index.html b/apps/web-ui/public/index.html index 3b67132..42bae69 100644 --- a/apps/web-ui/public/index.html +++ b/apps/web-ui/public/index.html @@ -94,45 +94,55 @@

Session

-
- - +

Default path: create a QEMU product session, watch the live desktop, and delete it when you are done.

+
+ + +
-
-
- -
- -
- +
diff --git a/apps/web-ui/public/provider-default.test.js b/apps/web-ui/public/provider-default.test.js index e92647c..1fbf0d0 100644 --- a/apps/web-ui/public/provider-default.test.js +++ b/apps/web-ui/public/provider-default.test.js @@ -22,3 +22,16 @@ test('session picker is rendered for choosing running sessions', () => { assert.match(html, /