diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index 0720620..2671f74 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -58,6 +58,14 @@ pub struct GatewayConfig { pub cors_origins: Vec, /// Path to store bucket registry CID for persistence pub registry_cid_path: Option, + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// `x-amz-meta-fula-remote-cid` — the chunk's verified bytes were already + /// streamed to a fula-ingest node; this master records only the + /// key→cid mapping (after confirming block presence via the blockstore). + /// Advertised to clients via GET /fula/capabilities so old/flag-off + /// masters are never sent the protocol. Default OFF. + #[serde(default)] + pub remote_cid_put_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -198,6 +206,7 @@ impl Default for GatewayConfig { cors_enabled: true, cors_origins: vec!["*".to_string()], registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), + remote_cid_put_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index 6aa9c9b..c4a75ea 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -129,8 +129,104 @@ pub async fn put_object( } } - // Store the data - let cid = state.block_store.put_block(&body).await?; + // Phase 2 (decentralized ingress): empty-body mapping PUT. The chunk's + // verified bytes were already streamed to a fula-ingest node; record only + // the key→cid mapping here. Triple-gated: server flag (advertised via + // /fula/capabilities, so well-behaved clients never send this to a + // flag-off master), the header, and an empty body. 
The declared CID must + // be raw(0x55)+blake3(0x1e) — the only addressing the ingest verifies — + // and the block must be PRESENT in the blockstore (bitswap pulls it from + // the ingest peer); absent ⇒ retryable 409 so the client falls back to a + // full-bytes PUT. Storing nothing here is the point: the CID is + // self-certifying, so reads stay tamper-evident end-to-end. + let remote_cid_hdr = headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + let cid = if let Some(declared) = remote_cid_hdr { + if !state.config.remote_cid_put_enabled { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT is not enabled on this master (probe /fula/capabilities)", + )); + } + if !body.is_empty() { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT requires an empty body", + )); + } + let declared_cid = cid::Cid::try_from(declared.as_str()).map_err(|e| { + ApiError::s3(S3ErrorCode::InvalidRequest, format!("invalid remote cid: {e}")) + })?; + if declared_cid.codec() != 0x55 || declared_cid.hash().code() != 0x1e { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", + )); + } + // BOUNDED presence check: kubo's block lookup for an ABSENT cid is an + // unbounded network search (bitswap/DHT) — without a deadline this + // handler would hang for minutes per missing block. 5s is ample for a + // bitswap pull from the ingest peer (same box: instant; LAN/WAN peer: + // one round-trip); timeout ⇒ treat as absent ⇒ retryable 409, the + // client falls back to a full-bytes PUT. 
+ match tokio::time::timeout( + std::time::Duration::from_secs(5), + state.block_store.has_block(&declared_cid), + ) + .await + { + Ok(Ok(true)) => {} + Ok(Ok(false)) => { + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", + )); + } + Ok(Err(e)) => { + tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check failed — retry or fall back to a full-bytes PUT", + )); + } + Err(_elapsed) => { + tracing::debug!(cid = %declared_cid, "remote-cid presence check timed out (treating as absent)"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check timed out — retry or fall back to a full-bytes PUT", + )); + } + } + tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); + declared_cid + } else { + // Store the data + state.block_store.put_block(&body).await? + }; + // Phase 2: a mapping PUT has an empty body — the chunk's TRUE size (as + // stored on the ingest node) arrives in x-amz-meta-fula-remote-size so + // billing/list/stat record the real byte count, never 0. Only honored on + // the remote-cid path; bounded by max_body_size like a real body. 
+ let object_size: u64 = match ( + headers.get("x-amz-meta-fula-remote-size").and_then(|v| v.to_str().ok()), + headers.get("x-amz-meta-fula-remote-cid").is_some(), + ) { + (Some(sz), true) => { + let parsed = sz.parse::().map_err(|_| { + ApiError::s3(S3ErrorCode::InvalidRequest, "invalid x-amz-meta-fula-remote-size") + })?; + if parsed == 0 || parsed > state.config.max_body_size as u64 { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "x-amz-meta-fula-remote-size out of range", + )); + } + parsed + } + _ => body.len() as u64, + }; // Use CID as ETag (content-addressed identifier) // This is S3-compliant: AWS docs state "The ETag may or may not be an MD5 digest" @@ -165,7 +261,7 @@ pub async fn put_object( // COPY handler at line 694). Access control is unaffected — bucket- // level access is gated by `BucketMetadata.owner_id` which is // already correctly the hashed form. - let mut metadata = ObjectMetadata::new(cid, body.len() as u64, etag.clone()) + let mut metadata = ObjectMetadata::new(cid, object_size, etag.clone()) .with_owner(&session.hashed_user_id); if let Some(ct) = content_type { @@ -863,7 +959,18 @@ pub async fn get_object( response = response.header("Content-Type", ct); } - if let Some(ref cc) = metadata.cache_control { + if is_mutable_index_key(&key) { + // Forced for the fixed-key forest index objects, overriding any + // per-object metadata: browsers HEURISTICALLY cache responses + // that carry Last-Modified but no Cache-Control, so web (wasm) + // clients kept reading a stale forest root for minutes after + // another device's upload — native apps (no HTTP cache) were + // fine. `no-cache` (not no-store) forces revalidation on every + // use; the If-None-Match → 304 path above makes an unchanged + // index cost one conditional round trip, and the ETag is the + // exact CID so a same-second overwrite can't false-304. 
+ response = response.header("Cache-Control", "private, no-cache"); + } else if let Some(ref cc) = metadata.cache_control { response = response.header("Cache-Control", cc); } @@ -1093,6 +1200,12 @@ pub async fn head_object( response = response.header("Content-Type", ct); } + if is_mutable_index_key(&key) { + // Same forced revalidation as the GET path — HEAD responses + // update the browser's cached entry metadata too. + response = response.header("Cache-Control", "private, no-cache"); + } + // Add user metadata for (k, v) in &metadata.user_metadata { response = response.header(format!("x-amz-meta-{}", k), v); @@ -1354,6 +1467,26 @@ pub(crate) fn match_if_match(header: &str, current: Option<&str>) -> bool { result } +/// Mutable fula-internal index objects live at FIXED keys and are +/// overwritten in place — the sharded forest index / dir-index +/// (`__fula_forest_v7_index`, `__fula_forest_v7_dir_index`) and any +/// legacy fixed-key forest blob. Their GET/HEAD responses must force +/// `Cache-Control: private, no-cache`, or browsers heuristically cache +/// them (Last-Modified present, no Cache-Control) and wasm clients read +/// a stale forest root for minutes after another device's write. +/// +/// Immutable-by-construction families stay normally cacheable: their +/// key never carries different bytes (content-addressed nodes/pages, +/// timestamped v1 backups). +pub(crate) fn is_mutable_index_key(key: &str) -> bool { + if !key.starts_with("__fula_") { + return false; + } + !(key.starts_with("__fula_forest_v7_nodes/") + || key.starts_with("__fula_forest_v7_pages/") + || key.starts_with("__fula_forest_v1_backup/")) +} + /// RFC 7232 §3.2. True iff the If-None-Match precondition is satisfied. 
pub(crate) fn match_if_none_match(header: &str, current: Option<&str>) -> bool { let h = header.trim(); @@ -1399,6 +1532,11 @@ pub(crate) const FULA_CONTROL_HEADERS: &[&str] = &[ // as user_metadata on the object itself (would pollute every // forest-manifest object with a meaningless `=1` tag). "fula-forest-manifest", + // Phase 2 (decentralized ingress): remote-cid mapping PUT controls — + // consumed by the handler (declared CID + true byte size of the chunk + // stored on the ingest node); never persisted as object metadata. + "fula-remote-cid", + "fula-remote-size", ]; /// Returns `true` if the given x-amz-meta key (already stripped of @@ -1636,7 +1774,7 @@ mod phase_1_2_wire_tests { #[cfg(test)] mod conditional_tests { - use super::{match_if_match, match_if_none_match}; + use super::{is_mutable_index_key, match_if_match, match_if_none_match}; #[test] fn if_match_star_requires_existing() { @@ -1681,6 +1819,25 @@ mod conditional_tests { assert!(match_if_none_match("W/\"abc\"", Some("abc"))); } + #[test] + fn mutable_index_keys_force_no_cache() { + // Fixed-key, overwritten-in-place index objects → forced. + assert!(is_mutable_index_key("__fula_forest_v7_index")); + assert!(is_mutable_index_key("__fula_forest_v7_dir_index")); + assert!(is_mutable_index_key("__fula_forest_v1")); + // Unknown future __fula_* state defaults to forced (safe bias). + assert!(is_mutable_index_key("__fula_something_new")); + // Immutable families keep normal caching. + assert!(!is_mutable_index_key("__fula_forest_v7_nodes/abc123")); + assert!(!is_mutable_index_key("__fula_forest_v7_pages/0001")); + assert!(!is_mutable_index_key( + "__fula_forest_v1_backup/1700000000000" + )); + // User content is untouched. 
+ assert!(!is_mutable_index_key("photos/cat.jpg")); + assert!(!is_mutable_index_key(".fula/tags/abcd.json")); + } + #[test] fn empty_or_whitespace_header() { assert!(!match_if_match("", Some("abc"))); diff --git a/crates/fula-cli/src/handlers/service.rs b/crates/fula-cli/src/handlers/service.rs index d8e70b4..15e34f8 100644 --- a/crates/fula-cli/src/handlers/service.rs +++ b/crates/fula-cli/src/handlers/service.rs @@ -46,3 +46,14 @@ pub async fn health_check() -> impl IntoResponse { pub async fn healthz() -> impl IntoResponse { (StatusCode::OK, "ok") } + +/// GET /fula/capabilities — unauthenticated protocol-capability advertisement +/// (Phase 2). Clients probe this ONCE per instance before using optional +/// protocols. An old master 404s here, so a client never sends e.g. the +/// empty-body remote-cid PUT to a build that would misstore it as a real +/// zero-byte object. +pub async fn capabilities(State(state): State>) -> impl IntoResponse { + axum::Json(serde_json::json!({ + "remoteCidPut": state.config.remote_cid_put_enabled, + })) +} diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index bd1a8c1..ca683ad 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -92,6 +92,12 @@ struct Args { /// the local-retain backlog (the ongoing per-upload protection still runs). #[arg(long, env = "FULA_NO_LOCAL_RETAIN_BACKFILL")] no_local_retain_backfill: bool, + + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// x-amz-meta-fula-remote-cid (bytes pre-stored on a fula-ingest node). + /// Advertised to clients via GET /fula/capabilities. Default OFF. 
+ #[arg(long, env = "FULA_REMOTE_CID_PUT")] + remote_cid_put: bool, } #[tokio::main] @@ -163,6 +169,7 @@ async fn main() -> anyhow::Result<()> { cluster_peering_enabled: if args.no_cluster_peering { Some(false) } else { None }, local_retain_enabled: if args.no_local_retain { Some(false) } else { None }, local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, + remote_cid_put_enabled: args.remote_cid_put, ..Default::default() }; diff --git a/crates/fula-cli/src/routes.rs b/crates/fula-cli/src/routes.rs index 09b256d..cd7c054 100644 --- a/crates/fula-cli/src/routes.rs +++ b/crates/fula-cli/src/routes.rs @@ -24,7 +24,12 @@ pub fn create_router(state: Arc) -> Router { let cors = create_cors_layer(&state.config.cors_origins); // Public routes that must bypass auth (e.g., container health checks) - let public = Router::new().route("/healthz", get(handlers::healthz)); + let public = Router::new() + .route("/healthz", get(handlers::healthz)) + // Phase 2: capability probe (no auth — the answer is not sensitive and + // clients must be able to probe before any authenticated operation). + .route("/fula/capabilities", get(handlers::capabilities)) + .with_state(state.clone()); // Phase 3.2 internal endpoints. Bearer-token-protected at the // handler level (see handlers::internal::authenticate). They diff --git a/crates/fula-client/src/client.rs b/crates/fula-client/src/client.rs index 1b2086b..f67791a 100644 --- a/crates/fula-client/src/client.rs +++ b/crates/fula-client/src/client.rs @@ -522,6 +522,24 @@ impl FulaClient { }) } + /// Phase 2 (decentralized ingress): one-shot master capability probe. + /// Returns whether the master accepts empty-body remote-cid mapping PUTs + /// (GET /fula/capabilities → {"remoteCidPut": bool}). ANY failure — 404 on + /// an old build, network error, unparseable body — returns `false`, so + /// callers degrade to the legacy full-bytes path, never to a protocol the + /// master would misinterpret. 
+ pub async fn supports_remote_cid_put(&self) -> bool { + match self.request("GET", "/fula/capabilities", None, None, None).await { + Ok(resp) => resp + .json::() + .await + .ok() + .and_then(|v| v.get("remoteCidPut").and_then(|b| b.as_bool())) + .unwrap_or(false), + Err(_) => false, + } + } + /// Put an object with metadata and optional If-Match / If-None-Match guards. /// /// Used by conditional-write paths (e.g. forest flush) to detect concurrent diff --git a/crates/fula-client/src/config.rs b/crates/fula-client/src/config.rs index dfa4980..1660340 100644 --- a/crates/fula-client/src/config.rs +++ b/crates/fula-client/src/config.rs @@ -195,6 +195,18 @@ pub struct Config { /// targeted regressions or backward-compat tests). pub walkable_v8_writer_enabled: bool, + /// Phase 2 (decentralized ingress) — optional fula-ingest endpoints. + /// When non-empty AND `walkable_v8_writer_enabled` AND the master + /// advertises `remoteCidPut` via GET /fula/capabilities (probed once per + /// chunked upload), chunk ciphertext BYTES are streamed to the first + /// ingest endpoint (`PUT /v0/block?cid=` — the node verifies + /// the CID before storing) and the master receives only an empty-body + /// mapping PUT (`x-amz-meta-fula-remote-cid` + true size). ANY failure on + /// that route falls back transparently to the legacy full-bytes PUT. + /// Empty (default) = behavior byte-identical to before. Native-only + /// (ignored on wasm32). + pub ingest_endpoints: Vec, + /// Phase 19 — optional health-status callback. When set, the SDK /// invokes this closure on every Up↔Down transition of the /// master health gate (`MasterHealthEvent::Online` / @@ -330,6 +342,8 @@ impl Default for Config { // flipping master-side gates until SDK adoption reaches the // % they're comfortable with for the pre-v0.6 reader cost. walkable_v8_writer_enabled: true, + // Phase 2 — no ingest nodes by default (legacy byte path). 
+ ingest_endpoints: Vec::new(), // Phase 19 — no callback by default (silent gate). health_callback: None, // E2E plan Phase 4 — Mode B/C signed-entry writer disabled @@ -358,6 +372,13 @@ impl Config { self } + /// Phase 2: route chunk bytes through fula-ingest node(s) when the master + /// supports remote-cid mapping PUTs. Empty = legacy behavior. + pub fn with_ingest_endpoints(mut self, endpoints: Vec) -> Self { + self.ingest_endpoints = endpoints; + self + } + /// Enable encryption pub fn with_encryption(mut self) -> Self { self.encryption_enabled = true; diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 10359a6..90d8edb 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7083,6 +7083,31 @@ impl EncryptedClient { // (skip_serializing_if keeps it off the wire). let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): decide the byte route ONCE per + // upload. The ingest route activates only when (a) ingest endpoints + // are configured, (b) the walkable-v8 writer is on (we need the + // pre-computed chunk CID to declare), and (c) the master ADVERTISES + // remote-cid support via /fula/capabilities — an old or flag-off + // master is never sent the empty-body protocol it would misstore. + // (cfg-gated: native only; wasm32 keeps the legacy path.) 
+ #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -7111,8 +7136,61 @@ impl EncryptedClient { let bucket = bucket.to_string(); let pinning = pinning.clone(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route. Bytes go to the fula-ingest node + // (which verifies BLAKE3(body) == declared CID before storing) + // and the master gets an empty-body mapping PUT carrying the + // declared CID + true size. ANY failure on this route falls + // through to the legacy full-bytes PUT below — degradation, + // never corruption (the master only accepts the mapping when + // its flag is on AND the block is present in its blockstore). 
+ #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + let map_res = if let Some(ref pin) = pinning { + client.put_object_with_metadata_and_pinning( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + &pin.endpoint, &pin.token, + ).await + } else { + client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await + }; + match map_res { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = if let Some(ref pin) = pinning { client.put_object_with_metadata_and_pinning( &bucket, @@ -10515,6 +10593,28 @@ impl EncryptedClient { // master's etag-attested CID matches. 
let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): same probe-gated ingest route as + // `put_object_chunked_internal` — this public API duplicates that + // loop (E51 mirroring pattern), so it must mirror the route too or + // FxFiles-path uploads silently stay on the legacy byte path. + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -10544,8 +10644,49 @@ impl EncryptedClient { let client = self.inner.clone(); let bucket = bucket.to_string(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route (see put_object_chunked_internal + // for the full rationale). ANY failure falls through to the + // legacy full-bytes PUT below. 
+ #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + match client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = client.put_object_with_metadata( &bucket, &chunk_key, diff --git a/crates/fula-client/tests/ingest_route.rs b/crates/fula-client/tests/ingest_route.rs new file mode 100644 index 0000000..3a27386 --- /dev/null +++ b/crates/fula-client/tests/ingest_route.rs @@ -0,0 +1,273 @@ +//! Phase 2 (decentralized ingress) — client routing matrix. +//! +//! Verifies the three load-bearing behaviors of `Config.ingest_endpoints`: +//! +//! 1. 
**Old master (no /fula/capabilities)** → the ingest route never +//! activates; every chunk goes to the master as a full-bytes PUT +//! (the misstore-on-old-master hazard the capability probe exists for). +//! 2. **Capable master + healthy ingest** → chunk BYTES go to the ingest +//! node; the master receives ONLY empty-body mapping PUTs carrying +//! `x-amz-meta-fula-remote-cid` (the declared blake3 CID) and +//! `x-amz-meta-fula-remote-size` (the true ciphertext size). +//! 3. **Capable master + dead ingest** → transparent fallback: the upload +//! still succeeds and the master sees full-bytes chunk PUTs. +//! +//! Chunk PUTs are identified by the `x-amz-meta-x-fula-chunk: true` header +//! the SDK stamps on every chunk (index objects / forest blobs lack it). + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// Master-side recording responder. Mimics BOTH master generations: +/// for a normal bytes PUT it returns `ETag = blake3(body)` (what every +/// master does); for an empty-body mapping PUT it returns `ETag = +/// declared cid` (what a flag-on master does after its presence check). +/// Records per-class counters the assertions read back. 
+struct MasterResponder { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, +} + +impl Respond for MasterResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let is_chunk = req + .headers + .get("x-amz-meta-x-fula-chunk") + .map(|v| v.to_str().unwrap_or("") == "true") + .unwrap_or(false); + let remote_cid = req + .headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + + if let Some(declared) = remote_cid { + // Mapping PUT: must be empty-bodied; echo the declared cid as ETag. + assert!( + req.body.is_empty(), + "mapping PUT must carry an EMPTY body, got {} bytes", + req.body.len() + ); + if is_chunk { + self.chunk_mapping_puts.fetch_add(1, Ordering::SeqCst); + } + if let Some(sz) = req + .headers + .get("x-amz-meta-fula-remote-size") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { + self.mapping_size_headers.lock().unwrap().push(sz); + } + return ResponseTemplate::new(200).insert_header("ETag", declared.as_str()); + } + + if is_chunk && !req.body.is_empty() { + self.chunk_full_bodies.fetch_add(1, Ordering::SeqCst); + } + let cid = blake3_raw_cid(&req.body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +/// Ingest-side responder: verifies the declared cid matches the body +/// (what the real fula-ingest does) and counts accepted blocks + bytes. 
+struct IngestResponder { + accepted: Arc, + bytes_seen: Arc, +} + +impl Respond for IngestResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let declared = req + .url + .query_pairs() + .find(|(k, _)| k == "cid") + .map(|(_, v)| v.to_string()) + .unwrap_or_default(); + let actual = blake3_raw_cid(&req.body).to_string(); + assert_eq!(declared, actual, "client must declare the true blake3 CID"); + self.accepted.fetch_add(1, Ordering::SeqCst); + self.bytes_seen.fetch_add(req.body.len(), Ordering::SeqCst); + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"cid": declared, "size": req.body.len()})) + } +} + +struct Counters { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, + ingest_accepted: Arc, + ingest_bytes: Arc, +} + +/// Build (master, ingest, counters); `advertise_capability` controls whether +/// the master exposes /fula/capabilities (old vs new build). +async fn setup(advertise_capability: bool) -> (MockServer, MockServer, Counters) { + let master = MockServer::start().await; + let ingest = MockServer::start().await; + + let c = Counters { + chunk_full_bodies: Arc::new(AtomicUsize::new(0)), + chunk_mapping_puts: Arc::new(AtomicUsize::new(0)), + mapping_size_headers: Arc::new(Mutex::new(Vec::new())), + ingest_accepted: Arc::new(AtomicUsize::new(0)), + ingest_bytes: Arc::new(AtomicUsize::new(0)), + }; + + if advertise_capability { + Mock::given(method("GET")) + .and(path("/fula/capabilities")) + .respond_with( + ResponseTemplate::new(200).set_body_json(serde_json::json!({"remoteCidPut": true})), + ) + .mount(&master) + .await; + } + Mock::given(method("PUT")) + .respond_with(MasterResponder { + chunk_full_bodies: c.chunk_full_bodies.clone(), + chunk_mapping_puts: c.chunk_mapping_puts.clone(), + mapping_size_headers: c.mapping_size_headers.clone(), + }) + .mount(&master) + .await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&master) + .await; + 
Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&master) + .await; + + Mock::given(method("PUT")) + .respond_with(IngestResponder { + accepted: c.ingest_accepted.clone(), + bytes_seen: c.ingest_bytes.clone(), + }) + .mount(&ingest) + .await; + + (master, ingest, c) +} + +fn client_for(master_uri: &str, ingest_uri: Option<&str>) -> EncryptedClient { + let mut config = Config::new(master_uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + if let Some(uri) = ingest_uri { + config.ingest_endpoints = vec![uri.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +/// 5 chunks of 64 KiB — well past the threshold with an explicit chunk size. +fn payload() -> Vec { + (0..(5 * 64 * 1024)).map(|i| (i % 251) as u8).collect() +} + +#[tokio::test] +async fn old_master_never_receives_the_new_protocol() { + let (master, ingest, c) = setup(false).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + client + .put_object_chunked("bucket-a", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must succeed via legacy path"); + + assert_eq!( + c.ingest_accepted.load(Ordering::SeqCst), + 0, + "ingest must never be used when the master lacks the capability" + ); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "all chunks must arrive at the master as full bodies" + ); +} + +#[tokio::test] +async fn capable_master_routes_bytes_through_ingest() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + let data = payload(); + client + .put_object_chunked("bucket-b", "/file.bin", &data, Some(64 * 1024)) + .await + .expect("upload must succeed via ingest route"); + + let accepted = c.ingest_accepted.load(Ordering::SeqCst); + let mappings = 
c.chunk_mapping_puts.load(Ordering::SeqCst); + assert!(accepted >= 5, "chunk bytes must hit the ingest node (got {accepted})"); + assert_eq!( + c.chunk_full_bodies.load(Ordering::SeqCst), + 0, + "no chunk should arrive at the master as a full body" + ); + assert_eq!(accepted, mappings, "every ingest-accepted chunk needs its mapping PUT"); + // The true ciphertext sizes (plaintext + AEAD overhead) must be declared. + let sizes = c.mapping_size_headers.lock().unwrap(); + assert_eq!(sizes.len(), mappings); + assert!(sizes.iter().all(|&s| s > 0), "remote-size must never be 0"); + let total_ingest_bytes = c.ingest_bytes.load(Ordering::SeqCst) as u64; + assert_eq!( + sizes.iter().sum::(), + total_ingest_bytes, + "declared sizes must equal the bytes the ingest node stored" + ); +} + +#[tokio::test] +async fn dead_ingest_falls_back_to_full_bytes() { + let (master, _ingest, c) = setup(true).await; + // Point at a port nothing listens on — connection refused, fast. + let client = client_for(&master.uri(), Some("http://127.0.0.1:9")); + + client + .put_object_chunked("bucket-c", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must SUCCEED despite the dead ingest (transparent fallback)"); + + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "fallback must deliver every chunk as a full-bytes PUT" + ); +} + +#[tokio::test] +async fn no_ingest_configured_is_byte_identical_legacy() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), None); + + client + .put_object_chunked("bucket-d", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("legacy upload"); + + assert_eq!(c.ingest_accepted.load(Ordering::SeqCst), 0); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!(c.chunk_full_bodies.load(Ordering::SeqCst) >= 5); + drop(ingest); +} diff --git a/crates/fula-client/tests/live_ingest_e2e.rs 
b/crates/fula-client/tests/live_ingest_e2e.rs new file mode 100644 index 0000000..3780aef --- /dev/null +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -0,0 +1,157 @@ +//! Phase 2 — LIVE ingest-route e2e (requires a running master + fula-ingest). +//! +//! Env (skip-without, offline_e2e convention): +//! FULA_S3 master base URL (e.g. http://127.0.0.1:9000) +//! FULA_JWT bearer for the master (HS256 with the master's secret) +//! FULA_INGEST fula-ingest base URL (e.g. http://127.0.0.1:3601) +//! FULA_BIG=1 also run the ≥1 GiB case (writes a temp file) +//! +//! Run: cargo test -p fula-client --test live_ingest_e2e --release -- --ignored --nocapture +//! +//! The runner script (fula-ota tests/e2e/phase-2/60-fidelity.sh) asserts the +//! ingest container's accepted-block log count INCREASES across this test — +//! the server-side proof that bytes flowed via the ingest node, not the +//! gateway. + +#![cfg(not(target_arch = "wasm32"))] + +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; + +fn env_or_skip(key: &str) -> Option<String> { + match std::env::var(key) { + Ok(v) if !v.is_empty() => Some(v), + _ => { + eprintln!("SKIP: {key} not set"); + None + } + } +} + +fn client(s3: &str, jwt: &str, ingest: Option<&str>, v8: bool) -> EncryptedClient { + let mut config = Config::new(s3).with_token(jwt); + // The contended e2e box serializes 16 parallel chunk commits through the + // per-bucket write lock (each including a registry persist+pin); under + // load the queue tail exceeds the 30s default. Real deployments commit in + // ~100ms — this is test-box headroom, not a product setting.
+ config.timeout = std::time::Duration::from_secs(600); + config.walkable_v8_writer_enabled = v8; + if let Some(i) = ingest { + config.ingest_endpoints = vec![i.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +fn payload(len: usize) -> Vec<u8> { + (0..len).map(|i| ((i * 31 + 7) % 251) as u8).collect() +} + +/// Chunked upload routed via the ingest node, read back through the master — +/// full live round-trip of the Phase 2 byte path (client CID on). +#[tokio::test] +#[ignore] +async fn live_chunked_via_ingest_round_trip() { + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-ingest"; + let key = "/ingest/round-trip.bin"; + // >768 KiB → put_object_flat dispatches into put_object_chunked_internal + // (the FxFiles photo/video path) where the ingest route lives. + let data = payload(1_500_000); + + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + .await + .expect("chunked flat upload via ingest route"); + + let got = c + .get_object_flat(bucket, key) + .await + .expect("download after ingest-routed upload"); + assert_eq!(got.len(), data.len(), "length mismatch"); + assert_eq!( + blake3::hash(&got), + blake3::hash(&data), + "content mismatch after ingest-routed round trip" + ); +} + +/// Same upload with the v8 writer OFF — the ingest route must self-disable +/// (no pre-computed CIDs to declare) and the legacy path must round-trip.
+#[tokio::test] +#[ignore] +async fn live_chunked_v8_off_legacy_round_trip() { + let (Some(s3), Some(jwt)) = (env_or_skip("FULA_S3"), env_or_skip("FULA_JWT")) else { + return; + }; + let ingest = std::env::var("FULA_INGEST").ok(); + let c = client(&s3, &jwt, ingest.as_deref(), false); + let bucket = "p2-live-legacy"; + let key = "/legacy/round-trip.bin"; + let data = payload(1_200_000); + + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + .await + .expect("chunked flat upload, v8 off"); + let got = c.get_object_flat(bucket, key).await.expect("download"); + assert_eq!(blake3::hash(&got), blake3::hash(&data)); +} + +/// Large-file chunked upload via the ingest route (scale invariant: many +/// thousands of chunks, bounded service-side memory). Gated on FULA_BIG=1. +/// +/// Size = `FULA_BIG_MB` MiB (default 512 ≈ 2048 chunks at 256 KiB). The +/// 7.8 GB / 4-core e2e box swap-thrashes for 12h+ on a full 1 GiB because the +/// product buffers a whole file in RAM (`put_flat_from_path` → fs::read → +/// one Bytes) AND every chunk's metadata commit serializes through the +/// per-bucket lock; 512 MiB proves the same multi-thousand-chunk streaming + +/// ingest path in ~30-40 min here. To validate the literal ≥1 GiB on a +/// prod-class box: `FULA_BIG_MB=1024` (or higher). The product memory model +/// (full-file buffering) is flagged for prod large-file sizing. 
+#[tokio::test] +#[ignore] +async fn live_large_file_chunked_via_ingest() { + if std::env::var("FULA_BIG").ok().as_deref() != Some("1") { + eprintln!("SKIP: FULA_BIG != 1"); + return; + } + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let size_mb: usize = std::env::var("FULA_BIG_MB") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(512); + let size = size_mb * 1024 * 1024; + + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-big"; + let key = "/big/large.bin"; + + let data = payload(size); // size/256KiB chunks + let want = blake3::hash(&data); + let len = data.len(); + + let started = std::time::Instant::now(); + // MOVE the payload — holding payload + a clone doubles RSS and OOM-kills + // the run on this box (run #4, SIGKILL). `want`+`len` carry everything + // verification needs. + c.put_object_flat(bucket, key, data, Some("application/octet-stream")) + .await + .unwrap_or_else(|e| panic!("{size_mb} MiB chunked flat upload via ingest: {e:?}")); + eprintln!("{size_mb} MiB upload took {:?} (~{} chunks)", started.elapsed(), len / (256 * 1024)); + + let got = c.get_object_flat(bucket, key).await.expect("large-file download"); + assert_eq!(got.len(), len); + assert_eq!(blake3::hash(&got), want, "large-file content mismatch"); +} diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway new file mode 100644 index 0000000..fabcb30 --- /dev/null +++ b/docker/Dockerfile.gateway @@ -0,0 +1,34 @@ +# fula-gateway — S3-compatible gateway (federated-master image). +# +# Build (repo root): docker build -f docker/Dockerfile.gateway -t fula-gateway:latest . +# Used by the federated-master stack: functionland/pinning-service +# docker/master/docker-compose.master.yml ("gateway" profile, auto-enabled by +# update-scripts/join-as-master.sh when this image exists on the box). 
+# +# All configuration is env-driven (clap): FULA_HOST (0.0.0.0), FULA_PORT +# (9000), IPFS_API_URL, CLUSTER_API_URL, PINNING_SERVICE_ENDPOINT, +# STORAGE_API_URL, JWT_SECRET, ADMIN_JWT_SECRET, FULA_BLOCK_CACHE_MB, +# FULA_PIN_QUEUE_PATH (durable pin queue — production MUST set it to a +# persistent mount), rate limits, etc. See crates/fula-cli/src/main.rs. +FROM rust:1-bookworm AS build +WORKDIR /src +# Workspace build; fula-js (wasm) and fula-flutter are workspace members but +# building only -p fula-cli avoids their toolchains. +COPY . . +RUN cargo build --release -p fula-cli --bin fula-gateway + +FROM debian:bookworm-slim AS runtime +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway +# Persistent state lives at HARDCODED /var/lib/fula-gateway (pin_queue.redb, +# registry.cid, local_retain.redb — see crates/fula-cli/src/config.rs defaults). +# Mount a volume here or pin retries/crash recovery silently degrade to +# fire-and-forget and the bucket registry resets on restart. +RUN mkdir -p /var/lib/fula-gateway +VOLUME ["/var/lib/fula-gateway"] +ENV FULA_HOST=0.0.0.0 \ + FULA_PORT=9000 +EXPOSE 9000 +ENTRYPOINT ["fula-gateway"]