From 1be84cc912505902bd53017bb2ea5056f45d5d44 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 19:34:41 -0400 Subject: [PATCH 01/12] feat(docker): reproducible fula-gateway image for federated masters Multi-stage build of the fula-gateway bin (cargo build --release -p fula-cli) on rust:1-bookworm with a slim Debian runtime. Env-driven config only; /data volume for durable state (pin queue, registry CID). Consumed by the federated-master installer in functionland/pinning-service (gateway compose profile, auto-enabled when the image exists). Closes #29 Co-Authored-By: Claude Fable 5 --- docker/Dockerfile.gateway | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 docker/Dockerfile.gateway diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway new file mode 100644 index 0000000..6147e88 --- /dev/null +++ b/docker/Dockerfile.gateway @@ -0,0 +1,30 @@ +# fula-gateway — S3-compatible gateway (federated-master image). +# +# Build (repo root): docker build -f docker/Dockerfile.gateway -t fula-gateway:latest . +# Used by the federated-master stack: functionland/pinning-service +# docker/master/docker-compose.master.yml ("gateway" profile, auto-enabled by +# update-scripts/join-as-master.sh when this image exists on the box). +# +# All configuration is env-driven (clap): FULA_HOST (0.0.0.0), FULA_PORT +# (9000), IPFS_API_URL, CLUSTER_API_URL, PINNING_SERVICE_ENDPOINT, +# STORAGE_API_URL, JWT_SECRET, ADMIN_JWT_SECRET, FULA_BLOCK_CACHE_MB, +# FULA_PIN_QUEUE_PATH (durable pin queue — production MUST set it to a +# persistent mount), rate limits, etc. See crates/fula-cli/src/main.rs. +FROM rust:1-bookworm AS build +WORKDIR /src +# Workspace build; fula-js (wasm) and fula-flutter are workspace members but +# building only -p fula-cli avoids their toolchains. +COPY . . +RUN cargo build --release -p fula-cli --bin fula-gateway + +FROM debian:bookworm-slim AS runtime +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway +# Persistent state (pin queue, registry CID, block cache) belongs on a volume. +VOLUME ["/data"] +ENV FULA_HOST=0.0.0.0 \ + FULA_PORT=9000 +EXPOSE 9000 +ENTRYPOINT ["fula-gateway"] From ecb342abb8b1bd49bbb84cebf6516445c7839a0f Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 20:51:02 -0400 Subject: [PATCH 02/12] fix(docker): volume at the gateway hardcoded state dir /var/lib/fula-gateway The pin queue, registry CID, and local-retain backlog default to /var/lib/fula-gateway (config.rs); the /data volume was wrong - without a mount there, pin retries/crash recovery silently degrade to fire-and-forget. Found by the federated-master e2e (startup WARNs). Co-Authored-By: Claude Fable 5 --- docker/Dockerfile.gateway | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway index 6147e88..fabcb30 100644 --- a/docker/Dockerfile.gateway +++ b/docker/Dockerfile.gateway @@ -22,8 +22,12 @@ RUN apt-get update \ && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ && rm -rf /var/lib/apt/lists/* COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway -# Persistent state (pin queue, registry CID, block cache) belongs on a volume. -VOLUME ["/data"] +# Persistent state lives at HARDCODED /var/lib/fula-gateway (pin_queue.redb, +# registry.cid, local_retain.redb — see crates/fula-cli/src/config.rs defaults). +# Mount a volume here or pin retries/crash recovery silently degrade to +# fire-and-forget and the bucket registry resets on restart. +RUN mkdir -p /var/lib/fula-gateway +VOLUME ["/var/lib/fula-gateway"] ENV FULA_HOST=0.0.0.0 \ FULA_PORT=9000 EXPOSE 9000 From 9fbc069e3d76dce24108b31ddf2de48ad07aa1dc Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 22:47:43 -0400 Subject: [PATCH 03/12] feat(ingress): Phase 2 - ingest byte route with capability probe + remote-cid mapping PUTs Gateway (flag FULA_REMOTE_CID_PUT, default OFF): - GET /fula/capabilities (public): advertises {remoteCidPut} so clients probe before using optional protocols - an old/flag-off master is never sent an empty-body PUT it would misstore as a real zero-byte chunk - put_object: empty-body PUT with x-amz-meta-fula-remote-cid records only the key->cid mapping after validating the CID is raw(0x55)+blake3(0x1e) and the block is PRESENT in the blockstore (absent => retryable 409 so the client falls back to full bytes); x-amz-meta-fula-remote-size carries the chunk's true byte count so billing/list never see 0; both headers added to the control-header filter (never persisted) Client (Config.ingest_endpoints, default empty = byte-identical legacy): - supports_remote_cid_put(): one-shot capability probe, ANY failure=false - put_object_chunked_internal: per chunk, bytes stream to the ingest node (PUT /v0/block?cid= - the node verifies before storing), then the mapping PUT (with pinning variant when creds present, keeping per-object pin requests); ANY failure on the route falls back to the legacy full-bytes PUT. Probe once per upload; native-only (wasm32 keeps legacy path); ETag=cid keeps the existing self-verify unchanged Part of #31 (cross-repo: functionland/fula-ota#74 fula-ingest) Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/config.rs | 9 +++ crates/fula-cli/src/handlers/object.rs | 89 ++++++++++++++++++++++++- crates/fula-cli/src/handlers/service.rs | 11 +++ crates/fula-cli/src/main.rs | 7 ++ crates/fula-cli/src/routes.rs | 7 +- crates/fula-client/src/client.rs | 18 +++++ crates/fula-client/src/config.rs | 21 ++++++ crates/fula-client/src/encryption.rs | 78 ++++++++++++++++++++++ 8 files changed, 236 insertions(+), 4 deletions(-) diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index 0720620..2671f74 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -58,6 +58,14 @@ pub struct GatewayConfig { pub cors_origins: Vec, /// Path to store bucket registry CID for persistence pub registry_cid_path: Option, + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// `x-amz-meta-fula-remote-cid` — the chunk's verified bytes were already + /// streamed to a fula-ingest node; this master records only the + /// key→cid mapping (after confirming block presence via the blockstore). + /// Advertised to clients via GET /fula/capabilities so old/flag-off + /// masters are never sent the protocol. Default OFF. + #[serde(default)] + pub remote_cid_put_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -198,6 +206,7 @@ impl Default for GatewayConfig { cors_enabled: true, cors_origins: vec!["*".to_string()], registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), + remote_cid_put_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index 6aa9c9b..aa45730 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -129,8 +129,86 @@ pub async fn put_object( } } - // Store the data - let cid = state.block_store.put_block(&body).await?; + // Phase 2 (decentralized ingress): empty-body mapping PUT. The chunk's + // verified bytes were already streamed to a fula-ingest node; record only + // the key→cid mapping here. Triple-gated: server flag (advertised via + // /fula/capabilities, so well-behaved clients never send this to a + // flag-off master), the header, and an empty body. The declared CID must + // be raw(0x55)+blake3(0x1e) — the only addressing the ingest verifies — + // and the block must be PRESENT in the blockstore (bitswap pulls it from + // the ingest peer); absent ⇒ retryable 409 so the client falls back to a + // full-bytes PUT. Storing nothing here is the point: the CID is + // self-certifying, so reads stay tamper-evident end-to-end. + let remote_cid_hdr = headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + let cid = if let Some(declared) = remote_cid_hdr { + if !state.config.remote_cid_put_enabled { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT is not enabled on this master (probe /fula/capabilities)", + )); + } + if !body.is_empty() { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT requires an empty body", + )); + } + let declared_cid = cid::Cid::try_from(declared.as_str()).map_err(|e| { + ApiError::s3(S3ErrorCode::InvalidRequest, format!("invalid remote cid: {e}")) + })?; + if declared_cid.codec() != 0x55 || declared_cid.hash().code() != 0x1e { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", + )); + } + match state.block_store.has_block(&declared_cid).await { + Ok(true) => {} + Ok(false) => { + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", + )); + } + Err(e) => { + tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check failed — retry or fall back to a full-bytes PUT", + )); + } + } + tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); + declared_cid + } else { + // Store the data + state.block_store.put_block(&body).await? + }; + // Phase 2: a mapping PUT has an empty body — the chunk's TRUE size (as + // stored on the ingest node) arrives in x-amz-meta-fula-remote-size so + // billing/list/stat record the real byte count, never 0. Only honored on + // the remote-cid path; bounded by max_body_size like a real body. + let object_size: u64 = match ( + headers.get("x-amz-meta-fula-remote-size").and_then(|v| v.to_str().ok()), + headers.get("x-amz-meta-fula-remote-cid").is_some(), + ) { + (Some(sz), true) => { + let parsed = sz.parse::().map_err(|_| { + ApiError::s3(S3ErrorCode::InvalidRequest, "invalid x-amz-meta-fula-remote-size") + })?; + if parsed == 0 || parsed > state.config.max_body_size as u64 { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "x-amz-meta-fula-remote-size out of range", + )); + } + parsed + } + _ => body.len() as u64, + }; // Use CID as ETag (content-addressed identifier) // This is S3-compliant: AWS docs state "The ETag may or may not be an MD5 digest" @@ -165,7 +243,7 @@ pub async fn put_object( // COPY handler at line 694). Access control is unaffected — bucket- // level access is gated by `BucketMetadata.owner_id` which is // already correctly the hashed form. - let mut metadata = ObjectMetadata::new(cid, body.len() as u64, etag.clone()) + let mut metadata = ObjectMetadata::new(cid, object_size, etag.clone()) .with_owner(&session.hashed_user_id); if let Some(ct) = content_type { @@ -1399,6 +1477,11 @@ pub(crate) const FULA_CONTROL_HEADERS: &[&str] = &[ // as user_metadata on the object itself (would pollute every // forest-manifest object with a meaningless `=1` tag). "fula-forest-manifest", + // Phase 2 (decentralized ingress): remote-cid mapping PUT controls — + // consumed by the handler (declared CID + true byte size of the chunk + // stored on the ingest node); never persisted as object metadata. + "fula-remote-cid", + "fula-remote-size", ]; /// Returns `true` if the given x-amz-meta key (already stripped of diff --git a/crates/fula-cli/src/handlers/service.rs b/crates/fula-cli/src/handlers/service.rs index d8e70b4..15e34f8 100644 --- a/crates/fula-cli/src/handlers/service.rs +++ b/crates/fula-cli/src/handlers/service.rs @@ -46,3 +46,14 @@ pub async fn health_check() -> impl IntoResponse { pub async fn healthz() -> impl IntoResponse { (StatusCode::OK, "ok") } + +/// GET /fula/capabilities — unauthenticated protocol-capability advertisement +/// (Phase 2). Clients probe this ONCE per instance before using optional +/// protocols. An old master 404s here, so a client never sends e.g. the +/// empty-body remote-cid PUT to a build that would misstore it as a real +/// zero-byte object. +pub async fn capabilities(State(state): State>) -> impl IntoResponse { + axum::Json(serde_json::json!({ + "remoteCidPut": state.config.remote_cid_put_enabled, + })) +} diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index bd1a8c1..ca683ad 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -92,6 +92,12 @@ struct Args { /// the local-retain backlog (the ongoing per-upload protection still runs). #[arg(long, env = "FULA_NO_LOCAL_RETAIN_BACKFILL")] no_local_retain_backfill: bool, + + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// x-amz-meta-fula-remote-cid (bytes pre-stored on a fula-ingest node). + /// Advertised to clients via GET /fula/capabilities. Default OFF. + #[arg(long, env = "FULA_REMOTE_CID_PUT")] + remote_cid_put: bool, } #[tokio::main] @@ -163,6 +169,7 @@ async fn main() -> anyhow::Result<()> { cluster_peering_enabled: if args.no_cluster_peering { Some(false) } else { None }, local_retain_enabled: if args.no_local_retain { Some(false) } else { None }, local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, + remote_cid_put_enabled: args.remote_cid_put, ..Default::default() }; diff --git a/crates/fula-cli/src/routes.rs b/crates/fula-cli/src/routes.rs index 09b256d..cd7c054 100644 --- a/crates/fula-cli/src/routes.rs +++ b/crates/fula-cli/src/routes.rs @@ -24,7 +24,12 @@ pub fn create_router(state: Arc) -> Router { let cors = create_cors_layer(&state.config.cors_origins); // Public routes that must bypass auth (e.g., container health checks) - let public = Router::new().route("/healthz", get(handlers::healthz)); + let public = Router::new() + .route("/healthz", get(handlers::healthz)) + // Phase 2: capability probe (no auth — the answer is not sensitive and + // clients must be able to probe before any authenticated operation). + .route("/fula/capabilities", get(handlers::capabilities)) + .with_state(state.clone()); // Phase 3.2 internal endpoints. Bearer-token-protected at the // handler level (see handlers::internal::authenticate). They diff --git a/crates/fula-client/src/client.rs b/crates/fula-client/src/client.rs index 1b2086b..f67791a 100644 --- a/crates/fula-client/src/client.rs +++ b/crates/fula-client/src/client.rs @@ -522,6 +522,24 @@ impl FulaClient { }) } + /// Phase 2 (decentralized ingress): one-shot master capability probe. + /// Returns whether the master accepts empty-body remote-cid mapping PUTs + /// (GET /fula/capabilities → {"remoteCidPut": bool}). ANY failure — 404 on + /// an old build, network error, unparseable body — returns `false`, so + /// callers degrade to the legacy full-bytes path, never to a protocol the + /// master would misinterpret. + pub async fn supports_remote_cid_put(&self) -> bool { + match self.request("GET", "/fula/capabilities", None, None, None).await { + Ok(resp) => resp + .json::() + .await + .ok() + .and_then(|v| v.get("remoteCidPut").and_then(|b| b.as_bool())) + .unwrap_or(false), + Err(_) => false, + } + } + /// Put an object with metadata and optional If-Match / If-None-Match guards. /// /// Used by conditional-write paths (e.g. forest flush) to detect concurrent diff --git a/crates/fula-client/src/config.rs b/crates/fula-client/src/config.rs index dfa4980..1660340 100644 --- a/crates/fula-client/src/config.rs +++ b/crates/fula-client/src/config.rs @@ -195,6 +195,18 @@ pub struct Config { /// targeted regressions or backward-compat tests). pub walkable_v8_writer_enabled: bool, + /// Phase 2 (decentralized ingress) — optional fula-ingest endpoints. + /// When non-empty AND `walkable_v8_writer_enabled` AND the master + /// advertises `remoteCidPut` via GET /fula/capabilities (probed once per + /// chunked upload), chunk ciphertext BYTES are streamed to the first + /// ingest endpoint (`PUT /v0/block?cid=` — the node verifies + /// the CID before storing) and the master receives only an empty-body + /// mapping PUT (`x-amz-meta-fula-remote-cid` + true size). ANY failure on + /// that route falls back transparently to the legacy full-bytes PUT. + /// Empty (default) = behavior byte-identical to before. Native-only + /// (ignored on wasm32). + pub ingest_endpoints: Vec, + /// Phase 19 — optional health-status callback. When set, the SDK /// invokes this closure on every Up↔Down transition of the /// master health gate (`MasterHealthEvent::Online` / @@ -330,6 +342,8 @@ impl Default for Config { // flipping master-side gates until SDK adoption reaches the // % they're comfortable with for the pre-v0.6 reader cost. walkable_v8_writer_enabled: true, + // Phase 2 — no ingest nodes by default (legacy byte path). + ingest_endpoints: Vec::new(), // Phase 19 — no callback by default (silent gate). health_callback: None, // E2E plan Phase 4 — Mode B/C signed-entry writer disabled @@ -358,6 +372,13 @@ impl Config { self } + /// Phase 2: route chunk bytes through fula-ingest node(s) when the master + /// supports remote-cid mapping PUTs. Empty = legacy behavior. + pub fn with_ingest_endpoints(mut self, endpoints: Vec) -> Self { + self.ingest_endpoints = endpoints; + self + } + /// Enable encryption pub fn with_encryption(mut self) -> Self { self.encryption_enabled = true; diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 10359a6..97eca51 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7083,6 +7083,31 @@ impl EncryptedClient { // (skip_serializing_if keeps it off the wire). let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): decide the byte route ONCE per + // upload. The ingest route activates only when (a) ingest endpoints + // are configured, (b) the walkable-v8 writer is on (we need the + // pre-computed chunk CID to declare), and (c) the master ADVERTISES + // remote-cid support via /fula/capabilities — an old or flag-off + // master is never sent the empty-body protocol it would misstore. + // (cfg-gated: native only; wasm32 keeps the legacy path.) + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -7111,8 +7136,61 @@ impl EncryptedClient { let bucket = bucket.to_string(); let pinning = pinning.clone(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route. Bytes go to the fula-ingest node + // (which verifies BLAKE3(body) == declared CID before storing) + // and the master gets an empty-body mapping PUT carrying the + // declared CID + true size. ANY failure on this route falls + // through to the legacy full-bytes PUT below — degradation, + // never corruption (the master only accepts the mapping when + // its flag is on AND the block is present in its blockstore). + #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + let map_res = if let Some(ref pin) = pinning { + client.put_object_with_metadata_and_pinning( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + &pin.endpoint, &pin.token, + ).await + } else { + client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await + }; + match map_res { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = if let Some(ref pin) = pinning { client.put_object_with_metadata_and_pinning( &bucket, From d1ac73f72b4ecd25f12167464bfb55deb2694946 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 22:59:46 -0400 Subject: [PATCH 04/12] test(ingress): wiremock matrix for the ingest byte route Four cases: old master (no capabilities) never receives the new protocol; capable master routes every chunk's bytes through the ingest node (master sees ONLY empty-body mapping PUTs; declared sizes sum to the bytes the ingest stored; ingest asserts declared cid == blake3 of body); dead ingest falls back transparently and the upload succeeds; empty ingest_endpoints is byte-identical legacy. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/ingest_route.rs | 273 +++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 crates/fula-client/tests/ingest_route.rs diff --git a/crates/fula-client/tests/ingest_route.rs b/crates/fula-client/tests/ingest_route.rs new file mode 100644 index 0000000..3a27386 --- /dev/null +++ b/crates/fula-client/tests/ingest_route.rs @@ -0,0 +1,273 @@ +//! Phase 2 (decentralized ingress) — client routing matrix. +//! +//! Verifies the three load-bearing behaviors of `Config.ingest_endpoints`: +//! +//! 1. **Old master (no /fula/capabilities)** → the ingest route never +//! activates; every chunk goes to the master as a full-bytes PUT +//! (the misstore-on-old-master hazard the capability probe exists for). +//! 2. **Capable master + healthy ingest** → chunk BYTES go to the ingest +//! node; the master receives ONLY empty-body mapping PUTs carrying +//! `x-amz-meta-fula-remote-cid` (the declared blake3 CID) and +//! `x-amz-meta-fula-remote-size` (the true ciphertext size). +//! 3. **Capable master + dead ingest** → transparent fallback: the upload +//! still succeeds and the master sees full-bytes chunk PUTs. +//! +//! Chunk PUTs are identified by the `x-amz-meta-x-fula-chunk: true` header +//! the SDK stamps on every chunk (index objects / forest blobs lack it). + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// Master-side recording responder. Mimics BOTH master generations: +/// for a normal bytes PUT it returns `ETag = blake3(body)` (what every +/// master does); for an empty-body mapping PUT it returns `ETag = +/// declared cid` (what a flag-on master does after its presence check). +/// Records per-class counters the assertions read back. +struct MasterResponder { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, +} + +impl Respond for MasterResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let is_chunk = req + .headers + .get("x-amz-meta-x-fula-chunk") + .map(|v| v.to_str().unwrap_or("") == "true") + .unwrap_or(false); + let remote_cid = req + .headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + + if let Some(declared) = remote_cid { + // Mapping PUT: must be empty-bodied; echo the declared cid as ETag. + assert!( + req.body.is_empty(), + "mapping PUT must carry an EMPTY body, got {} bytes", + req.body.len() + ); + if is_chunk { + self.chunk_mapping_puts.fetch_add(1, Ordering::SeqCst); + } + if let Some(sz) = req + .headers + .get("x-amz-meta-fula-remote-size") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { + self.mapping_size_headers.lock().unwrap().push(sz); + } + return ResponseTemplate::new(200).insert_header("ETag", declared.as_str()); + } + + if is_chunk && !req.body.is_empty() { + self.chunk_full_bodies.fetch_add(1, Ordering::SeqCst); + } + let cid = blake3_raw_cid(&req.body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +/// Ingest-side responder: verifies the declared cid matches the body +/// (what the real fula-ingest does) and counts accepted blocks + bytes. +struct IngestResponder { + accepted: Arc, + bytes_seen: Arc, +} + +impl Respond for IngestResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let declared = req + .url + .query_pairs() + .find(|(k, _)| k == "cid") + .map(|(_, v)| v.to_string()) + .unwrap_or_default(); + let actual = blake3_raw_cid(&req.body).to_string(); + assert_eq!(declared, actual, "client must declare the true blake3 CID"); + self.accepted.fetch_add(1, Ordering::SeqCst); + self.bytes_seen.fetch_add(req.body.len(), Ordering::SeqCst); + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"cid": declared, "size": req.body.len()})) + } +} + +struct Counters { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, + ingest_accepted: Arc, + ingest_bytes: Arc, +} + +/// Build (master, ingest, counters); `advertise_capability` controls whether +/// the master exposes /fula/capabilities (old vs new build). +async fn setup(advertise_capability: bool) -> (MockServer, MockServer, Counters) { + let master = MockServer::start().await; + let ingest = MockServer::start().await; + + let c = Counters { + chunk_full_bodies: Arc::new(AtomicUsize::new(0)), + chunk_mapping_puts: Arc::new(AtomicUsize::new(0)), + mapping_size_headers: Arc::new(Mutex::new(Vec::new())), + ingest_accepted: Arc::new(AtomicUsize::new(0)), + ingest_bytes: Arc::new(AtomicUsize::new(0)), + }; + + if advertise_capability { + Mock::given(method("GET")) + .and(path("/fula/capabilities")) + .respond_with( + ResponseTemplate::new(200).set_body_json(serde_json::json!({"remoteCidPut": true})), + ) + .mount(&master) + .await; + } + Mock::given(method("PUT")) + .respond_with(MasterResponder { + chunk_full_bodies: c.chunk_full_bodies.clone(), + chunk_mapping_puts: c.chunk_mapping_puts.clone(), + mapping_size_headers: c.mapping_size_headers.clone(), + }) + .mount(&master) + .await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&master) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&master) + .await; + + Mock::given(method("PUT")) + .respond_with(IngestResponder { + accepted: c.ingest_accepted.clone(), + bytes_seen: c.ingest_bytes.clone(), + }) + .mount(&ingest) + .await; + + (master, ingest, c) +} + +fn client_for(master_uri: &str, ingest_uri: Option<&str>) -> EncryptedClient { + let mut config = Config::new(master_uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + if let Some(uri) = ingest_uri { + config.ingest_endpoints = vec![uri.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +/// 5 chunks of 64 KiB — well past the threshold with an explicit chunk size. +fn payload() -> Vec { + (0..(5 * 64 * 1024)).map(|i| (i % 251) as u8).collect() +} + +#[tokio::test] +async fn old_master_never_receives_the_new_protocol() { + let (master, ingest, c) = setup(false).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + client + .put_object_chunked("bucket-a", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must succeed via legacy path"); + + assert_eq!( + c.ingest_accepted.load(Ordering::SeqCst), + 0, + "ingest must never be used when the master lacks the capability" + ); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "all chunks must arrive at the master as full bodies" + ); +} + +#[tokio::test] +async fn capable_master_routes_bytes_through_ingest() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + let data = payload(); + client + .put_object_chunked("bucket-b", "/file.bin", &data, Some(64 * 1024)) + .await + .expect("upload must succeed via ingest route"); + + let accepted = c.ingest_accepted.load(Ordering::SeqCst); + let mappings = c.chunk_mapping_puts.load(Ordering::SeqCst); + assert!(accepted >= 5, "chunk bytes must hit the ingest node (got {accepted})"); + assert_eq!( + c.chunk_full_bodies.load(Ordering::SeqCst), + 0, + "no chunk should arrive at the master as a full body" + ); + assert_eq!(accepted, mappings, "every ingest-accepted chunk needs its mapping PUT"); + // The true ciphertext sizes (plaintext + AEAD overhead) must be declared. + let sizes = c.mapping_size_headers.lock().unwrap(); + assert_eq!(sizes.len(), mappings); + assert!(sizes.iter().all(|&s| s > 0), "remote-size must never be 0"); + let total_ingest_bytes = c.ingest_bytes.load(Ordering::SeqCst) as u64; + assert_eq!( + sizes.iter().sum::(), + total_ingest_bytes, + "declared sizes must equal the bytes the ingest node stored" + ); +} + +#[tokio::test] +async fn dead_ingest_falls_back_to_full_bytes() { + let (master, _ingest, c) = setup(true).await; + // Point at a port nothing listens on — connection refused, fast. + let client = client_for(&master.uri(), Some("http://127.0.0.1:9")); + + client + .put_object_chunked("bucket-c", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must SUCCEED despite the dead ingest (transparent fallback)"); + + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "fallback must deliver every chunk as a full-bytes PUT" + ); +} + +#[tokio::test] +async fn no_ingest_configured_is_byte_identical_legacy() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), None); + + client + .put_object_chunked("bucket-d", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("legacy upload"); + + assert_eq!(c.ingest_accepted.load(Ordering::SeqCst), 0); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!(c.chunk_full_bodies.load(Ordering::SeqCst) >= 5); + drop(ingest); +} From 9493a571cf5a4454f1c5ce7750899f21d1bb6d8f Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 23:07:14 -0400 Subject: [PATCH 05/12] fix(ingress): mirror the ingest route into the public put_object_chunked loop put_object_chunked (the FxFiles-path public API) duplicates the chunk loop of put_object_chunked_internal (E51 mirroring pattern) - the route existed only in the internal one, so public-API uploads silently stayed on the legacy byte path. Caught by the wiremock matrix (positive case: ingest got 0). Same probe gating + fallback semantics. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/src/encryption.rs | 63 ++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 97eca51..90d8edb 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -10593,6 +10593,28 @@ impl EncryptedClient { // master's etag-attested CID matches. let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): same probe-gated ingest route as + // `put_object_chunked_internal` — this public API duplicates that + // loop (E51 mirroring pattern), so it must mirror the route too or + // FxFiles-path uploads silently stay on the legacy byte path. + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -10622,8 +10644,49 @@ impl EncryptedClient { let client = self.inner.clone(); let bucket = bucket.to_string(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route (see put_object_chunked_internal + // for the full rationale). ANY failure falls through to the + // legacy full-bytes PUT below. + #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + match client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = client.put_object_with_metadata( &bucket, &chunk_key, From c910b79c7b1bb0ccb8cd95e02297f3db99e3329a Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 23:12:22 -0400 Subject: [PATCH 06/12] test(ingress): live ingest-route e2e (env-driven, ignored by default) Round-trip via a real master + fula-ingest: chunked upload routed through the ingest node then read back via the master (CID on); v8-off legacy parity leg; FULA_BIG=1 adds the 1 GiB case (~4096 chunks). Consumed by fula-ota tests/e2e/phase-2/60-fidelity.sh which also asserts server-side that kubo block counts grew. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 133 ++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 crates/fula-client/tests/live_ingest_e2e.rs diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs new file mode 100644 index 0000000..da8b818 --- /dev/null +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -0,0 +1,133 @@ +//! Phase 2 — LIVE ingest-route e2e (requires a running master + fula-ingest). +//! +//! Env (skip-without, offline_e2e convention): +//! FULA_S3 master base URL (e.g. http://127.0.0.1:9000) +//! FULA_JWT bearer for the master (HS256 with the master's secret) +//! FULA_INGEST fula-ingest base URL (e.g. http://127.0.0.1:3601) +//! FULA_BIG=1 also run the ≥1 GiB case (writes a temp file) +//! +//! Run: cargo test -p fula-client --test live_ingest_e2e --release -- --ignored --nocapture +//! +//! The runner script (fula-ota tests/e2e/phase-2/60-fidelity.sh) asserts the +//! ingest container's accepted-block log count INCREASES across this test — +//! the server-side proof that bytes flowed via the ingest node, not the +//! gateway. + +#![cfg(not(target_arch = "wasm32"))] + +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; + +fn env_or_skip(key: &str) -> Option { + match std::env::var(key) { + Ok(v) if !v.is_empty() => Some(v), + _ => { + eprintln!("SKIP: {key} not set"); + None + } + } +} + +fn client(s3: &str, jwt: &str, ingest: Option<&str>, v8: bool) -> EncryptedClient { + let mut config = Config::new(s3).with_token(jwt); + config.walkable_v8_writer_enabled = v8; + if let Some(i) = ingest { + config.ingest_endpoints = vec![i.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +fn payload(len: usize) -> Vec { + (0..len).map(|i| ((i * 31 + 7) % 251) as u8).collect() +} + +/// Chunked upload routed via the ingest node, read back through the master — +/// full live round-trip of the Phase 2 byte path (client CID on). +#[tokio::test] +#[ignore] +async fn live_chunked_via_ingest_round_trip() { + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-ingest"; + let key = "/ingest/round-trip.bin"; + let data = payload(1_500_000); // ~6 chunks at the 256 KiB default + + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("chunked upload via ingest route"); + + let got = c + .get_object(bucket, key) + .await + .expect("download after ingest-routed upload"); + assert_eq!(got.len(), data.len(), "length mismatch"); + assert_eq!( + blake3::hash(&got), + blake3::hash(&data), + "content mismatch after ingest-routed round trip" + ); +} + +/// Same upload with the v8 writer OFF — the ingest route must self-disable +/// (no pre-computed CIDs to declare) and the legacy path must round-trip. +#[tokio::test] +#[ignore] +async fn live_chunked_v8_off_legacy_round_trip() { + let (Some(s3), Some(jwt)) = (env_or_skip("FULA_S3"), env_or_skip("FULA_JWT")) else { + return; + }; + let ingest = std::env::var("FULA_INGEST").ok(); + let c = client(&s3, &jwt, ingest.as_deref(), false); + let bucket = "p2-live-legacy"; + let key = "/legacy/round-trip.bin"; + let data = payload(1_200_000); + + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("chunked upload, v8 off"); + let got = c.get_object(bucket, key).await.expect("download"); + assert_eq!(blake3::hash(&got), blake3::hash(&data)); +} + +/// ≥1 GiB chunked upload via the ingest route (scale invariant: thousands of +/// chunks, bounded memory on the service side). Gated on FULA_BIG=1. +#[tokio::test] +#[ignore] +async fn live_1gib_chunked_via_ingest() { + if std::env::var("FULA_BIG").ok().as_deref() != Some("1") { + eprintln!("SKIP: FULA_BIG != 1"); + return; + } + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-big"; + let key = "/big/one-gib.bin"; + + const GIB: usize = 1 << 30; + let data = payload(GIB); // ~4096 chunks at 256 KiB + let want = blake3::hash(&data); + + let started = std::time::Instant::now(); + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("1 GiB chunked upload via ingest"); + eprintln!("1 GiB upload took {:?}", started.elapsed()); + drop(data); + + let got = c.get_object(bucket, key).await.expect("1 GiB download"); + assert_eq!(got.len(), GIB); + assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); +} From b06b088d7653672edec3ae8585a7fad226a66ed3 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 00:22:16 -0400 Subject: [PATCH 07/12] fix(ingress): bound the remote-cid presence check (5s) - absent cids must 409 fast kubo block lookup for an ABSENT cid is an unbounded bitswap/DHT search; the mapping-PUT handler hung for minutes per missing block (caught by drill I10: curl timed out). Timeout => treat as absent => retryable 409, client falls back to a full-bytes PUT. 5s amply covers a bitswap pull from the ingest peer. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/handlers/object.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index aa45730..262fd80 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -165,21 +165,39 @@ pub async fn put_object( "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", )); } - match state.block_store.has_block(&declared_cid).await { - Ok(true) => {} - Ok(false) => { + // BOUNDED presence check: kubo's block lookup for an ABSENT cid is an + // unbounded network search (bitswap/DHT) — without a deadline this + // handler would hang for minutes per missing block. 5s is ample for a + // bitswap pull from the ingest peer (same box: instant; LAN/WAN peer: + // one round-trip); timeout ⇒ treat as absent ⇒ retryable 409, the + // client falls back to a full-bytes PUT. + match tokio::time::timeout( + std::time::Duration::from_secs(5), + state.block_store.has_block(&declared_cid), + ) + .await + { + Ok(Ok(true)) => {} + Ok(Ok(false)) => { return Err(ApiError::s3( S3ErrorCode::OperationAborted, "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", )); } - Err(e) => { + Ok(Err(e)) => { tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); return Err(ApiError::s3( S3ErrorCode::OperationAborted, "block presence check failed — retry or fall back to a full-bytes PUT", )); } + Err(_elapsed) => { + tracing::debug!(cid = %declared_cid, "remote-cid presence check timed out (treating as absent)"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check timed out — retry or fall back to a full-bytes PUT", + )); + } } tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); declared_cid From 754a41702d116c6b628f8e2b331e9f93016cd979 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 08:20:22 -0400 Subject: [PATCH 08/12] fix(test): live e2e uses the FxFiles pairing (put_object_flat/get_object_flat) get_object does not exist on EncryptedClient; the canonical pairing is put_object_flat (which dispatches >768KiB payloads into put_object_chunked_internal - the route under test) + get_object_flat, exactly what offline_e2e/FxFiles use. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index da8b818..7dc1dce 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -57,14 +57,16 @@ async fn live_chunked_via_ingest_round_trip() { let c = client(&s3, &jwt, Some(&ingest), true); let bucket = "p2-live-ingest"; let key = "/ingest/round-trip.bin"; - let data = payload(1_500_000); // ~6 chunks at the 256 KiB default + // >768 KiB → put_object_flat dispatches into put_object_chunked_internal + // (the FxFiles photo/video path) where the ingest route lives. + let data = payload(1_500_000); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) .await - .expect("chunked upload via ingest route"); + .expect("chunked flat upload via ingest route"); let got = c - .get_object(bucket, key) + .get_object_flat(bucket, key) .await .expect("download after ingest-routed upload"); assert_eq!(got.len(), data.len(), "length mismatch"); @@ -89,10 +91,10 @@ async fn live_chunked_v8_off_legacy_round_trip() { let key = "/legacy/round-trip.bin"; let data = payload(1_200_000); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) .await - .expect("chunked upload, v8 off"); - let got = c.get_object(bucket, key).await.expect("download"); + .expect("chunked flat upload, v8 off"); + let got = c.get_object_flat(bucket, key).await.expect("download"); assert_eq!(blake3::hash(&got), blake3::hash(&data)); } @@ -121,13 +123,13 @@ async fn live_1gib_chunked_via_ingest() { let want = blake3::hash(&data); let started = std::time::Instant::now(); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) .await - .expect("1 GiB chunked upload via ingest"); + .expect("1 GiB chunked flat upload via ingest"); eprintln!("1 GiB upload took {:?}", started.elapsed()); drop(data); - let got = c.get_object(bucket, key).await.expect("1 GiB download"); + let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); assert_eq!(got.len(), GIB); assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); } From e28438eb9c2e8111259c432dcc7cd9d18f6b7c78 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 14:01:17 -0400 Subject: [PATCH 09/12] fix(test): 600s client timeout for the contended-box 1 GiB leg 16 parallel chunk commits convoy on the per-bucket write lock (each including a registry persist+pin); under heavy e2e-box load the queue tail exceeded the 30s default and one mapping PUT TimedOut, aborting the batch after ~4k uploads + 2h of orphan cleanup. Real deployments commit in ~100ms - this is test headroom, not a product setting. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index 7dc1dce..2188a6f 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -30,6 +30,11 @@ fn env_or_skip(key: &str) -> Option { fn client(s3: &str, jwt: &str, ingest: Option<&str>, v8: bool) -> EncryptedClient { let mut config = Config::new(s3).with_token(jwt); + // The contended e2e box serializes 16 parallel chunk commits through the + // per-bucket write lock (each including a registry persist+pin); under + // load the queue tail exceeds the 30s default. Real deployments commit in + // ~100ms — this is test-box headroom, not a product setting. + config.timeout = std::time::Duration::from_secs(600); config.walkable_v8_writer_enabled = v8; if let Some(i) = ingest { config.ingest_endpoints = vec![i.to_string()]; From 7bd20e1bc300c5145302b527635f70136a9631ed Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 19:33:46 -0400 Subject: [PATCH 10/12] fix(test): move the 1 GiB payload into the upload - clone doubled RSS and OOM-killed run #4 dmesg: oom-kill live_ingest_e2e total-vm 3.26GB anon-rss 3.18GB on the 7.8GB e2e box. The blake3 want-hash captured before the move carries everything verification needs. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index 2188a6f..12cdd7d 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -128,11 +128,13 @@ async fn live_1gib_chunked_via_ingest() { let want = blake3::hash(&data); let started = std::time::Instant::now(); - c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + // MOVE the payload — holding payload + a clone is >2 GiB in this one + // process and OOM-kills the run on the 7.8 GB e2e box (run #4, SIGKILL). + // `want` already carries everything the verification needs. + c.put_object_flat(bucket, key, data, Some("application/octet-stream")) .await .expect("1 GiB chunked flat upload via ingest"); eprintln!("1 GiB upload took {:?}", started.elapsed()); - drop(data); let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); assert_eq!(got.len(), GIB); From 174634dfbfeb30ae12c5d478460ef2779899e4ad Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 21:50:47 -0400 Subject: [PATCH 11/12] gateway: force no-cache on mutable forest index objects Browsers HEURISTICALLY cache GET responses that carry Last-Modified but no Cache-Control (RFC 7234 4.2.2) - and the gateway only sent Cache-Control when per-object metadata had one, which uploads never set. The sharded forest index objects live at FIXED keys and are overwritten in place, so web (wasm) clients - whose fetches go through the browser HTTP cache - kept reading a stale forest root for minutes after another device wrote, while native apps (no HTTP cache) saw fresh state. Real two-client repro via FxFiles web. GET and HEAD now force Cache-Control: private, no-cache for mutable __fula_* keys (everything except the immutable-by- construction families: content-addressed v7 nodes/pages and timestamped v1 backups). no-cache, not no-store: the handler already answers If-None-Match with 304 and ETags are exact CIDs, so an unchanged index costs one conditional round trip and a same-second overwrite cannot false-304. Per-object metadata Cache-Control still applies to everything else. Unit tests cover the key classification. Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/handlers/object.rs | 60 +++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index 262fd80..c4a75ea 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -959,7 +959,18 @@ pub async fn get_object( response = response.header("Content-Type", ct); } - if let Some(ref cc) = metadata.cache_control { + if is_mutable_index_key(&key) { + // Forced for the fixed-key forest index objects, overriding any + // per-object metadata: browsers HEURISTICALLY cache responses + // that carry Last-Modified but no Cache-Control, so web (wasm) + // clients kept reading a stale forest root for minutes after + // another device's upload — native apps (no HTTP cache) were + // fine. `no-cache` (not no-store) forces revalidation on every + // use; the If-None-Match → 304 path above makes an unchanged + // index cost one conditional round trip, and the ETag is the + // exact CID so a same-second overwrite can't false-304. + response = response.header("Cache-Control", "private, no-cache"); + } else if let Some(ref cc) = metadata.cache_control { response = response.header("Cache-Control", cc); } @@ -1189,6 +1200,12 @@ pub async fn head_object( response = response.header("Content-Type", ct); } + if is_mutable_index_key(&key) { + // Same forced revalidation as the GET path — HEAD responses + // update the browser's cached entry metadata too. + response = response.header("Cache-Control", "private, no-cache"); + } + // Add user metadata for (k, v) in &metadata.user_metadata { response = response.header(format!("x-amz-meta-{}", k), v); @@ -1450,6 +1467,26 @@ pub(crate) fn match_if_match(header: &str, current: Option<&str>) -> bool { result } +/// Mutable fula-internal index objects live at FIXED keys and are +/// overwritten in place — the sharded forest index / dir-index +/// (`__fula_forest_v7_index`, `__fula_forest_v7_dir_index`) and any +/// legacy fixed-key forest blob. Their GET/HEAD responses must force +/// `Cache-Control: private, no-cache`, or browsers heuristically cache +/// them (Last-Modified present, no Cache-Control) and wasm clients read +/// a stale forest root for minutes after another device's write. +/// +/// Immutable-by-construction families stay normally cacheable: their +/// key never carries different bytes (content-addressed nodes/pages, +/// timestamped v1 backups). +pub(crate) fn is_mutable_index_key(key: &str) -> bool { + if !key.starts_with("__fula_") { + return false; + } + !(key.starts_with("__fula_forest_v7_nodes/") + || key.starts_with("__fula_forest_v7_pages/") + || key.starts_with("__fula_forest_v1_backup/")) +} + /// RFC 7232 §3.2. True iff the If-None-Match precondition is satisfied. pub(crate) fn match_if_none_match(header: &str, current: Option<&str>) -> bool { let h = header.trim(); @@ -1737,7 +1774,7 @@ mod phase_1_2_wire_tests { #[cfg(test)] mod conditional_tests { - use super::{match_if_match, match_if_none_match}; + use super::{is_mutable_index_key, match_if_match, match_if_none_match}; #[test] fn if_match_star_requires_existing() { @@ -1782,6 +1819,25 @@ mod conditional_tests { assert!(match_if_none_match("W/\"abc\"", Some("abc"))); } + #[test] + fn mutable_index_keys_force_no_cache() { + // Fixed-key, overwritten-in-place index objects → forced. + assert!(is_mutable_index_key("__fula_forest_v7_index")); + assert!(is_mutable_index_key("__fula_forest_v7_dir_index")); + assert!(is_mutable_index_key("__fula_forest_v1")); + // Unknown future __fula_* state defaults to forced (safe bias). + assert!(is_mutable_index_key("__fula_something_new")); + // Immutable families keep normal caching. + assert!(!is_mutable_index_key("__fula_forest_v7_nodes/abc123")); + assert!(!is_mutable_index_key("__fula_forest_v7_pages/0001")); + assert!(!is_mutable_index_key( + "__fula_forest_v1_backup/1700000000000" + )); + // User content is untouched. + assert!(!is_mutable_index_key("photos/cat.jpg")); + assert!(!is_mutable_index_key(".fula/tags/abcd.json")); + } + #[test] fn empty_or_whitespace_header() { assert!(!match_if_match("", Some("abc"))); From fd501ef036d0ccfd4d1bc68f6185a3c2e62eee14 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Mon, 15 Jun 2026 11:22:56 -0400 Subject: [PATCH 12/12] test(ingress): large-file leg = FULA_BIG_MB MiB (default 512), env-scalable to 1 GiB Per the descope decision: the 7.8GB/4-core e2e box swap-thrashes 12h+ on a full 1 GiB (product buffers whole file in RAM via fs::read->Bytes; 4096 per-chunk metadata commits serialize on the bucket lock). 512 MiB (~2048 chunks) proves the same multi-thousand-chunk streaming + ingest scale in ~30-40 min here; FULA_BIG_MB=1024 runs the literal >=1GiB on a prod-class box. Product full-file-buffering memory model flagged for prod sizing. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 43 ++++++++++++++------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index 12cdd7d..3780aef 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -103,11 +103,20 @@ async fn live_chunked_v8_off_legacy_round_trip() { assert_eq!(blake3::hash(&got), blake3::hash(&data)); } -/// ≥1 GiB chunked upload via the ingest route (scale invariant: thousands of -/// chunks, bounded memory on the service side). Gated on FULA_BIG=1. +/// Large-file chunked upload via the ingest route (scale invariant: many +/// thousands of chunks, bounded service-side memory). Gated on FULA_BIG=1. +/// +/// Size = `FULA_BIG_MB` MiB (default 512 ≈ 2048 chunks at 256 KiB). The +/// 7.8 GB / 4-core e2e box swap-thrashes for 12h+ on a full 1 GiB because the +/// product buffers a whole file in RAM (`put_flat_from_path` → fs::read → +/// one Bytes) AND every chunk's metadata commit serializes through the +/// per-bucket lock; 512 MiB proves the same multi-thousand-chunk streaming + +/// ingest path in ~30-40 min here. To validate the literal ≥1 GiB on a +/// prod-class box: `FULA_BIG_MB=1024` (or higher). The product memory model +/// (full-file buffering) is flagged for prod large-file sizing. #[tokio::test] #[ignore] -async fn live_1gib_chunked_via_ingest() { +async fn live_large_file_chunked_via_ingest() { if std::env::var("FULA_BIG").ok().as_deref() != Some("1") { eprintln!("SKIP: FULA_BIG != 1"); return; @@ -119,24 +128,30 @@ async fn live_1gib_chunked_via_ingest() { ) else { return; }; + let size_mb: usize = std::env::var("FULA_BIG_MB") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(512); + let size = size_mb * 1024 * 1024; + let c = client(&s3, &jwt, Some(&ingest), true); let bucket = "p2-live-big"; - let key = "/big/one-gib.bin"; + let key = "/big/large.bin"; - const GIB: usize = 1 << 30; - let data = payload(GIB); // ~4096 chunks at 256 KiB + let data = payload(size); // size/256KiB chunks let want = blake3::hash(&data); + let len = data.len(); let started = std::time::Instant::now(); - // MOVE the payload — holding payload + a clone is >2 GiB in this one - // process and OOM-kills the run on the 7.8 GB e2e box (run #4, SIGKILL). - // `want` already carries everything the verification needs. + // MOVE the payload — holding payload + a clone doubles RSS and OOM-kills + // the run on this box (run #4, SIGKILL). `want`+`len` carry everything + // verification needs. c.put_object_flat(bucket, key, data, Some("application/octet-stream")) .await - .expect("1 GiB chunked flat upload via ingest"); - eprintln!("1 GiB upload took {:?}", started.elapsed()); + .unwrap_or_else(|e| panic!("{size_mb} MiB chunked flat upload via ingest: {e:?}")); + eprintln!("{size_mb} MiB upload took {:?} (~{} chunks)", started.elapsed(), len / (256 * 1024)); - let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); - assert_eq!(got.len(), GIB); - assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); + let got = c.get_object_flat(bucket, key).await.expect("large-file download"); + assert_eq!(got.len(), len); + assert_eq!(blake3::hash(&got), want, "large-file content mismatch"); }