From e2232155a707a5e75c64053c26d0a791bbf9206f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 14:06:01 +0000 Subject: [PATCH 1/4] Initial plan From 085fae78d11d50c441e2dc78035c7f1c81a241aa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 14:22:24 +0000 Subject: [PATCH 2/4] Add LAVA artifact upload feature via axum HTTP server - Add axum = "0.8" dependency; enable full tokio features - New src/upload.rs: UploadServer, UploadStore, JobArtifacts (RAII-based, auto-deregisters on drop, no global static, no periodic cleanup) - Add runner.ARTIFACT_UPLOAD_URL template variable via TransformVariables.runner - Add Artifact{path, data} to LavaUploadableFileType (path only, no id prefix) - Run struct gains upload_server and artifacts fields - transform() takes upload_url param; submit command creates JobArtifacts - get_uploadable_files() includes artifacts from submit jobs - upload_artifact axum handler uses State>> - main() starts server only if LAVA_ARTIFACT_UPLOAD_BASE_URL is set - Single env var replaces separate host/port; uses TcpListener::bind + axum::serve Agent-Logs-Url: https://github.com/collabora/lava-gitlab-runner/sessions/831d8c7f-5dc6-47d2-82c1-7e79eed0c6bf Co-authored-by: sjoerdsimons <22603932+sjoerdsimons@users.noreply.github.com> --- Cargo.lock | 91 ++++++++++++++++++++++++++++++++ Cargo.toml | 3 +- src/main.rs | 142 +++++++++++++++++++++++++++++++++++++++++--------- src/upload.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 349 insertions(+), 25 deletions(-) create mode 100644 src/upload.rs diff --git a/Cargo.lock b/Cargo.lock index eba9d53..300dc43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,6 +153,58 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "base64" version = "0.22.1" @@ -1731,6 +1783,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.8.1" @@ -1745,6 +1803,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2153,6 +2212,7 @@ name = "lava-gitlab-runner" version = "0.3.4" dependencies = [ "async-trait", + "axum", "bytes", "chrono", "clap", @@ -2274,6 +2334,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "maybe-async" version = "0.2.10" @@ -3042,6 +3108,29 @@ dependencies = [ "unsafe-libyaml-norway", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "3.12.0" @@ -3476,6 +3565,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3514,6 +3604,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 44be709..348bd45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,8 @@ colored = "3" gitlab-runner = "0.3.0" lava-api = "0.2" url = "2" -tokio = "1" +tokio = { version = "1", features = ["full"] } +axum = "0.8" async-trait = "0.1" futures = "0.3" handlebars = "6" diff --git a/src/main.rs b/src/main.rs index 620ab25..25af2a8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,14 @@ use std::borrow::Cow; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashSet}; +use std::env; use std::io::{IsTerminal, Read}; use std::sync::{Arc, LazyLock, Mutex}; use std::time::Duration; +use axum::Router; +use axum::extract::{Path, State}; +use axum::routing::post; use bytes::{Buf, Bytes}; use clap::Parser; use colored::{Color, Colorize}; @@ -33,6 +37,9 @@ use url::Url; mod throttled; use throttled::{ThrottledLava, Throttler}; +mod upload; +use upload::{JobArtifacts, UploadServer, UploadStore}; + const MASK_PATTERN: &str = "[MASKED]"; #[derive(Debug, Clone)] @@ -118,6 +125,7 @@ struct MonitorJobs { #[derive(Clone, Debug, Serialize)] struct TransformVariables<'a> { pub job: BTreeMap<&'a str, &'a str>, + pub runner: BTreeMap<&'a str, &'a str>, } #[derive(Debug)] @@ -288,15 +296,16 @@ impl AvailableArtifactStore { } } -#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Clone, Debug, PartialEq, Eq)] enum LavaUploadableFileType { Log { id: i64 }, Junit { id: i64 }, + Artifact { path: String, data: Bytes }, } #[derive(Clone)] struct LavaUploadableFile { - store: Arc, + store: Option>, which: LavaUploadableFileType, } @@ -314,30 +323,25 @@ impl core::cmp::PartialEq for LavaUploadableFile { impl core::cmp::Eq for LavaUploadableFile {} -impl core::cmp::PartialOrd for LavaUploadableFile { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl core::cmp::Ord for LavaUploadableFile { - fn cmp(&self, other: &Self) -> core::cmp::Ordering { - self.which.cmp(&other.which) - } -} - impl LavaUploadableFile { pub fn log(id: i64, store: Arc) -> Self { Self { which: LavaUploadableFileType::Log { id }, - store, + store: Some(store), } } pub fn junit(id: i64, store: Arc) -> Self { Self { which: LavaUploadableFileType::Junit { id }, - store, + store: Some(store), + } + } + + pub fn artifact(path: String, data: Bytes) -> Self { + Self { + which: LavaUploadableFileType::Artifact { path, data }, + store: None, } } } @@ -347,9 +351,10 @@ impl UploadableFile for LavaUploadableFile { type Data<'a> = Box; fn get_path(&self) -> Cow<'_, str> { - match self.which { + match &self.which { LavaUploadableFileType::Log { id } => format!("{}_log.yaml", id).into(), LavaUploadableFileType::Junit { id } => format!("{}_junit.xml", id).into(), + LavaUploadableFileType::Artifact { path, .. } => path.as_str().into(), } } @@ -357,10 +362,27 @@ impl UploadableFile for LavaUploadableFile { outputln!("Uploading {}", self.get_path()); match &self.which { LavaUploadableFileType::Log { id } => { - Ok(Box::new(self.store.get_log(*id).into_async_read())) + Ok(Box::new( + self.store + .as_ref() + .unwrap() + .get_log(*id) + .into_async_read(), + )) } LavaUploadableFileType::Junit { id } => { - Ok(Box::new(self.store.get_junit(*id).into_async_read())) + Ok(Box::new( + self.store + .as_ref() + .unwrap() + .get_junit(*id) + .into_async_read(), + )) + } + LavaUploadableFileType::Artifact { data, .. } => { + Ok(Box::new(futures::io::AllowStdIo::new( + std::io::Cursor::new(data.to_vec()), + ))) } } } @@ -374,6 +396,8 @@ struct Run { masker: Arc, ids: Vec, cancel_behaviour: Option, + upload_server: Option>>, + artifacts: Option, } impl Run { @@ -382,6 +406,7 @@ impl Run { url: Url, job: Job, cancel_behaviour: Option, + upload_server: Option>>, ) -> Self { let masked = job .variables() @@ -398,6 +423,8 @@ impl Run { masker, ids: Vec::new(), cancel_behaviour, + upload_server, + artifacts: None, } } @@ -725,7 +752,7 @@ impl Run { } } - fn transform(&self, definition: String) -> Result { + fn transform(&self, definition: String, upload_url: &str) -> Result { let mut handlebars = Handlebars::new(); handlebars.set_strict_mode(true); handlebars @@ -740,6 +767,7 @@ impl Run { .variables() .map(|var| (var.key(), var.value())) .collect(), + runner: BTreeMap::from([("ARTIFACT_UPLOAD_URL", upload_url)]), }; handlebars.render("definition", &mappings).map_err(|e| { outputln!("Failed to substitute in template: {}", e); @@ -755,8 +783,15 @@ impl Run { "submit" => { if let Some(filename) = p.next() { let data = self.find_file(filename).await?; + let artifacts = self.upload_server.as_ref().and_then(|s| { + s.lock().unwrap().add_new_job() + }); + let upload_url = artifacts + .as_ref() + .map(|a| a.upload_url().to_string()) + .unwrap_or_default(); let definition = match String::from_utf8(data) { - Ok(data) => self.transform(data)?, + Ok(data) => self.transform(data, &upload_url)?, Err(_) => { outputln!("Job definition is not utf-8"); return Err(()); @@ -764,6 +799,7 @@ impl Run { }; let ids = self.submit_definition(&definition).await?; self.ids.extend(&ids); + self.artifacts = artifacts; self.follow_job(ids[0], cancel_token, JobCancelBehaviour::CancelLava) .await } else { @@ -836,6 +872,14 @@ impl CancellableJobHandler for Run { available_files.push(LavaUploadableFile::log(*id, self.store.clone())); available_files.push(LavaUploadableFile::junit(*id, self.store.clone())); } + if let Some(artifacts) = &self.artifacts { + for path in artifacts.artifact_paths() { + let data = artifacts + .artifact_data(&path) + .expect("Artifact data missing for path returned by artifact_paths"); + available_files.push(LavaUploadableFile::artifact(path, data)); + } + } Ok(Box::new(available_files.into_iter())) } } @@ -846,7 +890,10 @@ static LAVA_MAP: LazyLock = LazyLock::new(|| Arc::new(Mutex::new(BTreeM static MAX_CONCURRENT_REQUESTS: LazyLock>> = LazyLock::new(|| Arc::new(Mutex::new(20))); -async fn new_job(job: Job) -> Result, ()> { +async fn new_job( + job: Job, + upload_server: Option>>, +) -> Result, ()> { info!("Creating new run for job: {}", job.id()); let lava_url = match job.variable("LAVA_URL") { Some(u) => u, @@ -918,7 +965,15 @@ async fn new_job(job: Job) -> Result>>, + Path((key, path)): Path<(String, String)>, + body: Bytes, +) { + store.lock().unwrap().upload_file(&key, &path, body); } #[tokio::main] @@ -984,8 +1039,47 @@ async fn main() { .build() .await; + let upload_server = match env::var("LAVA_ARTIFACT_UPLOAD_BASE_URL") { + Ok(base_url_str) => { + let base_url: url::Url = base_url_str + .parse() + .expect("LAVA_ARTIFACT_UPLOAD_BASE_URL is not a valid URL"); + let listen_addr = env::var("LAVA_ARTIFACT_UPLOAD_LISTEN_ADDR") + .unwrap_or_else(|_| "0.0.0.0:0".to_string()); + + let mut server = UploadServer::new(); + server.set_base_url(base_url); + let server = Arc::new(Mutex::new(server)); + let store = server.lock().unwrap().store(); + + let app = Router::new() + .route("/artifacts/{key}/{*path}", post(upload_artifact)) + .with_state(store); + + let listener = tokio::net::TcpListener::bind(&listen_addr) + .await + .expect("Failed to bind upload listener"); + info!( + "Artifact upload server listening on {}", + listener.local_addr().unwrap() + ); + + tokio::spawn(async move { + axum::serve(listener, app) + .await + .expect("Artifact upload server failed"); + }); + + Some(server) + } + Err(_) => None, + }; + runner - .run(new_job, 64) + .run( + move |job| new_job(job, upload_server.clone()), + 64, + ) .await .expect("Couldn't pick up jobs"); } diff --git a/src/upload.rs b/src/upload.rs new file mode 100644 index 0000000..8f9b8f0 --- /dev/null +++ b/src/upload.rs @@ -0,0 +1,138 @@ +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + +use bytes::Bytes; +use tracing::warn; + +/// Shared state between the HTTP server and the job handlers. +pub struct UploadStore { + jobs: BTreeMap>>, +} + +impl UploadStore { + fn new() -> Self { + Self { + jobs: BTreeMap::new(), + } + } + + /// Called by the axum handler to store an uploaded file. + pub fn upload_file(&mut self, key: &str, path: &str, data: Bytes) { + if let Some(ja) = self.jobs.get(key) { + ja.lock().unwrap().upload_artifact(path, data); + } else { + warn!( + "Ignoring attempt to upload {} for non-existent job key", + path + ); + } + } +} + +pub struct UploadServer { + base_url: Option, + store: Arc>, +} + +impl UploadServer { + pub fn new() -> Self { + Self { + base_url: None, + store: Arc::new(Mutex::new(UploadStore::new())), + } + } + + /// Set the externally-routable base URL for the upload server. + /// E.g. "http://my-upload-host:2456/artifacts" + pub fn set_base_url(&mut self, base_url: url::Url) { + self.base_url = Some(base_url); + } + + pub fn store(&self) -> Arc> { + self.store.clone() + } + + /// Create a new JobArtifacts registration. The returned value is owned by Run. + /// When it is dropped, its key is automatically removed from the store. + pub fn add_new_job(&mut self) -> Option { + let base_url = self.base_url.as_ref()?; + let key = generate_unique_id(); + let mut upload_url = base_url.clone(); + upload_url.path_segments_mut().ok()?.push(&key); + // Ensure trailing slash so relative path resolution works correctly. + let upload_url_str = format!("{}/", upload_url); + + let inner = Arc::new(Mutex::new(JobArtifactsInner::new())); + self.store + .lock() + .unwrap() + .jobs + .insert(key.clone(), inner.clone()); + Some(JobArtifacts { + key, + upload_url: upload_url_str, + inner, + store: self.store.clone(), + }) + } +} + +fn generate_unique_id() -> String { + let bytes: [u8; 32] = rand::random(); + bytes.iter().map(|b| format!("{:02x}", b)).collect() +} + +/// Owned by Run. Dropped when Run is done — automatically deregisters from the store. +pub struct JobArtifacts { + key: String, + upload_url: String, + inner: Arc>, + store: Arc>, +} + +impl JobArtifacts { + pub fn upload_url(&self) -> &str { + &self.upload_url + } + + pub fn artifact_paths(&self) -> Vec { + self.inner + .lock() + .unwrap() + .artifacts + .keys() + .cloned() + .collect() + } + + pub fn artifact_data(&self, path: &str) -> Option { + self.inner + .lock() + .unwrap() + .artifacts + .get(path) + .cloned() + } +} + +impl Drop for JobArtifacts { + fn drop(&mut self) { + self.store.lock().unwrap().jobs.remove(&self.key); + } +} + +struct JobArtifactsInner { + artifacts: BTreeMap, +} + +impl JobArtifactsInner { + fn new() -> Self { + Self { + artifacts: BTreeMap::new(), + } + } + + fn upload_artifact(&mut self, path: &str, data: Bytes) { + self.artifacts.insert(path.to_string(), data); + } +} From b2400c130b96f82a1e061691f3cb47d52595663e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 14:39:59 +0000 Subject: [PATCH 3/4] Fix cargo fmt formatting issues causing CI failure Agent-Logs-Url: https://github.com/collabora/lava-gitlab-runner/sessions/c13029b6-3fd3-44cc-ac99-b9bf964efe56 Co-authored-by: sjoerdsimons <22603932+sjoerdsimons@users.noreply.github.com> --- src/main.rs | 48 ++++++++++++++++++------------------------------ src/upload.rs | 7 +------ 2 files changed, 19 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index 25af2a8..038e8db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -361,29 +361,19 @@ impl UploadableFile for LavaUploadableFile { async fn get_data(&self) -> Result, ()> { outputln!("Uploading {}", self.get_path()); match &self.which { - LavaUploadableFileType::Log { id } => { - Ok(Box::new( - self.store - .as_ref() - .unwrap() - .get_log(*id) - .into_async_read(), - )) - } - LavaUploadableFileType::Junit { id } => { - Ok(Box::new( - self.store - .as_ref() - .unwrap() - .get_junit(*id) - .into_async_read(), - )) - } - LavaUploadableFileType::Artifact { data, .. } => { - Ok(Box::new(futures::io::AllowStdIo::new( - std::io::Cursor::new(data.to_vec()), - ))) - } + LavaUploadableFileType::Log { id } => Ok(Box::new( + self.store.as_ref().unwrap().get_log(*id).into_async_read(), + )), + LavaUploadableFileType::Junit { id } => Ok(Box::new( + self.store + .as_ref() + .unwrap() + .get_junit(*id) + .into_async_read(), + )), + LavaUploadableFileType::Artifact { data, .. } => Ok(Box::new( + futures::io::AllowStdIo::new(std::io::Cursor::new(data.to_vec())), + )), } } } @@ -783,9 +773,10 @@ impl Run { "submit" => { if let Some(filename) = p.next() { let data = self.find_file(filename).await?; - let artifacts = self.upload_server.as_ref().and_then(|s| { - s.lock().unwrap().add_new_job() - }); + let artifacts = self + .upload_server + .as_ref() + .and_then(|s| s.lock().unwrap().add_new_job()); let upload_url = artifacts .as_ref() .map(|a| a.upload_url().to_string()) @@ -1076,10 +1067,7 @@ async fn main() { }; runner - .run( - move |job| new_job(job, upload_server.clone()), - 64, - ) + .run(move |job| new_job(job, upload_server.clone()), 64) .await .expect("Couldn't pick up jobs"); } diff --git a/src/upload.rs b/src/upload.rs index 8f9b8f0..76347b2 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -106,12 +106,7 @@ impl JobArtifacts { } pub fn artifact_data(&self, path: &str) -> Option { - self.inner - .lock() - .unwrap() - .artifacts - .get(path) - .cloned() + self.inner.lock().unwrap().artifacts.get(path).cloned() } } From a9c59c3d9dbe07475c91259a12fe91c57103ec35 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 14:56:00 +0000 Subject: [PATCH 4/4] Address review: path sanitization, disk caching, size limits, proper HTTP status codes Agent-Logs-Url: https://github.com/collabora/lava-gitlab-runner/sessions/439b5174-50be-4263-8ccd-e88b204f9251 Co-authored-by: sjoerdsimons <22603932+sjoerdsimons@users.noreply.github.com> --- src/main.rs | 63 ++++++++++++++----- src/upload.rs | 166 +++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 199 insertions(+), 30 deletions(-) diff --git a/src/main.rs b/src/main.rs index 038e8db..0e2e8dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,9 @@ use std::sync::{Arc, LazyLock, Mutex}; use std::time::Duration; use axum::Router; -use axum::extract::{Path, State}; +use axum::extract::{DefaultBodyLimit, Path, State}; +use axum::http::StatusCode; +use axum::response::IntoResponse; use axum::routing::post; use bytes::{Buf, Bytes}; use clap::Parser; @@ -29,7 +31,7 @@ use strum::{Display, EnumString}; use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing::Level; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use tracing_subscriber::filter; use tracing_subscriber::prelude::*; use url::Url; @@ -38,7 +40,7 @@ mod throttled; use throttled::{ThrottledLava, Throttler}; mod upload; -use upload::{JobArtifacts, UploadServer, UploadStore}; +use upload::{ArtifactFile, JobArtifacts, UploadError, UploadServer, UploadStore}; const MASK_PATTERN: &str = "[MASKED]"; @@ -296,13 +298,29 @@ impl AvailableArtifactStore { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] enum LavaUploadableFileType { Log { id: i64 }, Junit { id: i64 }, - Artifact { path: String, data: Bytes }, + Artifact { path: String, file: ArtifactFile }, } +impl PartialEq for LavaUploadableFileType { + /// Equality is based on the identity key of each variant (job ID for `Log`/`Junit`, + /// artifact path for `Artifact`). The file content is intentionally excluded from + /// the comparison because `ArtifactFile` may be backed by a temp file on disk. + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Log { id: a }, Self::Log { id: b }) => a == b, + (Self::Junit { id: a }, Self::Junit { id: b }) => a == b, + (Self::Artifact { path: a, .. }, Self::Artifact { path: b, .. }) => a == b, + _ => false, + } + } +} + +impl Eq for LavaUploadableFileType {} + #[derive(Clone)] struct LavaUploadableFile { store: Option>, @@ -338,9 +356,9 @@ impl LavaUploadableFile { } } - pub fn artifact(path: String, data: Bytes) -> Self { + pub fn artifact(path: String, file: ArtifactFile) -> Self { Self { - which: LavaUploadableFileType::Artifact { path, data }, + which: LavaUploadableFileType::Artifact { path, file }, store: None, } } @@ -371,9 +389,12 @@ impl UploadableFile for LavaUploadableFile { .get_junit(*id) .into_async_read(), )), - LavaUploadableFileType::Artifact { data, .. } => Ok(Box::new( - futures::io::AllowStdIo::new(std::io::Cursor::new(data.to_vec())), - )), + LavaUploadableFileType::Artifact { file, .. } => { + let reader = file.open_std_reader().map_err(|e| { + outputln!("Failed to open artifact for upload: {}", e); + })?; + Ok(Box::new(futures::io::AllowStdIo::new(reader))) + } } } } @@ -865,10 +886,10 @@ impl CancellableJobHandler for Run { } if let Some(artifacts) = &self.artifacts { for path in artifacts.artifact_paths() { - let data = artifacts - .artifact_data(&path) - .expect("Artifact data missing for path returned by artifact_paths"); - available_files.push(LavaUploadableFile::artifact(path, data)); + let file = artifacts + .artifact_file(&path) + .expect("Artifact file missing for path returned by artifact_paths"); + available_files.push(LavaUploadableFile::artifact(path, file)); } } Ok(Box::new(available_files.into_iter())) @@ -963,8 +984,17 @@ async fn upload_artifact( State(store): State>>, Path((key, path)): Path<(String, String)>, body: Bytes, -) { - store.lock().unwrap().upload_file(&key, &path, body); +) -> impl IntoResponse { + match store.lock().unwrap().upload_file(&key, &path, body) { + Ok(()) => StatusCode::OK, + Err(UploadError::UnknownKey) => StatusCode::NOT_FOUND, + Err(UploadError::InvalidPath) => StatusCode::BAD_REQUEST, + Err(UploadError::LimitExceeded) => StatusCode::PAYLOAD_TOO_LARGE, + Err(UploadError::Io(e)) => { + warn!("IO error storing artifact for key {:?}: {}", key, e); + StatusCode::INTERNAL_SERVER_ERROR + } + } } #[tokio::main] @@ -1045,6 +1075,7 @@ async fn main() { let app = Router::new() .route("/artifacts/{key}/{*path}", post(upload_artifact)) + .layer(DefaultBodyLimit::max(upload::ARTIFACT_JOB_LIMIT as usize)) .with_state(store); let listener = tokio::net::TcpListener::bind(&listen_addr) diff --git a/src/upload.rs b/src/upload.rs index 76347b2..673ec59 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,9 +1,97 @@ use std::collections::BTreeMap; +use std::io::Write; +use std::path::Component; use std::sync::{Arc, Mutex}; use bytes::Bytes; +use tempfile::NamedTempFile; use tracing::warn; +/// Files smaller than this are kept in memory; larger files are spilled to disk. +const ARTIFACT_MEMORY_THRESHOLD: u64 = 1024 * 1024; // 1 MB + +/// Maximum total bytes accepted across all uploads for a single job. +pub const ARTIFACT_JOB_LIMIT: u64 = 1024 * 1024 * 1024; // 1 GB + +/// Sanitize an artifact upload path to a safe relative path. +/// +/// - Rejects absolute paths, `..` components, and empty paths. +/// - Strips redundant `.` components. +/// - Returns the normalized path joined with `/`, or `None` if the path is invalid. +pub fn sanitize_artifact_path(path: &str) -> Option { + let p = std::path::Path::new(path); + let mut components = Vec::new(); + for component in p.components() { + match component { + Component::Normal(name) => components.push(name.to_string_lossy().into_owned()), + Component::CurDir => {} // skip redundant . + Component::ParentDir | Component::RootDir | Component::Prefix(_) => return None, + } + } + if components.is_empty() { + return None; + } + Some(components.join("/")) +} + +/// Errors returned by [`UploadStore::upload_file`]. +pub enum UploadError { + /// No job is registered under the given key. + UnknownKey, + /// The supplied path failed sanitization. + InvalidPath, + /// Accepting this upload would exceed the per-job byte limit. + LimitExceeded, + /// An I/O error occurred while spilling the artifact to disk. + Io(std::io::Error), +} + +// --------------------------------------------------------------------------- +// ArtifactFile — memory or disk backed +// --------------------------------------------------------------------------- + +enum ArtifactFileInner { + Memory(Bytes), + Disk(NamedTempFile), +} + +/// An uploaded artifact stored either in memory (small files) or on disk (large files). +/// +/// Cloning is cheap — it increments an [`Arc`] reference count without copying data. +#[derive(Clone)] +pub struct ArtifactFile(Arc); + +impl ArtifactFile { + fn new_memory(data: Bytes) -> Self { + Self(Arc::new(ArtifactFileInner::Memory(data))) + } + + fn new_disk(f: NamedTempFile) -> Self { + Self(Arc::new(ArtifactFileInner::Disk(f))) + } + + /// Open a synchronous reader for the artifact data. + pub fn open_std_reader(&self) -> std::io::Result> { + match &*self.0 { + ArtifactFileInner::Memory(bytes) => Ok(Box::new(std::io::Cursor::new(bytes.clone()))), + ArtifactFileInner::Disk(f) => Ok(Box::new(std::fs::File::open(f.path())?)), + } + } +} + +impl std::fmt::Debug for ArtifactFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &*self.0 { + ArtifactFileInner::Memory(b) => write!(f, "ArtifactFile::Memory({} bytes)", b.len()), + ArtifactFileInner::Disk(file) => write!(f, "ArtifactFile::Disk({:?})", file.path()), + } + } +} + +// --------------------------------------------------------------------------- +// UploadStore +// --------------------------------------------------------------------------- + /// Shared state between the HTTP server and the job handlers. pub struct UploadStore { jobs: BTreeMap>>, @@ -17,18 +105,25 @@ impl UploadStore { } /// Called by the axum handler to store an uploaded file. - pub fn upload_file(&mut self, key: &str, path: &str, data: Bytes) { + /// + /// Returns [`UploadError::InvalidPath`] if `path` fails sanitization, + /// [`UploadError::UnknownKey`] if no job is registered under `key`, or + /// [`UploadError::LimitExceeded`] if the per-job quota would be exceeded. + pub fn upload_file(&mut self, key: &str, path: &str, data: Bytes) -> Result<(), UploadError> { + let sanitized = sanitize_artifact_path(path).ok_or(UploadError::InvalidPath)?; if let Some(ja) = self.jobs.get(key) { - ja.lock().unwrap().upload_artifact(path, data); + ja.lock().unwrap().upload_artifact(&sanitized, data) } else { - warn!( - "Ignoring attempt to upload {} for non-existent job key", - path - ); + warn!("Attempt to upload {:?} for non-existent job key", path); + Err(UploadError::UnknownKey) } } } +// --------------------------------------------------------------------------- +// UploadServer +// --------------------------------------------------------------------------- + pub struct UploadServer { base_url: Option, store: Arc>, @@ -43,7 +138,7 @@ impl UploadServer { } /// Set the externally-routable base URL for the upload server. - /// E.g. "http://my-upload-host:2456/artifacts" + /// E.g. `"http://my-upload-host:2456/artifacts"` pub fn set_base_url(&mut self, base_url: url::Url) { self.base_url = Some(base_url); } @@ -52,7 +147,7 @@ impl UploadServer { self.store.clone() } - /// Create a new JobArtifacts registration. The returned value is owned by Run. + /// Create a new [`JobArtifacts`] registration. The returned value is owned by `Run`. /// When it is dropped, its key is automatically removed from the store. pub fn add_new_job(&mut self) -> Option { let base_url = self.base_url.as_ref()?; @@ -82,7 +177,11 @@ fn generate_unique_id() -> String { bytes.iter().map(|b| format!("{:02x}", b)).collect() } -/// Owned by Run. Dropped when Run is done — automatically deregisters from the store. +// --------------------------------------------------------------------------- +// JobArtifacts — RAII handle owned by Run +// --------------------------------------------------------------------------- + +/// Owned by `Run`. Dropping it automatically deregisters the job key from the store. pub struct JobArtifacts { key: String, upload_url: String, @@ -105,8 +204,13 @@ impl JobArtifacts { .collect() } - pub fn artifact_data(&self, path: &str) -> Option { - self.inner.lock().unwrap().artifacts.get(path).cloned() + pub fn artifact_file(&self, path: &str) -> Option { + self.inner + .lock() + .unwrap() + .artifacts + .get(path) + .map(|(_, f)| f.clone()) } } @@ -116,18 +220,52 @@ impl Drop for JobArtifacts { } } +// --------------------------------------------------------------------------- +// JobArtifactsInner +// --------------------------------------------------------------------------- + struct JobArtifactsInner { - artifacts: BTreeMap, + /// Maps sanitized path → (byte size, stored file). + artifacts: BTreeMap, + /// Running total of bytes currently stored (decreases on overwrite). + total_bytes: u64, } impl JobArtifactsInner { fn new() -> Self { Self { artifacts: BTreeMap::new(), + total_bytes: 0, } } - fn upload_artifact(&mut self, path: &str, data: Bytes) { - self.artifacts.insert(path.to_string(), data); + fn upload_artifact(&mut self, path: &str, data: Bytes) -> Result<(), UploadError> { + let data_len = data.len() as u64; + // Subtract any existing file at this path from the running total so that + // overwrites don't permanently consume quota for the old content. + let old_len = self.artifacts.get(path).map(|(size, _)| *size).unwrap_or(0); + let new_total = self + .total_bytes + .saturating_sub(old_len) + .saturating_add(data_len); + if new_total > ARTIFACT_JOB_LIMIT { + return Err(UploadError::LimitExceeded); + } + // For files at or above the memory threshold, spill to a temp file to + // avoid keeping large payloads in RAM. Note: the incoming `data` buffer + // is already fully in memory at this point (axum's `Bytes` extractor); + // this avoids a second large heap allocation but does not eliminate the + // first one. A future improvement could stream the body directly to disk. + let artifact_file = if data_len >= ARTIFACT_MEMORY_THRESHOLD { + let mut f = NamedTempFile::new().map_err(UploadError::Io)?; + f.write_all(&data).map_err(UploadError::Io)?; + ArtifactFile::new_disk(f) + } else { + ArtifactFile::new_memory(data) + }; + self.total_bytes = new_total; + self.artifacts + .insert(path.to_string(), (data_len, artifact_file)); + Ok(()) } }