From 1be84cc912505902bd53017bb2ea5056f45d5d44 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 19:34:41 -0400 Subject: [PATCH 01/15] feat(docker): reproducible fula-gateway image for federated masters Multi-stage build of the fula-gateway bin (cargo build --release -p fula-cli) on rust:1-bookworm with a slim Debian runtime. Env-driven config only; /data volume for durable state (pin queue, registry CID). Consumed by the federated-master installer in functionland/pinning-service (gateway compose profile, auto-enabled when the image exists). Closes #29 Co-Authored-By: Claude Fable 5 --- docker/Dockerfile.gateway | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 docker/Dockerfile.gateway diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway new file mode 100644 index 0000000..6147e88 --- /dev/null +++ b/docker/Dockerfile.gateway @@ -0,0 +1,30 @@ +# fula-gateway — S3-compatible gateway (federated-master image). +# +# Build (repo root): docker build -f docker/Dockerfile.gateway -t fula-gateway:latest . +# Used by the federated-master stack: functionland/pinning-service +# docker/master/docker-compose.master.yml ("gateway" profile, auto-enabled by +# update-scripts/join-as-master.sh when this image exists on the box). +# +# All configuration is env-driven (clap): FULA_HOST (0.0.0.0), FULA_PORT +# (9000), IPFS_API_URL, CLUSTER_API_URL, PINNING_SERVICE_ENDPOINT, +# STORAGE_API_URL, JWT_SECRET, ADMIN_JWT_SECRET, FULA_BLOCK_CACHE_MB, +# FULA_PIN_QUEUE_PATH (durable pin queue — production MUST set it to a +# persistent mount), rate limits, etc. See crates/fula-cli/src/main.rs. +FROM rust:1-bookworm AS build +WORKDIR /src +# Workspace build; fula-js (wasm) and fula-flutter are workspace members but +# building only -p fula-cli avoids their toolchains. +COPY . . 
+RUN cargo build --release -p fula-cli --bin fula-gateway + +FROM debian:bookworm-slim AS runtime +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway +# Persistent state (pin queue, registry CID, block cache) belongs on a volume. +VOLUME ["/data"] +ENV FULA_HOST=0.0.0.0 \ + FULA_PORT=9000 +EXPOSE 9000 +ENTRYPOINT ["fula-gateway"] From ecb342abb8b1bd49bbb84cebf6516445c7839a0f Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 20:51:02 -0400 Subject: [PATCH 02/15] fix(docker): volume at the gateway hardcoded state dir /var/lib/fula-gateway The pin queue, registry CID, and local-retain backlog default to /var/lib/fula-gateway (config.rs); the /data volume was wrong - without a mount there, pin retries/crash recovery silently degrade to fire-and-forget. Found by the federated-master e2e (startup WARNs). Co-Authored-By: Claude Fable 5 --- docker/Dockerfile.gateway | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway index 6147e88..fabcb30 100644 --- a/docker/Dockerfile.gateway +++ b/docker/Dockerfile.gateway @@ -22,8 +22,12 @@ RUN apt-get update \ && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ && rm -rf /var/lib/apt/lists/* COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway -# Persistent state (pin queue, registry CID, block cache) belongs on a volume. -VOLUME ["/data"] +# Persistent state lives at HARDCODED /var/lib/fula-gateway (pin_queue.redb, +# registry.cid, local_retain.redb — see crates/fula-cli/src/config.rs defaults). +# Mount a volume here or pin retries/crash recovery silently degrade to +# fire-and-forget and the bucket registry resets on restart. 
+RUN mkdir -p /var/lib/fula-gateway +VOLUME ["/var/lib/fula-gateway"] ENV FULA_HOST=0.0.0.0 \ FULA_PORT=9000 EXPOSE 9000 From 9fbc069e3d76dce24108b31ddf2de48ad07aa1dc Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 22:47:43 -0400 Subject: [PATCH 03/15] feat(ingress): Phase 2 - ingest byte route with capability probe + remote-cid mapping PUTs Gateway (flag FULA_REMOTE_CID_PUT, default OFF): - GET /fula/capabilities (public): advertises {remoteCidPut} so clients probe before using optional protocols - an old/flag-off master is never sent an empty-body PUT it would misstore as a real zero-byte chunk - put_object: empty-body PUT with x-amz-meta-fula-remote-cid records only the key->cid mapping after validating the CID is raw(0x55)+blake3(0x1e) and the block is PRESENT in the blockstore (absent => retryable 409 so the client falls back to full bytes); x-amz-meta-fula-remote-size carries the chunk's true byte count so billing/list never see 0; both headers added to the control-header filter (never persisted) Client (Config.ingest_endpoints, default empty = byte-identical legacy): - supports_remote_cid_put(): one-shot capability probe, ANY failure=false - put_object_chunked_internal: per chunk, bytes stream to the ingest node (PUT /v0/block?cid={cid} - the node verifies before storing), then the mapping PUT (with pinning variant when creds present, keeping per-object pin requests); ANY failure on the route falls back to the legacy full-bytes PUT. 
Probe once per upload; native-only (wasm32 keeps legacy path); ETag=cid keeps the existing self-verify unchanged Part of #31 (cross-repo: functionland/fula-ota#74 fula-ingest) Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/config.rs | 9 +++ crates/fula-cli/src/handlers/object.rs | 89 ++++++++++++++++++++++++- crates/fula-cli/src/handlers/service.rs | 11 +++ crates/fula-cli/src/main.rs | 7 ++ crates/fula-cli/src/routes.rs | 7 +- crates/fula-client/src/client.rs | 18 +++++ crates/fula-client/src/config.rs | 21 ++++++ crates/fula-client/src/encryption.rs | 78 ++++++++++++++++++++++ 8 files changed, 236 insertions(+), 4 deletions(-) diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index 0720620..2671f74 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -58,6 +58,14 @@ pub struct GatewayConfig { pub cors_origins: Vec, /// Path to store bucket registry CID for persistence pub registry_cid_path: Option, + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// `x-amz-meta-fula-remote-cid` — the chunk's verified bytes were already + /// streamed to a fula-ingest node; this master records only the + /// key→cid mapping (after confirming block presence via the blockstore). + /// Advertised to clients via GET /fula/capabilities so old/flag-off + /// masters are never sent the protocol. Default OFF. 
+ #[serde(default)] + pub remote_cid_put_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -198,6 +206,7 @@ impl Default for GatewayConfig { cors_enabled: true, cors_origins: vec!["*".to_string()], registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), + remote_cid_put_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index 6aa9c9b..aa45730 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -129,8 +129,86 @@ pub async fn put_object( } } - // Store the data - let cid = state.block_store.put_block(&body).await?; + // Phase 2 (decentralized ingress): empty-body mapping PUT. The chunk's + // verified bytes were already streamed to a fula-ingest node; record only + // the key→cid mapping here. Triple-gated: server flag (advertised via + // /fula/capabilities, so well-behaved clients never send this to a + // flag-off master), the header, and an empty body. The declared CID must + // be raw(0x55)+blake3(0x1e) — the only addressing the ingest verifies — + // and the block must be PRESENT in the blockstore (bitswap pulls it from + // the ingest peer); absent ⇒ retryable 409 so the client falls back to a + // full-bytes PUT. Storing nothing here is the point: the CID is + // self-certifying, so reads stay tamper-evident end-to-end. 
+ let remote_cid_hdr = headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + let cid = if let Some(declared) = remote_cid_hdr { + if !state.config.remote_cid_put_enabled { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT is not enabled on this master (probe /fula/capabilities)", + )); + } + if !body.is_empty() { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT requires an empty body", + )); + } + let declared_cid = cid::Cid::try_from(declared.as_str()).map_err(|e| { + ApiError::s3(S3ErrorCode::InvalidRequest, format!("invalid remote cid: {e}")) + })?; + if declared_cid.codec() != 0x55 || declared_cid.hash().code() != 0x1e { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", + )); + } + match state.block_store.has_block(&declared_cid).await { + Ok(true) => {} + Ok(false) => { + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", + )); + } + Err(e) => { + tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check failed — retry or fall back to a full-bytes PUT", + )); + } + } + tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); + declared_cid + } else { + // Store the data + state.block_store.put_block(&body).await? + }; + // Phase 2: a mapping PUT has an empty body — the chunk's TRUE size (as + // stored on the ingest node) arrives in x-amz-meta-fula-remote-size so + // billing/list/stat record the real byte count, never 0. Only honored on + // the remote-cid path; bounded by max_body_size like a real body. 
+ let object_size: u64 = match ( + headers.get("x-amz-meta-fula-remote-size").and_then(|v| v.to_str().ok()), + headers.get("x-amz-meta-fula-remote-cid").is_some(), + ) { + (Some(sz), true) => { + let parsed = sz.parse::().map_err(|_| { + ApiError::s3(S3ErrorCode::InvalidRequest, "invalid x-amz-meta-fula-remote-size") + })?; + if parsed == 0 || parsed > state.config.max_body_size as u64 { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "x-amz-meta-fula-remote-size out of range", + )); + } + parsed + } + _ => body.len() as u64, + }; // Use CID as ETag (content-addressed identifier) // This is S3-compliant: AWS docs state "The ETag may or may not be an MD5 digest" @@ -165,7 +243,7 @@ pub async fn put_object( // COPY handler at line 694). Access control is unaffected — bucket- // level access is gated by `BucketMetadata.owner_id` which is // already correctly the hashed form. - let mut metadata = ObjectMetadata::new(cid, body.len() as u64, etag.clone()) + let mut metadata = ObjectMetadata::new(cid, object_size, etag.clone()) .with_owner(&session.hashed_user_id); if let Some(ct) = content_type { @@ -1399,6 +1477,11 @@ pub(crate) const FULA_CONTROL_HEADERS: &[&str] = &[ // as user_metadata on the object itself (would pollute every // forest-manifest object with a meaningless `=1` tag). "fula-forest-manifest", + // Phase 2 (decentralized ingress): remote-cid mapping PUT controls — + // consumed by the handler (declared CID + true byte size of the chunk + // stored on the ingest node); never persisted as object metadata. 
+ "fula-remote-cid", + "fula-remote-size", ]; /// Returns `true` if the given x-amz-meta key (already stripped of diff --git a/crates/fula-cli/src/handlers/service.rs b/crates/fula-cli/src/handlers/service.rs index d8e70b4..15e34f8 100644 --- a/crates/fula-cli/src/handlers/service.rs +++ b/crates/fula-cli/src/handlers/service.rs @@ -46,3 +46,14 @@ pub async fn health_check() -> impl IntoResponse { pub async fn healthz() -> impl IntoResponse { (StatusCode::OK, "ok") } + +/// GET /fula/capabilities — unauthenticated protocol-capability advertisement +/// (Phase 2). Clients probe this before using optional protocols (the SDK +/// probes once per chunked upload). An old master 404s here, so a client never sends e.g. the +/// empty-body remote-cid PUT to a build that would misstore it as a real +/// zero-byte object. +pub async fn capabilities(State(state): State>) -> impl IntoResponse { + axum::Json(serde_json::json!({ + "remoteCidPut": state.config.remote_cid_put_enabled, + })) +} diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index bd1a8c1..ca683ad 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -92,6 +92,12 @@ struct Args { /// the local-retain backlog (the ongoing per-upload protection still runs). #[arg(long, env = "FULA_NO_LOCAL_RETAIN_BACKFILL")] no_local_retain_backfill: bool, + + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// x-amz-meta-fula-remote-cid (bytes pre-stored on a fula-ingest node). + /// Advertised to clients via GET /fula/capabilities. Default OFF. 
+ #[arg(long, env = "FULA_REMOTE_CID_PUT")] + remote_cid_put: bool, } #[tokio::main] @@ -163,6 +169,7 @@ async fn main() -> anyhow::Result<()> { cluster_peering_enabled: if args.no_cluster_peering { Some(false) } else { None }, local_retain_enabled: if args.no_local_retain { Some(false) } else { None }, local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, + remote_cid_put_enabled: args.remote_cid_put, ..Default::default() }; diff --git a/crates/fula-cli/src/routes.rs b/crates/fula-cli/src/routes.rs index 09b256d..cd7c054 100644 --- a/crates/fula-cli/src/routes.rs +++ b/crates/fula-cli/src/routes.rs @@ -24,7 +24,12 @@ pub fn create_router(state: Arc) -> Router { let cors = create_cors_layer(&state.config.cors_origins); // Public routes that must bypass auth (e.g., container health checks) - let public = Router::new().route("/healthz", get(handlers::healthz)); + let public = Router::new() + .route("/healthz", get(handlers::healthz)) + // Phase 2: capability probe (no auth — the answer is not sensitive and + // clients must be able to probe before any authenticated operation). + .route("/fula/capabilities", get(handlers::capabilities)) + .with_state(state.clone()); // Phase 3.2 internal endpoints. Bearer-token-protected at the // handler level (see handlers::internal::authenticate). They diff --git a/crates/fula-client/src/client.rs b/crates/fula-client/src/client.rs index 1b2086b..f67791a 100644 --- a/crates/fula-client/src/client.rs +++ b/crates/fula-client/src/client.rs @@ -522,6 +522,24 @@ impl FulaClient { }) } + /// Phase 2 (decentralized ingress): one-shot master capability probe. + /// Returns whether the master accepts empty-body remote-cid mapping PUTs + /// (GET /fula/capabilities → {"remoteCidPut": bool}). ANY failure — 404 on + /// an old build, network error, unparseable body — returns `false`, so + /// callers degrade to the legacy full-bytes path, never to a protocol the + /// master would misinterpret. 
+ pub async fn supports_remote_cid_put(&self) -> bool { + match self.request("GET", "/fula/capabilities", None, None, None).await { + Ok(resp) => resp + .json::() + .await + .ok() + .and_then(|v| v.get("remoteCidPut").and_then(|b| b.as_bool())) + .unwrap_or(false), + Err(_) => false, + } + } + /// Put an object with metadata and optional If-Match / If-None-Match guards. /// /// Used by conditional-write paths (e.g. forest flush) to detect concurrent diff --git a/crates/fula-client/src/config.rs b/crates/fula-client/src/config.rs index dfa4980..1660340 100644 --- a/crates/fula-client/src/config.rs +++ b/crates/fula-client/src/config.rs @@ -195,6 +195,18 @@ pub struct Config { /// targeted regressions or backward-compat tests). pub walkable_v8_writer_enabled: bool, + /// Phase 2 (decentralized ingress) — optional fula-ingest endpoints. + /// When non-empty AND `walkable_v8_writer_enabled` AND the master + /// advertises `remoteCidPut` via GET /fula/capabilities (probed once per + /// chunked upload), chunk ciphertext BYTES are streamed to the first + /// ingest endpoint (`PUT /v0/block?cid={cid}` — the node verifies + /// the CID before storing) and the master receives only an empty-body + /// mapping PUT (`x-amz-meta-fula-remote-cid` + true size). ANY failure on + /// that route falls back transparently to the legacy full-bytes PUT. + /// Empty (default) = behavior byte-identical to before. Native-only + /// (ignored on wasm32). + pub ingest_endpoints: Vec, + /// Phase 19 — optional health-status callback. When set, the SDK /// invokes this closure on every Up↔Down transition of the /// master health gate (`MasterHealthEvent::Online` / @@ -330,6 +342,8 @@ impl Default for Config { // flipping master-side gates until SDK adoption reaches the // % they're comfortable with for the pre-v0.6 reader cost. walkable_v8_writer_enabled: true, + // Phase 2 — no ingest nodes by default (legacy byte path). 
+ ingest_endpoints: Vec::new(), // Phase 19 — no callback by default (silent gate). health_callback: None, // E2E plan Phase 4 — Mode B/C signed-entry writer disabled @@ -358,6 +372,13 @@ impl Config { self } + /// Phase 2: route chunk bytes through fula-ingest node(s) when the master + /// supports remote-cid mapping PUTs. Empty = legacy behavior. + pub fn with_ingest_endpoints(mut self, endpoints: Vec) -> Self { + self.ingest_endpoints = endpoints; + self + } + /// Enable encryption pub fn with_encryption(mut self) -> Self { self.encryption_enabled = true; diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 10359a6..97eca51 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7083,6 +7083,31 @@ impl EncryptedClient { // (skip_serializing_if keeps it off the wire). let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): decide the byte route ONCE per + // upload. The ingest route activates only when (a) ingest endpoints + // are configured, (b) the walkable-v8 writer is on (we need the + // pre-computed chunk CID to declare), and (c) the master ADVERTISES + // remote-cid support via /fula/capabilities — an old or flag-off + // master is never sent the empty-body protocol it would misstore. + // (cfg-gated: native only; wasm32 keeps the legacy path.) 
+ #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -7111,8 +7136,61 @@ impl EncryptedClient { let bucket = bucket.to_string(); let pinning = pinning.clone(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route. Bytes go to the fula-ingest node + // (which verifies BLAKE3(body) == declared CID before storing) + // and the master gets an empty-body mapping PUT carrying the + // declared CID + true size. ANY failure on this route falls + // through to the legacy full-bytes PUT below — degradation, + // never corruption (the master only accepts the mapping when + // its flag is on AND the block is present in its blockstore). 
+ #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + let map_res = if let Some(ref pin) = pinning { + client.put_object_with_metadata_and_pinning( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + &pin.endpoint, &pin.token, + ).await + } else { + client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await + }; + match map_res { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = if let Some(ref pin) = pinning { client.put_object_with_metadata_and_pinning( &bucket, From d1ac73f72b4ecd25f12167464bfb55deb2694946 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 22:59:46 -0400 Subject: [PATCH 04/15] 
test(ingress): wiremock matrix for the ingest byte route Four cases: old master (no capabilities) never receives the new protocol; capable master routes every chunk's bytes through the ingest node (master sees ONLY empty-body mapping PUTs; declared sizes sum to the bytes the ingest stored; ingest asserts declared cid == blake3 of body); dead ingest falls back transparently and the upload succeeds; empty ingest_endpoints is byte-identical legacy. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/ingest_route.rs | 273 +++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 crates/fula-client/tests/ingest_route.rs diff --git a/crates/fula-client/tests/ingest_route.rs b/crates/fula-client/tests/ingest_route.rs new file mode 100644 index 0000000..3a27386 --- /dev/null +++ b/crates/fula-client/tests/ingest_route.rs @@ -0,0 +1,273 @@ +//! Phase 2 (decentralized ingress) — client routing matrix. +//! +//! Verifies the three load-bearing behaviors of `Config.ingest_endpoints`: +//! +//! 1. **Old master (no /fula/capabilities)** → the ingest route never +//! activates; every chunk goes to the master as a full-bytes PUT +//! (the misstore-on-old-master hazard the capability probe exists for). +//! 2. **Capable master + healthy ingest** → chunk BYTES go to the ingest +//! node; the master receives ONLY empty-body mapping PUTs carrying +//! `x-amz-meta-fula-remote-cid` (the declared blake3 CID) and +//! `x-amz-meta-fula-remote-size` (the true ciphertext size). +//! 3. **Capable master + dead ingest** → transparent fallback: the upload +//! still succeeds and the master sees full-bytes chunk PUTs. +//! +//! Chunk PUTs are identified by the `x-amz-meta-x-fula-chunk: true` header +//! the SDK stamps on every chunk (index objects / forest blobs lack it). 
+ +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// Master-side recording responder. Mimics BOTH master generations: +/// for a normal bytes PUT it returns `ETag = blake3(body)` (what every +/// master does); for an empty-body mapping PUT it returns `ETag = +/// declared cid` (what a flag-on master does after its presence check). +/// Records per-class counters the assertions read back. +struct MasterResponder { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, +} + +impl Respond for MasterResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let is_chunk = req + .headers + .get("x-amz-meta-x-fula-chunk") + .map(|v| v.to_str().unwrap_or("") == "true") + .unwrap_or(false); + let remote_cid = req + .headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + + if let Some(declared) = remote_cid { + // Mapping PUT: must be empty-bodied; echo the declared cid as ETag. 
+ assert!( + req.body.is_empty(), + "mapping PUT must carry an EMPTY body, got {} bytes", + req.body.len() + ); + if is_chunk { + self.chunk_mapping_puts.fetch_add(1, Ordering::SeqCst); + } + if let Some(sz) = req + .headers + .get("x-amz-meta-fula-remote-size") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { + self.mapping_size_headers.lock().unwrap().push(sz); + } + return ResponseTemplate::new(200).insert_header("ETag", declared.as_str()); + } + + if is_chunk && !req.body.is_empty() { + self.chunk_full_bodies.fetch_add(1, Ordering::SeqCst); + } + let cid = blake3_raw_cid(&req.body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +/// Ingest-side responder: verifies the declared cid matches the body +/// (what the real fula-ingest does) and counts accepted blocks + bytes. +struct IngestResponder { + accepted: Arc, + bytes_seen: Arc, +} + +impl Respond for IngestResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let declared = req + .url + .query_pairs() + .find(|(k, _)| k == "cid") + .map(|(_, v)| v.to_string()) + .unwrap_or_default(); + let actual = blake3_raw_cid(&req.body).to_string(); + assert_eq!(declared, actual, "client must declare the true blake3 CID"); + self.accepted.fetch_add(1, Ordering::SeqCst); + self.bytes_seen.fetch_add(req.body.len(), Ordering::SeqCst); + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"cid": declared, "size": req.body.len()})) + } +} + +struct Counters { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, + ingest_accepted: Arc, + ingest_bytes: Arc, +} + +/// Build (master, ingest, counters); `advertise_capability` controls whether +/// the master exposes /fula/capabilities (old vs new build). 
+async fn setup(advertise_capability: bool) -> (MockServer, MockServer, Counters) { + let master = MockServer::start().await; + let ingest = MockServer::start().await; + + let c = Counters { + chunk_full_bodies: Arc::new(AtomicUsize::new(0)), + chunk_mapping_puts: Arc::new(AtomicUsize::new(0)), + mapping_size_headers: Arc::new(Mutex::new(Vec::new())), + ingest_accepted: Arc::new(AtomicUsize::new(0)), + ingest_bytes: Arc::new(AtomicUsize::new(0)), + }; + + if advertise_capability { + Mock::given(method("GET")) + .and(path("/fula/capabilities")) + .respond_with( + ResponseTemplate::new(200).set_body_json(serde_json::json!({"remoteCidPut": true})), + ) + .mount(&master) + .await; + } + Mock::given(method("PUT")) + .respond_with(MasterResponder { + chunk_full_bodies: c.chunk_full_bodies.clone(), + chunk_mapping_puts: c.chunk_mapping_puts.clone(), + mapping_size_headers: c.mapping_size_headers.clone(), + }) + .mount(&master) + .await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&master) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&master) + .await; + + Mock::given(method("PUT")) + .respond_with(IngestResponder { + accepted: c.ingest_accepted.clone(), + bytes_seen: c.ingest_bytes.clone(), + }) + .mount(&ingest) + .await; + + (master, ingest, c) +} + +fn client_for(master_uri: &str, ingest_uri: Option<&str>) -> EncryptedClient { + let mut config = Config::new(master_uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + if let Some(uri) = ingest_uri { + config.ingest_endpoints = vec![uri.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +/// 5 chunks of 64 KiB — well past the threshold with an explicit chunk size. 
+fn payload() -> Vec { + (0..(5 * 64 * 1024)).map(|i| (i % 251) as u8).collect() +} + +#[tokio::test] +async fn old_master_never_receives_the_new_protocol() { + let (master, ingest, c) = setup(false).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + client + .put_object_chunked("bucket-a", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must succeed via legacy path"); + + assert_eq!( + c.ingest_accepted.load(Ordering::SeqCst), + 0, + "ingest must never be used when the master lacks the capability" + ); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "all chunks must arrive at the master as full bodies" + ); +} + +#[tokio::test] +async fn capable_master_routes_bytes_through_ingest() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + let data = payload(); + client + .put_object_chunked("bucket-b", "/file.bin", &data, Some(64 * 1024)) + .await + .expect("upload must succeed via ingest route"); + + let accepted = c.ingest_accepted.load(Ordering::SeqCst); + let mappings = c.chunk_mapping_puts.load(Ordering::SeqCst); + assert!(accepted >= 5, "chunk bytes must hit the ingest node (got {accepted})"); + assert_eq!( + c.chunk_full_bodies.load(Ordering::SeqCst), + 0, + "no chunk should arrive at the master as a full body" + ); + assert_eq!(accepted, mappings, "every ingest-accepted chunk needs its mapping PUT"); + // The true ciphertext sizes (plaintext + AEAD overhead) must be declared. 
+ let sizes = c.mapping_size_headers.lock().unwrap(); + assert_eq!(sizes.len(), mappings); + assert!(sizes.iter().all(|&s| s > 0), "remote-size must never be 0"); + let total_ingest_bytes = c.ingest_bytes.load(Ordering::SeqCst) as u64; + assert_eq!( + sizes.iter().sum::(), + total_ingest_bytes, + "declared sizes must equal the bytes the ingest node stored" + ); +} + +#[tokio::test] +async fn dead_ingest_falls_back_to_full_bytes() { + let (master, _ingest, c) = setup(true).await; + // Point at a port nothing listens on — connection refused, fast. + let client = client_for(&master.uri(), Some("http://127.0.0.1:9")); + + client + .put_object_chunked("bucket-c", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must SUCCEED despite the dead ingest (transparent fallback)"); + + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "fallback must deliver every chunk as a full-bytes PUT" + ); +} + +#[tokio::test] +async fn no_ingest_configured_is_byte_identical_legacy() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), None); + + client + .put_object_chunked("bucket-d", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("legacy upload"); + + assert_eq!(c.ingest_accepted.load(Ordering::SeqCst), 0); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!(c.chunk_full_bodies.load(Ordering::SeqCst) >= 5); + drop(ingest); +} From 9493a571cf5a4454f1c5ce7750899f21d1bb6d8f Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 23:07:14 -0400 Subject: [PATCH 05/15] fix(ingress): mirror the ingest route into the public put_object_chunked loop put_object_chunked (the FxFiles-path public API) duplicates the chunk loop of put_object_chunked_internal (E51 mirroring pattern) - the route existed only in the internal one, so public-API uploads silently stayed on the legacy byte path. 
Caught by the wiremock matrix (positive case: ingest got 0). Same probe gating + fallback semantics. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/src/encryption.rs | 63 ++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 97eca51..90d8edb 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -10593,6 +10593,28 @@ impl EncryptedClient { // master's etag-attested CID matches. let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): same probe-gated ingest route as + // `put_object_chunked_internal` — this public API duplicates that + // loop (E51 mirroring pattern), so it must mirror the route too or + // FxFiles-path uploads silently stay on the legacy byte path. + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). 
@@ -10622,8 +10644,49 @@ impl EncryptedClient { let client = self.inner.clone(); let bucket = bucket.to_string(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route (see put_object_chunked_internal + // for the full rationale). ANY failure falls through to the + // legacy full-bytes PUT below. + #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + match client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = client.put_object_with_metadata( &bucket, &chunk_key, From 
c910b79c7b1bb0ccb8cd95e02297f3db99e3329a Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 11 Jun 2026 23:12:22 -0400 Subject: [PATCH 06/15] test(ingress): live ingest-route e2e (env-driven, ignored by default) Round-trip via a real master + fula-ingest: chunked upload routed through the ingest node then read back via the master (CID on); v8-off legacy parity leg; FULA_BIG=1 adds the 1 GiB case (~4096 chunks). Consumed by fula-ota tests/e2e/phase-2/60-fidelity.sh which also asserts server-side that kubo block counts grew. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 133 ++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 crates/fula-client/tests/live_ingest_e2e.rs diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs new file mode 100644 index 0000000..da8b818 --- /dev/null +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -0,0 +1,133 @@ +//! Phase 2 — LIVE ingest-route e2e (requires a running master + fula-ingest). +//! +//! Env (skip-without, offline_e2e convention): +//! FULA_S3 master base URL (e.g. http://127.0.0.1:9000) +//! FULA_JWT bearer for the master (HS256 with the master's secret) +//! FULA_INGEST fula-ingest base URL (e.g. http://127.0.0.1:3601) +//! FULA_BIG=1 also run the ≥1 GiB case (writes a temp file) +//! +//! Run: cargo test -p fula-client --test live_ingest_e2e --release -- --ignored --nocapture +//! +//! The runner script (fula-ota tests/e2e/phase-2/60-fidelity.sh) asserts the +//! ingest container's accepted-block log count INCREASES across this test — +//! the server-side proof that bytes flowed via the ingest node, not the +//! gateway. 
+ +#![cfg(not(target_arch = "wasm32"))] + +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; + +fn env_or_skip(key: &str) -> Option { + match std::env::var(key) { + Ok(v) if !v.is_empty() => Some(v), + _ => { + eprintln!("SKIP: {key} not set"); + None + } + } +} + +fn client(s3: &str, jwt: &str, ingest: Option<&str>, v8: bool) -> EncryptedClient { + let mut config = Config::new(s3).with_token(jwt); + config.walkable_v8_writer_enabled = v8; + if let Some(i) = ingest { + config.ingest_endpoints = vec![i.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +fn payload(len: usize) -> Vec { + (0..len).map(|i| ((i * 31 + 7) % 251) as u8).collect() +} + +/// Chunked upload routed via the ingest node, read back through the master — +/// full live round-trip of the Phase 2 byte path (client CID on). +#[tokio::test] +#[ignore] +async fn live_chunked_via_ingest_round_trip() { + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-ingest"; + let key = "/ingest/round-trip.bin"; + let data = payload(1_500_000); // ~6 chunks at the 256 KiB default + + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("chunked upload via ingest route"); + + let got = c + .get_object(bucket, key) + .await + .expect("download after ingest-routed upload"); + assert_eq!(got.len(), data.len(), "length mismatch"); + assert_eq!( + blake3::hash(&got), + blake3::hash(&data), + "content mismatch after ingest-routed round trip" + ); +} + +/// Same upload with the v8 writer OFF — the ingest route must self-disable +/// (no pre-computed CIDs to declare) and the legacy path must round-trip. 
+#[tokio::test] +#[ignore] +async fn live_chunked_v8_off_legacy_round_trip() { + let (Some(s3), Some(jwt)) = (env_or_skip("FULA_S3"), env_or_skip("FULA_JWT")) else { + return; + }; + let ingest = std::env::var("FULA_INGEST").ok(); + let c = client(&s3, &jwt, ingest.as_deref(), false); + let bucket = "p2-live-legacy"; + let key = "/legacy/round-trip.bin"; + let data = payload(1_200_000); + + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("chunked upload, v8 off"); + let got = c.get_object(bucket, key).await.expect("download"); + assert_eq!(blake3::hash(&got), blake3::hash(&data)); +} + +/// ≥1 GiB chunked upload via the ingest route (scale invariant: thousands of +/// chunks, bounded memory on the service side). Gated on FULA_BIG=1. +#[tokio::test] +#[ignore] +async fn live_1gib_chunked_via_ingest() { + if std::env::var("FULA_BIG").ok().as_deref() != Some("1") { + eprintln!("SKIP: FULA_BIG != 1"); + return; + } + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-big"; + let key = "/big/one-gib.bin"; + + const GIB: usize = 1 << 30; + let data = payload(GIB); // ~4096 chunks at 256 KiB + let want = blake3::hash(&data); + + let started = std::time::Instant::now(); + c.put_object_chunked(bucket, key, &data, None) + .await + .expect("1 GiB chunked upload via ingest"); + eprintln!("1 GiB upload took {:?}", started.elapsed()); + drop(data); + + let got = c.get_object(bucket, key).await.expect("1 GiB download"); + assert_eq!(got.len(), GIB); + assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); +} From b06b088d7653672edec3ae8585a7fad226a66ed3 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 00:22:16 -0400 Subject: [PATCH 07/15] fix(ingress): bound the remote-cid presence check (5s) - absent cids must 409 fast kubo block lookup for an ABSENT cid is an 
unbounded bitswap/DHT search; the mapping-PUT handler hung for minutes per missing block (caught by drill I10: curl timed out). Timeout => treat as absent => retryable 409, client falls back to a full-bytes PUT. 5s amply covers a bitswap pull from the ingest peer. Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/handlers/object.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index aa45730..262fd80 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -165,21 +165,39 @@ pub async fn put_object( "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", )); } - match state.block_store.has_block(&declared_cid).await { - Ok(true) => {} - Ok(false) => { + // BOUNDED presence check: kubo's block lookup for an ABSENT cid is an + // unbounded network search (bitswap/DHT) — without a deadline this + // handler would hang for minutes per missing block. 5s is ample for a + // bitswap pull from the ingest peer (same box: instant; LAN/WAN peer: + // one round-trip); timeout ⇒ treat as absent ⇒ retryable 409, the + // client falls back to a full-bytes PUT. 
+ match tokio::time::timeout( + std::time::Duration::from_secs(5), + state.block_store.has_block(&declared_cid), + ) + .await + { + Ok(Ok(true)) => {} + Ok(Ok(false)) => { return Err(ApiError::s3( S3ErrorCode::OperationAborted, "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", )); } - Err(e) => { + Ok(Err(e)) => { tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); return Err(ApiError::s3( S3ErrorCode::OperationAborted, "block presence check failed — retry or fall back to a full-bytes PUT", )); } + Err(_elapsed) => { + tracing::debug!(cid = %declared_cid, "remote-cid presence check timed out (treating as absent)"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check timed out — retry or fall back to a full-bytes PUT", + )); + } } tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); declared_cid From 754a41702d116c6b628f8e2b331e9f93016cd979 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 08:20:22 -0400 Subject: [PATCH 08/15] fix(test): live e2e uses the FxFiles pairing (put_object_flat/get_object_flat) get_object does not exist on EncryptedClient; the canonical pairing is put_object_flat (which dispatches >768KiB payloads into put_object_chunked_internal - the route under test) + get_object_flat, exactly what offline_e2e/FxFiles use. 
Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index da8b818..7dc1dce 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -57,14 +57,16 @@ async fn live_chunked_via_ingest_round_trip() { let c = client(&s3, &jwt, Some(&ingest), true); let bucket = "p2-live-ingest"; let key = "/ingest/round-trip.bin"; - let data = payload(1_500_000); // ~6 chunks at the 256 KiB default + // >768 KiB → put_object_flat dispatches into put_object_chunked_internal + // (the FxFiles photo/video path) where the ingest route lives. + let data = payload(1_500_000); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) .await - .expect("chunked upload via ingest route"); + .expect("chunked flat upload via ingest route"); let got = c - .get_object(bucket, key) + .get_object_flat(bucket, key) .await .expect("download after ingest-routed upload"); assert_eq!(got.len(), data.len(), "length mismatch"); @@ -89,10 +91,10 @@ async fn live_chunked_v8_off_legacy_round_trip() { let key = "/legacy/round-trip.bin"; let data = payload(1_200_000); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) .await - .expect("chunked upload, v8 off"); - let got = c.get_object(bucket, key).await.expect("download"); + .expect("chunked flat upload, v8 off"); + let got = c.get_object_flat(bucket, key).await.expect("download"); assert_eq!(blake3::hash(&got), blake3::hash(&data)); } @@ -121,13 +123,13 @@ async fn live_1gib_chunked_via_ingest() { let want = blake3::hash(&data); let started = std::time::Instant::now(); - c.put_object_chunked(bucket, key, &data, None) + c.put_object_flat(bucket, 
key, data.clone(), Some("application/octet-stream")) .await - .expect("1 GiB chunked upload via ingest"); + .expect("1 GiB chunked flat upload via ingest"); eprintln!("1 GiB upload took {:?}", started.elapsed()); drop(data); - let got = c.get_object(bucket, key).await.expect("1 GiB download"); + let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); assert_eq!(got.len(), GIB); assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); } From d381cb49a05968ae7ea8074e1caaf3ccc2db042e Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 10:47:26 -0400 Subject: [PATCH 09/15] feat(multimaster): FM-1 bucket-root CAS - shared arbiter for concurrent masters Phase 2.5 core (fula-api#32). Default OFF; byte-identical when dark. fula-core (storage-agnostic): - new root_pointer module: RootPointerStore trait (cas_root/get_root) + CasOutcome + in-memory test store; race-semantics unit tests - Bucket::flush(): with an arbiter attached, CAS old_root -> new_root BEFORE publishing to the in-process cache; a lost race surfaces as PreconditionFailed (the retryable contract SDKs already handle for conditional writes); no-op when the root did not change - BucketManager: OnceLock-held arbiter (wireable after Arc-ing, lock-free reads); open_bucket_for_user consults get_root so a bucket moved by ANOTHER master is opened at the shared root, not this process's stale cache (arbiter-unreachable degrades to cached root - flush CAS still arbitrates); opened buckets get the arbiter attached fula-cli: - root_store_pg: PgRootStore on the EXISTING pins-DB sqlx pool (zero new deps) - single-statement INSERT..ON CONFLICT..DO UPDATE..WHERE root_cid=expected upsert-CAS; bucket_roots table ships as pinning-service migration 020 - flag FULA_BUCKET_ROOT_CAS + config field; state wires the arbiter when flag AND Postgres are present (loud warning if flag-on without DB) Co-Authored-By: Claude Fable 5 --- crates/fula-cli/src/config.rs | 6 ++ crates/fula-cli/src/lib.rs | 1 
+ crates/fula-cli/src/main.rs | 7 ++ crates/fula-cli/src/root_store_pg.rs | 93 +++++++++++++++++ crates/fula-cli/src/state.rs | 15 +++ crates/fula-core/src/bucket.rs | 106 +++++++++++++++++++- crates/fula-core/src/lib.rs | 2 + crates/fula-core/src/root_pointer.rs | 143 +++++++++++++++++++++++++++ 8 files changed, 370 insertions(+), 3 deletions(-) create mode 100644 crates/fula-cli/src/root_store_pg.rs create mode 100644 crates/fula-core/src/root_pointer.rs diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index 2671f74..d7b306c 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -66,6 +66,11 @@ pub struct GatewayConfig { /// masters are never sent the protocol. Default OFF. #[serde(default)] pub remote_cid_put_enabled: bool, + /// FM-1 (Phase 2.5): arbitrate bucket-root flushes through the shared + /// Postgres so CONCURRENT federated masters can serve writes without + /// lost updates. Needs the pins-DB (POSTGRES_* env). Default OFF. 
+ #[serde(default)] + pub bucket_root_cas_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -207,6 +212,7 @@ impl Default for GatewayConfig { cors_origins: vec!["*".to_string()], registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), remote_cid_put_enabled: false, + bucket_root_cas_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/lib.rs b/crates/fula-cli/src/lib.rs index d8e9a72..5d06a86 100644 --- a/crates/fula-cli/src/lib.rs +++ b/crates/fula-cli/src/lib.rs @@ -49,6 +49,7 @@ pub mod pin_queue; pub mod pinning; pub mod recovery_fallback; pub mod revocation; +pub mod root_store_pg; pub mod routes; pub mod server; pub mod state; diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index ca683ad..0d2beab 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -98,6 +98,12 @@ struct Args { /// Advertised to clients via GET /fula/capabilities. Default OFF. #[arg(long, env = "FULA_REMOTE_CID_PUT")] remote_cid_put: bool, + + /// FM-1 (Phase 2.5): arbitrate bucket-root flushes through the shared + /// Postgres (POSTGRES_* env) so concurrent federated masters never lose + /// each other's writes. Default OFF. 
+ #[arg(long, env = "FULA_BUCKET_ROOT_CAS")] + bucket_root_cas: bool, } #[tokio::main] @@ -170,6 +176,7 @@ async fn main() -> anyhow::Result<()> { local_retain_enabled: if args.no_local_retain { Some(false) } else { None }, local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, remote_cid_put_enabled: args.remote_cid_put, + bucket_root_cas_enabled: args.bucket_root_cas, ..Default::default() }; diff --git a/crates/fula-cli/src/root_store_pg.rs b/crates/fula-cli/src/root_store_pg.rs new file mode 100644 index 0000000..7b30cd2 --- /dev/null +++ b/crates/fula-cli/src/root_store_pg.rs @@ -0,0 +1,93 @@ +//! FM-1 (Phase 2.5) — Postgres-backed shared bucket-root arbiter. +//! +//! The Stage-A federated masters already share one Postgres (billing, +//! sessions, pins); the same database is the lowest-infrastructure arbiter +//! for bucket-root CAS. Table (pinning-service migration 020): +//! +//! bucket_roots(owner_id TEXT, bucket TEXT, root_cid TEXT NOT NULL, +//! version BIGINT NOT NULL DEFAULT 1, updated_at TIMESTAMPTZ, +//! PRIMARY KEY (owner_id, bucket)) +//! +//! CAS semantics: one statement, race-safe under READ COMMITTED — +//! INSERT .. ON CONFLICT (owner_id,bucket) DO UPDATE +//! SET root_cid = $new, version = version+1, updated_at = now() +//! WHERE bucket_roots.root_cid = $expected +//! rows_affected == 1 ⇒ Won (fresh row claimed, or expected matched); +//! rows_affected == 0 ⇒ another master moved the pointer ⇒ Conflict. +//! +//! Reuses the AppState `pins_db` sqlx pool — zero new dependencies. 
+ +use async_trait::async_trait; +use cid::Cid; +use fula_core::root_pointer::{CasOutcome, RootPointerStore}; +use fula_core::{CoreError, Result}; +use sqlx::PgPool; + +pub struct PgRootStore { + pool: PgPool, +} + +impl PgRootStore { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl RootPointerStore for PgRootStore { + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result { + let res = sqlx::query( + r#" + INSERT INTO bucket_roots (owner_id, bucket, root_cid, version, updated_at) + VALUES ($1, $2, $3, 1, NOW()) + ON CONFLICT (owner_id, bucket) DO UPDATE + SET root_cid = $3, version = bucket_roots.version + 1, updated_at = NOW() + WHERE bucket_roots.root_cid = $4 + "#, + ) + .bind(owner_id) + .bind(bucket) + .bind(new.to_string()) + .bind(expected.to_string()) + .execute(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots CAS: {e}")))?; + + if res.rows_affected() == 1 { + return Ok(CasOutcome::Won); + } + + // Lost the race — report the pointer another master holds now. 
+ let current: Option = sqlx::query_scalar( + "SELECT root_cid FROM bucket_roots WHERE owner_id = $1 AND bucket = $2", + ) + .bind(owner_id) + .bind(bucket) + .fetch_optional(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots read: {e}")))?; + + Ok(CasOutcome::Conflict { + current: current.and_then(|s| s.parse::().ok()), + }) + } + + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result> { + let current: Option = sqlx::query_scalar( + "SELECT root_cid FROM bucket_roots WHERE owner_id = $1 AND bucket = $2", + ) + .bind(owner_id) + .bind(bucket) + .fetch_optional(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots read: {e}")))?; + + Ok(current.and_then(|s| s.parse::().ok())) + } +} diff --git a/crates/fula-cli/src/state.rs b/crates/fula-cli/src/state.rs index 7eede1a..611dccd 100644 --- a/crates/fula-cli/src/state.rs +++ b/crates/fula-cli/src/state.rs @@ -320,6 +320,21 @@ impl AppState { } }; + // FM-1 (Phase 2.5) — shared bucket-root CAS across federated masters. + // Requires BOTH the flag AND the shared Postgres (the same pool the + // recovery endpoints use). Flag on without a DB ⇒ loud warning + + // single-master behavior (never silently multi-master-unsafe). + if config.bucket_root_cas_enabled { + if let Some(ref pool) = pins_db { + bucket_manager.set_root_pointer_store(std::sync::Arc::new( + crate::root_store_pg::PgRootStore::new(pool.clone()), + )); + info!("✓ Bucket-root CAS enabled (FM-1): shared Postgres arbitrates multi-master flushes"); + } else { + warn!("FULA_BUCKET_ROOT_CAS is on but no Postgres is configured (POSTGRES_* / FULA_PINS_DATABASE_URL) — running WITHOUT shared root arbitration"); + } + } + // Audit F3 — JWT revocation deny-list. 
Allocated EMPTY here (= allow // all) only when the env switch is set AND a pins DB exists; the // background refresher (`revocation::spawn_if_enabled`, started in diff --git a/crates/fula-core/src/bucket.rs b/crates/fula-core/src/bucket.rs index 06e95f0..61606df 100644 --- a/crates/fula-core/src/bucket.rs +++ b/crates/fula-core/src/bucket.rs @@ -47,6 +47,12 @@ pub struct Bucket { metadata_cache: Option>>, /// Key used in the metadata cache (may differ from metadata.name for user-scoped buckets) cache_key: Option, + /// FM-1 (Phase 2.5): shared multi-master root arbiter. `None` (default) + /// = single-master behavior, byte-identical to before. Set via + /// [`set_root_pointer_store`](Self::set_root_pointer_store) when the + /// operator enables the flag; `flush()` then CASes the shared pointer + /// before publishing the new root. + root_store: Option>, } impl Bucket { @@ -74,6 +80,7 @@ impl Bucket { config, metadata_cache: None, cache_key: None, + root_store: None, }) } @@ -91,6 +98,7 @@ impl Bucket { config, metadata_cache, cache_key: None, + root_store: None, }) } @@ -110,9 +118,20 @@ impl Bucket { config, metadata_cache, cache_key: Some(cache_key), + root_store: None, }) } + /// FM-1: attach the shared multi-master root arbiter (see + /// [`crate::root_pointer`]). Called by `BucketManager` right after + /// opening when the operator enabled bucket-root CAS. + pub fn set_root_pointer_store( + &mut self, + store: Arc, + ) { + self.root_store = Some(store); + } + /// Get bucket name pub fn name(&self) -> &str { &self.metadata.name @@ -262,7 +281,37 @@ impl Bucket { /// Flush changes and return the new root CID pub async fn flush(&mut self) -> Result { + // FM-1: capture the root this bucket was OPENED at — the value the + // shared CAS must still hold for this flush to win the multi-master + // race. 
+ let old_root = self.metadata.root_cid; + let root_cid = self.index.flush().await?; + + // FM-1 (Phase 2.5): with a shared arbiter attached, the new root is + // published to ALL masters atomically BEFORE this process's caches + // learn it. Losing the race means another master flushed this bucket + // after we opened it — surfacing as PreconditionFailed keeps the + // contract identical to conditional writes (retryable; SDKs already + // re-open + retry on 412). No-op when old == new (nothing changed). + if let Some(ref arbiter) = self.root_store { + if old_root != root_cid { + use crate::root_pointer::CasOutcome; + match arbiter + .cas_root(&self.metadata.owner_id, &self.metadata.name, &old_root, &root_cid) + .await? + { + CasOutcome::Won => {} + CasOutcome::Conflict { current } => { + return Err(CoreError::PreconditionFailed(format!( + "bucket '{}' was modified by another master (shared root is {:?}, this flush built on {}) — reopen and retry", + self.metadata.name, current, old_root + ))); + } + } + } + } + self.metadata.root_cid = root_cid; // Update the metadata cache if we have a reference to it @@ -430,6 +479,12 @@ pub struct BucketManager { /// registry.cid file from concurrent overwrites and coalesces the /// IPLD write when many mutations fire at once. registry_persist_lock: Arc>, + /// FM-1 (Phase 2.5): shared multi-master root arbiter, attached to every + /// bucket opened for writing. Unset (default) = single-master behavior. + /// `OnceLock` so it can be wired AFTER the manager is shared in an `Arc` + /// (the Postgres pool is constructed later in AppState init); reads are + /// lock-free. 
+ root_pointer_store: std::sync::OnceLock>, } impl BucketManager { @@ -444,6 +499,7 @@ impl BucketManager { dirty: std::sync::atomic::AtomicBool::new(false), bucket_write_locks: Arc::new(DashMap::new()), registry_persist_lock: Arc::new(tokio::sync::Mutex::new(())), + root_pointer_store: std::sync::OnceLock::new(), } } @@ -458,9 +514,23 @@ impl BucketManager { dirty: std::sync::atomic::AtomicBool::new(false), bucket_write_locks: Arc::new(DashMap::new()), registry_persist_lock: Arc::new(tokio::sync::Mutex::new(())), + root_pointer_store: std::sync::OnceLock::new(), } } + /// FM-1 (Phase 2.5): enable shared multi-master root arbitration. Every + /// bucket opened via [`open_bucket_for_user`](Self::open_bucket_for_user) + /// afterwards (a) opens at the SHARED root when another master has moved + /// it past this process's cache, and (b) CASes the shared pointer on + /// flush. Call once at startup when the operator flag is on (set-once; + /// later calls are ignored). + pub fn set_root_pointer_store( + &self, + store: Arc, + ) { + let _ = self.root_pointer_store.set(store); + } + /// Rebuild the name_index from the current buckets DashMap fn rebuild_name_index(&self) { self.name_index.clear(); @@ -1135,21 +1205,51 @@ impl BucketManager { let internal_key = Self::scoped_bucket_key(user_id, name); tracing::debug!(bucket_name = %name, internal_key = %internal_key, "Opening user-scoped bucket"); - let metadata = self.buckets.get(&internal_key) + let mut metadata = self.buckets.get(&internal_key) .map(|r| r.clone()) .ok_or_else(|| { tracing::error!(bucket_name = %name, internal_key = %internal_key, "User-scoped bucket not found"); CoreError::BucketNotFound(name.to_string()) })?; + // FM-1 (Phase 2.5): another master may have flushed this bucket past + // our in-process cache. Open at the SHARED root so this write builds + // on the latest state instead of forking from a stale one (a stale + // open would only lose the CAS at flush — correct but wasteful). 
+ if let Some(arbiter) = self.root_pointer_store.get() { + match arbiter.get_root(user_id, name).await { + Ok(Some(shared_root)) if shared_root != metadata.root_cid => { + tracing::debug!( + bucket = %name, + cached = %metadata.root_cid, + shared = %shared_root, + "opening at the shared root (another master moved it)" + ); + metadata.root_cid = shared_root; + // Keep the cache truthful for read paths too. + self.buckets.insert(internal_key.clone(), metadata.clone()); + } + Ok(_) => {} + Err(e) => { + // Arbiter unreachable: opening at the cached root is safe — + // the flush-side CAS still arbitrates. Log and continue. + tracing::warn!(error = %e, bucket = %name, "shared-root lookup failed; opening at cached root"); + } + } + } + // Use load_with_cache_key so flush() updates the correct entry - Bucket::load_with_cache_key( + let mut bucket = Bucket::load_with_cache_key( metadata, Arc::clone(&self.store), self.default_config.clone(), Some(Arc::clone(&self.buckets)), internal_key, - ).await + ).await?; + if let Some(arbiter) = self.root_pointer_store.get() { + bucket.set_root_pointer_store(Arc::clone(arbiter)); + } + Ok(bucket) } /// Delete a bucket for a specific user diff --git a/crates/fula-core/src/lib.rs b/crates/fula-core/src/lib.rs index f348e5b..eba7fa8 100644 --- a/crates/fula-core/src/lib.rs +++ b/crates/fula-core/src/lib.rs @@ -27,9 +27,11 @@ pub mod crdt; pub mod error; pub mod metadata; pub mod prolly; +pub mod root_pointer; pub use bucket::{Bucket, BucketConfig, BucketManager, BucketRegistry}; pub use error::{CoreError, Result}; +pub use root_pointer::{CasOutcome, RootPointerStore}; pub use metadata::{ObjectMetadata, EncryptionMetadata, StorageClass}; pub use prolly::{ProllyTree, ProllyNode, ProllyConfig}; diff --git a/crates/fula-core/src/root_pointer.rs b/crates/fula-core/src/root_pointer.rs new file mode 100644 index 0000000..1c8e474 --- /dev/null +++ b/crates/fula-core/src/root_pointer.rs @@ -0,0 +1,143 @@ +//! 
FM-1 (Phase 2.5, federated masters) — shared bucket-root arbitration. +//! +//! With more than one gateway serving writes, the per-bucket root pointer +//! must live somewhere ALL masters can compare-and-swap, or concurrent +//! flushes silently drop each other's objects (the in-process +//! `bucket_write_locks` only serialize within one process). This trait is +//! that arbiter, deliberately storage-agnostic: fula-core never learns what +//! backs it (fula-cli injects a Postgres implementation — the Stage-A +//! masters already share one database — and later phases may swap in a +//! chain-backed store without touching this crate). +//! +//! Wiring (when the operator enables the flag): +//! * `Bucket::flush()` CASes `(owner, bucket): old_root -> new_root` +//! BEFORE publishing the new root to the in-process metadata cache. +//! A lost race surfaces as `CoreError::PreconditionFailed` — the same +//! retryable contract as conditional writes, which client SDKs +//! already handle. +//! * `BucketManager::open_bucket_for_user()` consults `get_root` so a +//! bucket modified by ANOTHER master is opened at the shared root, +//! not this process's stale cache. +//! +//! Disabled (no store injected — the default): behavior is byte-identical +//! to today's single-master code. + +use crate::error::Result; +use async_trait::async_trait; +use cid::Cid; + +/// Outcome of a compare-and-swap on the shared root pointer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CasOutcome { + /// This master won: the shared pointer now holds the new root. + Won, + /// Another master moved the pointer first. `current` is the root the + /// shared store holds now (None if the row vanished — treat as retry). + Conflict { current: Option }, +} + +/// Shared, multi-master-visible bucket-root pointer store. 
+#[async_trait] +pub trait RootPointerStore: Send + Sync { + /// Atomically set `(owner_id, bucket)` to `new` iff the stored root is + /// `expected` (or no row exists yet — first flush claims the slot). + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result; + + /// The shared root for `(owner_id, bucket)`, if any master has flushed. + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result>; +} + +#[cfg(test)] +mod tests { + use super::test_support::InMemoryRootStore; + use super::*; + + fn cid_of(b: &[u8]) -> Cid { + let h = blake3::hash(b); + let mh = cid::multihash::Multihash::<64>::wrap(0x1e, h.as_bytes()).unwrap(); + Cid::new_v1(0x55, mh) + } + + /// The exact two-master race: both open at root A; master 1 flushes A→B + /// and wins; master 2 flushes A→C and MUST conflict (its write would + /// silently drop master 1's objects otherwise). + #[tokio::test] + async fn second_master_building_on_a_stale_root_loses_the_cas() { + let store = InMemoryRootStore::default(); + let (a, b, c) = (cid_of(b"root-a"), cid_of(b"root-b"), cid_of(b"root-c")); + + assert_eq!( + store.cas_root("owner", "bkt", &a, &b).await.unwrap(), + CasOutcome::Won, + "first flush claims the slot" + ); + match store.cas_root("owner", "bkt", &a, &c).await.unwrap() { + CasOutcome::Conflict { current } => assert_eq!(current, Some(b)), + other => panic!("stale flush must conflict, got {other:?}"), + } + // The loser reopens at the shared root and retries — now it wins. 
+ assert_eq!( + store.cas_root("owner", "bkt", &b, &c).await.unwrap(), + CasOutcome::Won + ); + assert_eq!(store.get_root("owner", "bkt").await.unwrap(), Some(c)); + } + + #[tokio::test] + async fn buckets_and_owners_are_isolated() { + let store = InMemoryRootStore::default(); + let (a, b) = (cid_of(b"a"), cid_of(b"b")); + store.cas_root("o1", "bkt", &a, &b).await.unwrap(); + assert_eq!(store.get_root("o2", "bkt").await.unwrap(), None); + assert_eq!(store.get_root("o1", "other").await.unwrap(), None); + } +} + +#[cfg(test)] +pub mod test_support { + //! In-memory store for unit tests: two Buckets sharing one of these + //! reproduce the cross-master race deterministically. + use super::*; + use dashmap::DashMap; + use std::sync::Arc; + + #[derive(Default, Clone)] + pub struct InMemoryRootStore { + inner: Arc>, + } + + #[async_trait] + impl RootPointerStore for InMemoryRootStore { + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result { + let key = (owner_id.to_string(), bucket.to_string()); + let mut entry = self.inner.entry(key).or_insert(*expected); + if *entry.value() == *expected { + *entry.value_mut() = *new; + Ok(CasOutcome::Won) + } else { + Ok(CasOutcome::Conflict { + current: Some(*entry.value()), + }) + } + } + + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result> { + Ok(self + .inner + .get(&(owner_id.to_string(), bucket.to_string())) + .map(|e| *e.value())) + } + } +} From 12713b97e5b010ea4e3bfbe7157fa4385d603dcd Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 10:57:45 -0400 Subject: [PATCH 10/15] feat(multimaster): FM-4 - EIP-712 wallet-signature auth (portable identity) Phase 2.5 (fula-api#32). Self-certifying bearer: fula-eip712.. over EIP-712 typed data (domain "Fula Gateway" v1; FulaAuth{wallet,iat,exp}). 
Any master verifies offline - no shared secret, no session table - so ONE identity works on every federated master; the session user id IS the lowercase wallet (the same wallet the staking/MasterRegistry phases key on). Scope fixed to storage:*; lifetime capped 24h; iat skew 300s; revocation deny-list applies to the raw token unchanged. Additive: prefix + FULA_EIP712_AUTH flag route it; legacy JWT path untouched. /fula/capabilities advertises eip712Auth. Deps: k256 (ecdsa) + sha3 - minimal RustCrypto, only exercised behind the flag. Unit tests: valid sig -> wallet session w/ write scope; wrong-wallet claim rejected; tampered payload rejected; expired/overlong rejected; two independent verifications agree (the portability property). Known v0 limitation (documented in-module): per-PUT registry pinning forwards the bearer to the pinning service, which 401s non-session tokens - full parity needs the pinning service accepting this scheme (follow-up); Stage-B storage-only operators are unaffected. Co-Authored-By: Claude Fable 5 --- crates/fula-cli/Cargo.toml | 5 + crates/fula-cli/src/auth_eip712.rs | 299 ++++++++++++++++++++++++ crates/fula-cli/src/config.rs | 7 + crates/fula-cli/src/handlers/service.rs | 2 + crates/fula-cli/src/lib.rs | 1 + crates/fula-cli/src/main.rs | 7 + crates/fula-cli/src/middleware.rs | 35 ++- 7 files changed, 345 insertions(+), 11 deletions(-) create mode 100644 crates/fula-cli/src/auth_eip712.rs diff --git a/crates/fula-cli/Cargo.toml b/crates/fula-cli/Cargo.toml index 870e97f..3952dd8 100644 --- a/crates/fula-cli/Cargo.toml +++ b/crates/fula-cli/Cargo.toml @@ -90,6 +90,11 @@ reqwest = { workspace = true } # block-by-cid). Same workspace sqlx the project already pins; used ONLY when # FULA_PINS_DATABASE_URL is configured. sqlx = { workspace = true } +# FM-4 (Phase 2.5): EIP-712 wallet-signature auth — secp256k1 signature +# recovery (RustCrypto; pure Rust) + keccak256. 
Only exercised behind +# FULA_EIP712_AUTH; chosen over ethers/alloy to keep the dependency minimal. +k256 = { version = "0.13", features = ["ecdsa"] } +sha3 = "0.10" [dev-dependencies] criterion = { workspace = true } diff --git a/crates/fula-cli/src/auth_eip712.rs b/crates/fula-cli/src/auth_eip712.rs new file mode 100644 index 0000000..445eb83 --- /dev/null +++ b/crates/fula-cli/src/auth_eip712.rs @@ -0,0 +1,299 @@ +//! FM-4 (Phase 2.5) — portable identity: EIP-712 wallet-signature auth. +//! +//! Federated masters must authenticate the SAME user identically without +//! sharing master-local secrets. A wallet signature is self-certifying: +//! any master can verify it offline (no JWT secret, no session table), so +//! one identity works everywhere — and it is the same wallet the staking / +//! MasterRegistry phases key on. +//! +//! Token format (sent as the Bearer value; the prefix routes it): +//! +//! fula-eip712.. +//! +//! payload JSON: {"wallet":"0x..","iat":,"exp":} +//! +//! The signature is an EIP-712 typed signature over: +//! domain = { name: "Fula Gateway", version: "1" } (chain-agnostic) +//! message = FulaAuth { wallet: address, iat: uint256, exp: uint256 } +//! +//! Verification: recover the secp256k1 signer from the digest, require it to +//! equal `payload.wallet`, require `iat - 300 ≤ now ≤ exp` and a lifetime of +//! at most 24 h. The resulting session's user id IS the lowercase wallet +//! address — every master derives the same hashed user id from it. +//! +//! Scope: fixed to `storage:*`. Replay within the validity window is +//! accepted (bearer-token semantics, same as a JWT); short `exp` is the +//! mitigation, and the existing revocation deny-list applies to the raw +//! token string unchanged. +//! +//! Additive: the legacy JWT/session path is untouched; this path only +//! activates behind `FULA_EIP712_AUTH` AND the `fula-eip712.` prefix. +//! +//! Known v0 limitation (documented in #32): per-PUT registry pinning +//! 
forwards the bearer to the pinning service, which only knows ITS issued +//! sessions — an EIP-712 bearer 401s there, so registry persistence +//! requires the pinning service to accept this scheme too (follow-up). +//! Storage-only Stage-B operators (no pinning service) are unaffected. + +use crate::error::ApiError; +use crate::state::UserSession; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use base64::Engine; +use k256::ecdsa::{RecoveryId, Signature, VerifyingKey}; +use serde::Deserialize; +use sha3::{Digest, Keccak256}; + +pub const TOKEN_PREFIX: &str = "fula-eip712."; + +/// Maximum allowed token lifetime (exp - iat). +const MAX_LIFETIME_SECS: i64 = 24 * 60 * 60; +/// Allowed clock skew on `iat`. +const IAT_SKEW_SECS: i64 = 300; + +#[derive(Debug, Deserialize)] +struct Eip712Payload { + wallet: String, + iat: i64, + exp: i64, +} + +fn keccak(data: &[u8]) -> [u8; 32] { + let mut h = Keccak256::new(); + h.update(data); + h.finalize().into() +} + +/// keccak256("EIP712Domain(string name,string version)") with our values. 
+fn domain_separator() -> [u8; 32] { + let type_hash = keccak(b"EIP712Domain(string name,string version)"); + let name_hash = keccak(b"Fula Gateway"); + let version_hash = keccak(b"1"); + let mut enc = Vec::with_capacity(96); + enc.extend_from_slice(&type_hash); + enc.extend_from_slice(&name_hash); + enc.extend_from_slice(&version_hash); + keccak(&enc) +} + +/// keccak256(typeHash ‖ wallet-as-uint256 ‖ iat ‖ exp) +fn struct_hash(wallet: &[u8; 20], iat: i64, exp: i64) -> [u8; 32] { + let type_hash = keccak(b"FulaAuth(address wallet,uint256 iat,uint256 exp)"); + let mut enc = Vec::with_capacity(128); + enc.extend_from_slice(&type_hash); + let mut addr32 = [0u8; 32]; + addr32[12..].copy_from_slice(wallet); + enc.extend_from_slice(&addr32); + let mut int32 = [0u8; 32]; + int32[24..].copy_from_slice(&(iat as u64).to_be_bytes()); + enc.extend_from_slice(&int32); + let mut exp32 = [0u8; 32]; + exp32[24..].copy_from_slice(&(exp as u64).to_be_bytes()); + enc.extend_from_slice(&exp32); + keccak(&enc) +} + +/// The EIP-712 signing digest: keccak256(0x1901 ‖ domainSeparator ‖ structHash). +pub fn signing_digest(wallet: &[u8; 20], iat: i64, exp: i64) -> [u8; 32] { + let mut enc = Vec::with_capacity(66); + enc.extend_from_slice(&[0x19, 0x01]); + enc.extend_from_slice(&domain_separator()); + enc.extend_from_slice(&struct_hash(wallet, iat, exp)); + keccak(&enc) +} + +fn parse_wallet(s: &str) -> Result<[u8; 20], ApiError> { + let hexpart = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(hexpart) + .map_err(|_| bad("wallet is not valid hex"))?; + bytes + .try_into() + .map_err(|_| bad("wallet must be 20 bytes")) +} + +fn bad(msg: &str) -> ApiError { + ApiError::s3(crate::error::S3ErrorCode::InvalidToken, msg) +} + +/// Verify a `fula-eip712.` bearer and mint the portable session. 
+pub fn verify_eip712_token(token: &str) -> Result { + let rest = token + .strip_prefix(TOKEN_PREFIX) + .ok_or_else(|| bad("not an eip712 token"))?; + let (payload_b64, sig_b64) = rest + .split_once('.') + .ok_or_else(|| bad("malformed eip712 token (need payload.signature)"))?; + + let payload_bytes = URL_SAFE_NO_PAD + .decode(payload_b64) + .map_err(|_| bad("payload is not base64url"))?; + let payload: Eip712Payload = + serde_json::from_slice(&payload_bytes).map_err(|_| bad("payload is not valid JSON"))?; + + let now = chrono::Utc::now().timestamp(); + if payload.exp <= now { + return Err(bad("eip712 token expired")); + } + if payload.iat > now + IAT_SKEW_SECS { + return Err(bad("eip712 token iat is in the future")); + } + if payload.exp - payload.iat > MAX_LIFETIME_SECS { + return Err(bad("eip712 token lifetime exceeds 24h")); + } + + let wallet = parse_wallet(&payload.wallet)?; + let sig_bytes = URL_SAFE_NO_PAD + .decode(sig_b64) + .map_err(|_| bad("signature is not base64url"))?; + if sig_bytes.len() != 65 { + return Err(bad("signature must be 65 bytes (r‖s‖v)")); + } + let v = sig_bytes[64]; + let rec_id = RecoveryId::try_from(match v { + 0 | 1 => v, + 27 | 28 => v - 27, + _ => return Err(bad("invalid recovery id")), + }) + .map_err(|_| bad("invalid recovery id"))?; + let signature = + Signature::from_slice(&sig_bytes[..64]).map_err(|_| bad("invalid signature bytes"))?; + + let digest = signing_digest(&wallet, payload.iat, payload.exp); + let recovered = VerifyingKey::recover_from_prehash(&digest, &signature, rec_id) + .map_err(|_| bad("signature recovery failed"))?; + + // Ethereum address = last 20 bytes of keccak256(uncompressed pubkey[1..]). 
+ let pub_uncompressed = recovered.to_encoded_point(false); + let addr_hash = keccak(&pub_uncompressed.as_bytes()[1..]); + let recovered_addr: [u8; 20] = addr_hash[12..].try_into().expect("20 bytes"); + + if recovered_addr != wallet { + return Err(bad("signature does not match the claimed wallet")); + } + + let wallet_lower = format!("0x{}", hex::encode(wallet)); + let expires_at = chrono::DateTime::from_timestamp(payload.exp, 0) + .unwrap_or_else(|| chrono::Utc::now() + chrono::Duration::hours(1)); + + Ok(UserSession::new( + wallet_lower, + None, + vec!["storage:*".to_string()], + expires_at, + token.to_string(), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use k256::ecdsa::SigningKey; + + fn test_key() -> SigningKey { + SigningKey::from_slice(&[0x42u8; 32]).expect("static test key") + } + + fn addr_of(key: &SigningKey) -> [u8; 20] { + let pubkey = key.verifying_key().to_encoded_point(false); + let h = keccak(&pubkey.as_bytes()[1..]); + h[12..].try_into().unwrap() + } + + fn mint(key: &SigningKey, wallet: [u8; 20], iat: i64, exp: i64) -> String { + let digest = signing_digest(&wallet, iat, exp); + let (sig, rec) = key.sign_prehash_recoverable(&digest).unwrap(); + let mut sig65 = sig.to_bytes().to_vec(); + sig65.push(rec.to_byte()); + let payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(wallet)), + "iat": iat, + "exp": exp, + }); + format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(payload.to_string()), + URL_SAFE_NO_PAD.encode(sig65) + ) + } + + #[test] + fn valid_signature_yields_portable_wallet_session() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 3600); + + let session = verify_eip712_token(&token).expect("valid token"); + assert_eq!(session.user_id, format!("0x{}", hex::encode(wallet))); + assert!(session.can_write(), "storage:* must grant write"); + } + + #[test] + fn wrong_wallet_claim_is_rejected() { + let key = 
test_key(); + let now = chrono::Utc::now().timestamp(); + // Sign for OUR address but claim a different wallet in the payload. + let claimed = [0x11u8; 20]; + let digest = signing_digest(&claimed, now - 10, now + 3600); + let (sig, rec) = key.sign_prehash_recoverable(&digest).unwrap(); + let mut sig65 = sig.to_bytes().to_vec(); + sig65.push(rec.to_byte()); + let payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(claimed)), + "iat": now - 10, + "exp": now + 3600, + }); + let token = format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(payload.to_string()), + URL_SAFE_NO_PAD.encode(sig65) + ); + assert!(verify_eip712_token(&token).is_err()); + } + + #[test] + fn tampered_payload_is_rejected() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 3600); + // Re-encode with a longer exp but the ORIGINAL signature. + let parts: Vec<&str> = token.trim_start_matches(TOKEN_PREFIX).split('.').collect(); + let forged_payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(wallet)), + "iat": now - 10, + "exp": now + 7200, + }); + let forged = format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(forged_payload.to_string()), + parts[1] + ); + assert!(verify_eip712_token(&forged).is_err()); + } + + #[test] + fn expired_and_overlong_tokens_are_rejected() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + assert!(verify_eip712_token(&mint(&key, wallet, now - 7200, now - 3600)).is_err()); + assert!(verify_eip712_token(&mint(&key, wallet, now, now + 90_000)).is_err()); + } + + #[test] + fn same_token_verifies_identically_with_no_local_state() { + // The portability property: verification uses NOTHING but the token — + // two "masters" (two verify calls with no shared setup) agree. 
+ let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 600); + let a = verify_eip712_token(&token).unwrap(); + let b = verify_eip712_token(&token).unwrap(); + assert_eq!(a.user_id, b.user_id); + assert_eq!(a.hashed_user_id, b.hashed_user_id); + } +} diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index d7b306c..8fd10ef 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -71,6 +71,12 @@ pub struct GatewayConfig { /// lost updates. Needs the pins-DB (POSTGRES_* env). Default OFF. #[serde(default)] pub bucket_root_cas_enabled: bool, + /// FM-4 (Phase 2.5): accept self-certifying EIP-712 wallet-signature + /// bearers (`fula-eip712.` prefix) — one identity on every federated + /// master with no shared secret. Additive; legacy JWT auth untouched. + /// Default OFF. + #[serde(default)] + pub eip712_auth_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -213,6 +219,7 @@ impl Default for GatewayConfig { registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), remote_cid_put_enabled: false, bucket_root_cas_enabled: false, + eip712_auth_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/handlers/service.rs b/crates/fula-cli/src/handlers/service.rs index 15e34f8..cc84521 100644 --- a/crates/fula-cli/src/handlers/service.rs +++ b/crates/fula-cli/src/handlers/service.rs @@ -55,5 +55,7 @@ pub async fn healthz() -> impl IntoResponse { pub async fn capabilities(State(state): State>) -> impl IntoResponse { axum::Json(serde_json::json!({ "remoteCidPut": state.config.remote_cid_put_enabled, + // FM-4: clients/operators can discover wallet-auth support the same way. 
+ "eip712Auth": state.config.eip712_auth_enabled, })) } diff --git a/crates/fula-cli/src/lib.rs b/crates/fula-cli/src/lib.rs index 5d06a86..de7a86e 100644 --- a/crates/fula-cli/src/lib.rs +++ b/crates/fula-cli/src/lib.rs @@ -33,6 +33,7 @@ //! ``` pub mod auth; +pub mod auth_eip712; pub mod config; pub mod entries_store; pub mod error; diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index 0d2beab..03a0a75 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -104,6 +104,12 @@ struct Args { /// each other's writes. Default OFF. #[arg(long, env = "FULA_BUCKET_ROOT_CAS")] bucket_root_cas: bool, + + /// FM-4 (Phase 2.5): accept EIP-712 wallet-signature bearers + /// (fula-eip712. prefix) — portable identity across federated masters. + /// Default OFF. + #[arg(long, env = "FULA_EIP712_AUTH")] + eip712_auth: bool, } #[tokio::main] @@ -177,6 +183,7 @@ async fn main() -> anyhow::Result<()> { local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, remote_cid_put_enabled: args.remote_cid_put, bucket_root_cas_enabled: args.bucket_root_cas, + eip712_auth_enabled: args.eip712_auth, ..Default::default() }; diff --git a/crates/fula-cli/src/middleware.rs b/crates/fula-cli/src/middleware.rs index 68e1534..642df3b 100644 --- a/crates/fula-cli/src/middleware.rs +++ b/crates/fula-cli/src/middleware.rs @@ -73,17 +73,30 @@ pub async fn auth_middleware( // Extract JWT from either Bearer token or AWS Sig V4 format let token = extract_token_from_header(header, request.headers())?; - let secret = state.config.jwt_secret.as_ref() - .ok_or_else(|| ApiError::s3(S3ErrorCode::InternalError, "JWT secret not configured"))?; - - let claims = validate_token(&token, secret)?; - // Audit F3: honor a manually-revoked key. No-op unless the revocation - // deny-list is enabled (env switch + pins DB). Deny-list + fail-open, - // so a currently-valid token is never rejected. 
`token` is the raw - // JWT — the issuer hashes the same string into `api_keys.key_hash`. - crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; - // Pass the raw JWT token to the session for forwarding to pinning service - claims_to_session(claims, token) + // FM-4 (Phase 2.5): portable wallet identity. Self-certifying — + // no master-local secret involved, so the SAME token works on + // every federated master. Additive: only the `fula-eip712.` + // prefix routes here, and only behind the flag; the legacy JWT + // path below is untouched. + if state.config.eip712_auth_enabled + && token.starts_with(crate::auth_eip712::TOKEN_PREFIX) + { + let session = crate::auth_eip712::verify_eip712_token(&token)?; + crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; + session + } else { + let secret = state.config.jwt_secret.as_ref() + .ok_or_else(|| ApiError::s3(S3ErrorCode::InternalError, "JWT secret not configured"))?; + + let claims = validate_token(&token, secret)?; + // Audit F3: honor a manually-revoked key. No-op unless the revocation + // deny-list is enabled (env switch + pins DB). Deny-list + fail-open, + // so a currently-valid token is never rejected. `token` is the raw + // JWT — the issuer hashes the same string into `api_keys.key_hash`. 
+ crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; + // Pass the raw JWT token to the session for forwarding to pinning service + claims_to_session(claims, token) + } } None => { return Err(ApiError::s3( From 74ec2998ec7768867ae183070eb77c599576b120 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 12:05:10 -0400 Subject: [PATCH 11/15] test(crypto): pin issue #34 - forest write amplification on sequential puts Three current-behaviour pins quantifying the O(N^2) bulk-upload cost: - node layer: store() re-PUTs an entire unchanged InMemory tree because persisted children are never downgraded to Stored (the only Stored constructor is the load path, Pointer::from_wire) - forest layer: per-flush PUT count and bytes grow with bucket size for the put_object_flat pattern (upsert + flush per file, one directory): flush #10 = 1 PUT / 10.8 KB -> flush #150 = 17 PUTs / 161.7 KB; 150 x 4 KB files = 1630 node PUTs / 12.2 MB of index re-uploads - contrast: marginal flush after 150 uploads is 17 PUTs in the long-lived session vs 3 PUTs for a fresh forest loaded from the same manifest - the amplification is session-accumulated InMemory state, not inherent tree cost Invert these pins into regression guards when #34 is fixed. Co-Authored-By: Claude Fable 5 --- crates/fula-crypto/src/sharded_hamt_forest.rs | 262 ++++++++++++++++++ crates/fula-crypto/src/wnfs_hamt/node.rs | 83 ++++++ 2 files changed, 345 insertions(+) diff --git a/crates/fula-crypto/src/sharded_hamt_forest.rs b/crates/fula-crypto/src/sharded_hamt_forest.rs index 3566b12..e8b656b 100644 --- a/crates/fula-crypto/src/sharded_hamt_forest.rs +++ b/crates/fula-crypto/src/sharded_hamt_forest.rs @@ -5391,4 +5391,266 @@ mod tests { extracted.directories.keys().collect::>() ); } + + // ===================================================================== + // Issue #34 — per-upload cost grows with bucket size (forest write + // amplification). 
These are CURRENT-BEHAVIOUR pins: they pass on the + // buggy code and quantify the bug; invert them when #34 is fixed. + // + // Mechanism (see also `node::round_trip_tests:: + // issue_34_store_reputs_entire_unchanged_in_memory_tree` for the + // node-layer pin): `Node::set_value` marks every node on a mutated + // path `ChildPtr::InMemory`; `Node::store(&self)` re-PUTs every + // InMemory node but can never downgrade them to `Stored` + // (`ChildPtr::Stored` is only constructed by the load path, + // `Pointer::from_wire`). In a long-lived session the InMemory set + // grows monotonically, so EVERY flush re-uploads the whole + // ever-touched subtree. All files in one directory route to ONE + // shard (dir-local routing), so for the FxFiles bulk-upload pattern + // (N × `put_object_flat` = N × (upsert + flush) into one dir) the + // per-flush cost grows with N and the total is O(N²). + // ===================================================================== + + /// PUT-call + PUT-byte counting backend, local to the #34 tests + /// (`CountingBlobBackend` counts ops but not payload bytes, and byte + /// growth is half the #34 story). + struct Issue34CountingBackend { + inner: InMemoryBackend, + puts: std::sync::atomic::AtomicU64, + put_bytes: std::sync::atomic::AtomicU64, + } + + impl Issue34CountingBackend { + fn new() -> Self { + Self { + inner: InMemoryBackend::new(), + puts: std::sync::atomic::AtomicU64::new(0), + put_bytes: std::sync::atomic::AtomicU64::new(0), + } + } + + /// `(total PUT calls, total PUT payload bytes)` so far. 
+ fn snapshot(&self) -> (u64, u64) { + use std::sync::atomic::Ordering::Relaxed; + (self.puts.load(Relaxed), self.put_bytes.load(Relaxed)) + } + } + + #[async_trait::async_trait] + impl crate::wnfs_hamt::v7_store::BlobBackend for Issue34CountingBackend { + async fn get(&self, path: &str) -> Result> { + self.inner.get(path).await + } + + async fn put( + &self, + path: &str, + bytes: Vec, + ) -> Result { + use std::sync::atomic::Ordering::Relaxed; + self.puts.fetch_add(1, Relaxed); + self.put_bytes.fetch_add(bytes.len() as u64, Relaxed); + self.inner.put(path, bytes).await + } + } + + /// Entry weighted like production: `put_object_flat` stashes the + /// `x-fula-encryption` JSON (wrapped DEK + nonce + encrypted private + /// metadata, ~0.9 KB) into `user_metadata` before the upsert + /// (`fula-client/src/encryption.rs`), so each forest entry carries + /// ~1 KB. Per-node payloads — and therefore the re-upload bytes this + /// test measures — scale with that weight; using it keeps the numbers + /// representative of the FxFiles repro rather than artificially small. + fn issue34_entry(path: &str) -> ForestFileEntry { + let mut e = file_entry(path, 4096); + e.user_metadata.insert( + "x-fula-encryption".to_string(), + "x".repeat(900), + ); + e + } + + /// Issue #34, pin 1: the marginal flush cost of `upsert + flush_dirty` + /// (= what every `put_object_flat` call does) grows with the number of + /// objects already in the bucket — in PUT calls AND in PUT bytes. + /// + /// This is the SDK-side reproduction of the FxFiles measurement + /// (cumulative avg 1.04 s/file at n=25 → 3.8 s/file at n=125): each + /// backend PUT here is a real sequential network round trip in + /// production (`S3BlobBackend` → master), so per-flush PUT growth is + /// per-upload latency growth. + #[tokio::test] + async fn issue_34_per_flush_put_cost_grows_with_bucket_size() { + const N: usize = 150; + // Pin the shard salt so the hot-shard index is stable run-to-run. 
+ // (The HAMT shape itself is salt-independent — node hashing is + // `H::hash(key)` over fixed path strings — so the counts below are + // deterministic either way; the pinned salt also stabilizes which + // shard the directory routes to.) + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + let mut puts_per_flush = Vec::with_capacity(N); + let mut bytes_per_flush = Vec::with_capacity(N); + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + let (p0, b0) = backend.snapshot(); + forest.flush_dirty(&backend).await.unwrap(); + let (p1, b1) = backend.snapshot(); + puts_per_flush.push(p1 - p0); + bytes_per_flush.push(b1 - b0); + } + + let (total_puts, total_bytes) = backend.snapshot(); + eprintln!( + "\n[#34] one directory, one hot shard, flush after every upsert \ + (the `put_object_flat` pattern):" + ); + eprintln!(" upload # | HAMT-node PUTs in that flush | bytes in that flush"); + for &n in &[1usize, 10, 25, 50, 75, 100, 125, 150] { + eprintln!( + " {:>8} | {:>28} | {:>19}", + n, + puts_per_flush[n - 1], + bytes_per_flush[n - 1] + ); + } + eprintln!( + " TOTAL for {} files of 4 KB: {} node PUTs, {:.1} MB uploaded \ + (index alone, object bodies excluded)", + N, + total_puts, + total_bytes as f64 / 1.0e6 + ); + + // (1) Marginal PUT-call count grows several-fold: flushes 5-14 + // (ancestor-chain warmup excluded) vs the last 10 flushes. On the + // current code this ratio is ~10×; assert a conservative ≥3× so + // minor tree-shape changes don't flake the pin. 
+ let early_puts: u64 = puts_per_flush[4..14].iter().sum(); + let late_puts: u64 = puts_per_flush[N - 10..].iter().sum(); + assert!( + late_puts >= 3 * early_puts, + "issue #34 pin: expected per-flush PUT count to grow ≥3× as the \ + bucket fills (early flushes 5-14: {} PUTs total; last 10 \ + flushes: {} PUTs total). If late ≈ early ≈ O(path depth), the \ + write amplification is FIXED — invert this test into a \ + flat-marginal-cost regression guard.", + early_puts, + late_puts + ); + + // (2) Marginal PUT bytes grow at least as fast (every ever-touched + // node is re-serialized + re-encrypted + re-PUT on every flush, so + // per-flush bytes are O(bucket size)). + let early_bytes: u64 = bytes_per_flush[4..14].iter().sum(); + let late_bytes: u64 = bytes_per_flush[N - 10..].iter().sum(); + assert!( + late_bytes >= 5 * early_bytes, + "issue #34 pin: expected per-flush PUT bytes to grow ≥5× as the \ + bucket fills (early flushes 5-14: {} B; last 10 flushes: {} B).", + early_bytes, + late_bytes + ); + } + + /// Issue #34, pin 2: the amplification is SESSION-ACCUMULATED state, + /// not an inherent cost of the tree size. + /// + /// Proof by contrast, same bucket / same salt / same persisted data: + /// - the 150-upload session's marginal flush re-PUTs the whole + /// ever-touched hot-shard tree (~17 nodes: root + every level-1 + /// child, all stuck `InMemory`); + /// - a FRESH forest built from the same manifest (= what an app + /// restart or a new device produces; every pointer loads as + /// `Stored`) doing ONE upsert + flush re-PUTs only the touched + /// root→leaf path (~3 nodes). + /// + /// An incremental flush would make the long-lived session behave like + /// the fresh one. This contrast is the direct falsification of "the + /// index is just big, the cost is unavoidable". 
+ #[tokio::test] + async fn issue_34_fresh_session_flush_is_incremental_long_session_is_not() { + const N: usize = 150; + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + // The long-lived session: N × (upsert + flush), like FxFiles. + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + forest.flush_dirty(&backend).await.unwrap(); + } + + // Marginal cost of upload #151 in the long-lived session. + forest + .upsert_file(issue34_entry("/e2e/perf/file-150.bin"), &backend) + .await + .unwrap(); + let (p0, _) = backend.snapshot(); + let manifest_after = forest.flush_dirty(&backend).await.unwrap().clone(); + let (p1, _) = backend.snapshot(); + let long_session_puts = p1 - p0; + + // Fresh session over the SAME persisted state: from_manifest is the + // production load path, so every shard-root pointer deserializes as + // `Stored`/`StoredV2` and only the genuinely mutated path goes + // `InMemory` on the next write. 
+ let mut fresh = ShardedHamtPrivateForest::from_manifest( + manifest_after, + "documents-v8", + test_dek(), + ); + fresh + .upsert_file(issue34_entry("/e2e/perf/file-151.bin"), &backend) + .await + .unwrap(); + let (q0, _) = backend.snapshot(); + fresh.flush_dirty(&backend).await.unwrap(); + let (q1, _) = backend.snapshot(); + let fresh_session_puts = q1 - q0; + + eprintln!( + "\n[#34] marginal flush after 150 uploads — long-lived session: \ + {} node PUTs; fresh session (same data, loaded from manifest): \ + {} node PUTs", + long_session_puts, fresh_session_puts + ); + + assert!( + fresh_session_puts >= 1, + "fresh session must persist at least the touched path" + ); + assert!( + long_session_puts >= 3 * fresh_session_puts, + "issue #34 pin: the long-lived session's marginal flush re-PUT \ + {} nodes while a fresh session over the same data re-PUT only \ + {} — the difference is exactly the never-downgraded InMemory \ + set. If the ratio collapsed to ~1, the write amplification is \ + FIXED — invert this test into a regression guard.", + long_session_puts, + fresh_session_puts + ); + } } diff --git a/crates/fula-crypto/src/wnfs_hamt/node.rs b/crates/fula-crypto/src/wnfs_hamt/node.rs index ddb647f..2b6345e 100644 --- a/crates/fula-crypto/src/wnfs_hamt/node.rs +++ b/crates/fula-crypto/src/wnfs_hamt/node.rs @@ -854,4 +854,87 @@ mod round_trip_tests { distinct_orders.len() ); } + + /// Issue #34 — root-cause pin at the node layer (CURRENT-BEHAVIOUR test: + /// it passes on the buggy code and documents the bug; invert it when + /// #34 is fixed). + /// + /// `set_value` replaces every node on a mutated root→leaf path with + /// `Pointer::Link(ChildPtr::InMemory(_))`. `store(&self)` recursively + /// persists every `InMemory` child — but, taking `&self`, it cannot + /// downgrade them back to `Stored` after the PUT succeeds, and nothing + /// else does (`ChildPtr::Stored` is only ever constructed by + /// `Pointer::from_wire`, i.e. the load path). 
Consequence: the + /// `InMemory` set of a long-lived tree only ever grows, and EVERY + /// `store()` re-serializes and re-PUTs the ENTIRE ever-touched subtree + /// — unchanged nodes included. That is the engine of the O(N²) + /// sequential-upload cost reported in issue #34 (each + /// `put_object_flat` = upsert + flush; each flush re-uploads the whole + /// accumulated tree). + /// + /// The pin: two consecutive `store()` calls with NO mutation in + /// between PUT the same number of nodes. An incremental design would + /// PUT ~0 on the second call (nothing changed; the tree is + /// content-addressed so the root key is provably identical). + #[tokio::test] + async fn issue_34_store_reputs_entire_unchanged_in_memory_tree() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct PutCountingStore { + inner: InMemoryStore, + puts: AtomicUsize, + } + #[async_trait::async_trait] + impl HamtNodeStore for PutCountingStore { + async fn get_node(&self, key: &StorageKey) -> Result { + self.inner.get_node(key).await + } + async fn put_node(&self, bytes: HamtNodeBytes) -> Result { + self.puts.fetch_add(1, Ordering::Relaxed); + self.inner.put_node(bytes).await + } + } + + let store = PutCountingStore { + inner: InMemoryStore::new(), + puts: AtomicUsize::new(0), + }; + let mut root: Arc = Arc::new(TestNode::default()); + // 64 fixed keys force most root slots past HAMT_VALUES_BUCKET_SIZE + // (= 3), splitting into Link children — a multi-node tree. Keys are + // constant strings, so the BLAKE3-driven shape (and therefore every + // count below) is fully deterministic. 
+ for i in 0u64..64 { + let k = format!("issue34-key-{:04}", i).into_bytes(); + root.set(k, i, &store).await.unwrap(); + } + + let first_key = root.store(&store).await.unwrap().storage_key; + let first = store.puts.load(Ordering::Relaxed); + assert!( + first >= 5, + "setup degenerate: 64 entries should split into several nodes, \ + got only {} PUTs", + first + ); + + // NO mutation between the two stores. + let second_key = root.store(&store).await.unwrap().storage_key; + let second = store.puts.load(Ordering::Relaxed) - first; + + assert_eq!( + first_key, second_key, + "tree unchanged → content address must be identical" + ); + assert_eq!( + second, first, + "issue #34 write amplification: the second store() of a \ + byte-identical tree re-PUT {} nodes (same as the first store's \ + {}), because InMemory children are never downgraded to Stored \ + after a successful persist. An incremental flush would re-PUT 0 \ + nodes here. If this assertion starts failing with second ≈ 0, \ + the bug is FIXED — invert the test into a regression guard.", + second, first + ); + } } From 2d24a24de65b427e57c93c19664d4d19f38309b5 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 13:23:32 -0400 Subject: [PATCH 12/15] chore: re-lock branch-only deps (k256/ecdsa/rfc6979 for FM-4) after main merge --- Cargo.lock | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d584710..4cf28f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,6 +1457,20 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "ecdsa" +version = "0.16.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" +dependencies = [ + "der", + "digest", + "elliptic-curve", + "rfc6979", + "signature", + "spki", +] + 
[[package]] name = "ed25519" version = "2.2.3" @@ -1504,6 +1518,7 @@ dependencies = [ "generic-array", "group", "hkdf", + "pkcs8", "rand_core 0.6.4", "sec1", "subtle", @@ -1827,6 +1842,7 @@ dependencies = [ "hex", "hyper 1.8.1", "jsonwebtoken", + "k256", "mime", "mime_guess", "oauth2", @@ -1839,6 +1855,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "sha3", "sqlx", "tempfile", "test-log", @@ -2982,6 +2999,20 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "k256" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b" +dependencies = [ + "cfg-if", + "ecdsa", + "elliptic-curve", + "once_cell", + "sha2", + "signature", +] + [[package]] name = "keccak" version = "0.1.5" @@ -4527,6 +4558,16 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "rfc6979" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" +dependencies = [ + "hmac", + "subtle", +] + [[package]] name = "ring" version = "0.17.14" @@ -4739,6 +4780,7 @@ dependencies = [ "base16ct", "der", "generic-array", + "pkcs8", "subtle", "zeroize", ] From 70c271410889e004d3812e2a862a96271381316d Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Fri, 12 Jun 2026 19:33:46 -0400 Subject: [PATCH 13/15] fix(test): move the 1 GiB payload into the upload - clone doubled RSS and OOM-killed run #4 dmesg: oom-kill live_ingest_e2e total-vm 3.26GB anon-rss 3.18GB on the 7.8GB e2e box. The blake3 want-hash captured before the move carries everything verification needs. 
Part of #31 Co-Authored-By: Claude Fable 5 --- crates/fula-client/tests/live_ingest_e2e.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs index 7dc1dce..47d2a21 100644 --- a/crates/fula-client/tests/live_ingest_e2e.rs +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -123,11 +123,13 @@ async fn live_1gib_chunked_via_ingest() { let want = blake3::hash(&data); let started = std::time::Instant::now(); - c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + // MOVE the payload — holding payload + a clone is >2 GiB in this one + // process and OOM-kills the run on the 7.8 GB e2e box (run #4, SIGKILL). + // `want` already carries everything the verification needs. + c.put_object_flat(bucket, key, data, Some("application/octet-stream")) .await .expect("1 GiB chunked flat upload via ingest"); eprintln!("1 GiB upload took {:?}", started.elapsed()); - drop(data); let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); assert_eq!(got.len(), GIB); From 063db9bec55886ecf9296a2eab747fdfdb175cf4 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Mon, 15 Jun 2026 18:35:27 -0400 Subject: [PATCH 14/15] test(multimaster): FM-1 PgRootStore integration test (real Postgres) End-to-end CAS against the real bucket_roots table (migration 020): claim-on-first-flush, stale-root flush conflicts reporting current, loser-retry-at-shared-root wins, version increments; plus an 8-way concurrent race asserting EXACTLY ONE winner. The exact arbitration two live gateways perform on flush. Gated on POSTGRES_* (skips without). 
Part of #32 Co-Authored-By: Claude Fable 5 --- crates/fula-cli/tests/root_store_pg_it.rs | 118 ++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 crates/fula-cli/tests/root_store_pg_it.rs diff --git a/crates/fula-cli/tests/root_store_pg_it.rs b/crates/fula-cli/tests/root_store_pg_it.rs new file mode 100644 index 0000000..0c1efed --- /dev/null +++ b/crates/fula-cli/tests/root_store_pg_it.rs @@ -0,0 +1,118 @@ +//! FM-1 (Phase 2.5) — PgRootStore integration test against a REAL Postgres. +//! +//! Proves the shared bucket-root arbiter that makes concurrent federated +//! masters safe behaves correctly against the actual database + migration 020 +//! `bucket_roots` table — the exact CAS two live gateways perform on flush. +//! +//! Skips cleanly when POSTGRES_* is unset (local dev); the Phase 2.5 drill +//! runs it on the test master's stack DB. +//! +//! Run: POSTGRES_HOST=127.0.0.1 POSTGRES_DB=pinning_service \ +//! POSTGRES_USER=pinning_user POSTGRES_PASSWORD=… \ +//! 
cargo test -p fula-cli --test root_store_pg_it -- --ignored --nocapture + +#![cfg(not(target_arch = "wasm32"))] + +use cid::Cid; +use fula_core::root_pointer::{CasOutcome, RootPointerStore}; +use fula_cli::root_store_pg::PgRootStore; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; + +fn cid_of(b: &[u8]) -> Cid { + let h = blake3::hash(b); + let mh = cid::multihash::Multihash::<64>::wrap(0x1e, h.as_bytes()).unwrap(); + Cid::new_v1(0x55, mh) +} + +async fn pool_or_skip() -> Option { + let env = |k: &str| std::env::var(k).ok().filter(|s| !s.trim().is_empty()); + let (host, db, user) = (env("POSTGRES_HOST")?, env("POSTGRES_DB")?, env("POSTGRES_USER")?); + let port = env("POSTGRES_PORT").and_then(|s| s.parse().ok()).unwrap_or(5432u16); + let pass = std::env::var("POSTGRES_PASSWORD").unwrap_or_default(); + let opts = PgConnectOptions::new().host(&host).port(port).database(&db).username(&user).password(&pass); + match PgPoolOptions::new().max_connections(4).connect_with(opts).await { + Ok(p) => Some(p), + Err(e) => { + eprintln!("SKIP: cannot reach Postgres: {e}"); + None + } + } +} + +#[tokio::test] +#[ignore] +async fn pg_cas_arbitrates_the_two_master_race() { + let Some(pool) = pool_or_skip().await else { return }; + let owner = "p25-it-owner"; + let bucket = format!("p25-it-bkt-{}", std::process::id()); + // Clean any prior row for a deterministic run. + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id = $1 AND bucket = $2") + .bind(owner).bind(&bucket).execute(&pool).await; + + let store = PgRootStore::new(pool.clone()); + let (a, b, c) = (cid_of(b"r-a"), cid_of(b"r-b"), cid_of(b"r-c")); + + // First flush claims the slot (no row yet — expected==new is the bootstrap). + assert_eq!(store.cas_root(owner, &bucket, &a, &b).await.unwrap(), CasOutcome::Won); + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(b), "shared root is now B"); + + // Master #2 built on the STALE root A → must conflict, reporting current=B. 
+ match store.cas_root(owner, &bucket, &a, &c).await.unwrap() { + CasOutcome::Conflict { current } => assert_eq!(current, Some(b)), + other => panic!("stale flush must conflict, got {other:?}"), + } + // The pointer is unchanged by the losing CAS. + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(b)); + + // Loser reopens at the shared root B and retries → wins. + assert_eq!(store.cas_root(owner, &bucket, &b, &c).await.unwrap(), CasOutcome::Won); + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(c)); + + // version incremented across the two winning CASes (claim=1, then +1). + let version: i64 = sqlx::query_scalar("SELECT version FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).fetch_one(&pool).await.unwrap(); + assert_eq!(version, 2, "two winning CASes ⇒ version 2"); + + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id = $1 AND bucket = $2") + .bind(owner).bind(&bucket).execute(&pool).await; +} + +/// Concurrency: fire N tasks that all build on the SAME root; EXACTLY ONE may +/// win — the rest must conflict. This is the real two-master race, collapsed +/// into one process hitting the real Postgres. +#[tokio::test] +#[ignore] +async fn pg_cas_admits_exactly_one_concurrent_winner() { + let Some(pool) = pool_or_skip().await else { return }; + let owner = "p25-it-owner"; + let bucket = format!("p25-it-conc-{}", std::process::id()); + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).execute(&pool).await; + + let base = cid_of(b"base"); + // Establish the base root. + PgRootStore::new(pool.clone()).cas_root(owner, &bucket, &base, &base).await.unwrap(); + + // 8 racers, each proposing a distinct new root from `base`. 
+ let mut handles = Vec::new(); + for i in 0..8u8 { + let pool = pool.clone(); + let bucket = bucket.clone(); + let base = base; + handles.push(tokio::spawn(async move { + let store = PgRootStore::new(pool); + let new = cid_of(&[i; 8]); + store.cas_root(owner, &bucket, &base, &new).await.unwrap() + })); + } + let mut wins = 0; + for h in handles { + if matches!(h.await.unwrap(), CasOutcome::Won) { + wins += 1; + } + } + assert_eq!(wins, 1, "exactly one concurrent CAS from the same base may win"); + + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).execute(&pool).await; +} From abb881d2ba5659ed821fb5eb6a4e4f14f1c9f5f6 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Mon, 15 Jun 2026 18:37:55 -0400 Subject: [PATCH 15/15] fix(test): FM-1 unit cid_of uses seed digests, not blake3 (not a fula-core dep) root_pointer tests only need distinct CIDs; wrapping a seed-filled 32-byte digest as multihash 0x1e avoids adding blake3 to fula-core. Compile error E0433 from the unit gate. Part of #32 Co-Authored-By: Claude Fable 5 --- crates/fula-core/src/root_pointer.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/fula-core/src/root_pointer.rs b/crates/fula-core/src/root_pointer.rs index 1c8e474..016d268 100644 --- a/crates/fula-core/src/root_pointer.rs +++ b/crates/fula-core/src/root_pointer.rs @@ -58,9 +58,11 @@ mod tests { use super::test_support::InMemoryRootStore; use super::*; - fn cid_of(b: &[u8]) -> Cid { - let h = blake3::hash(b); - let mh = cid::multihash::Multihash::<64>::wrap(0x1e, h.as_bytes()).unwrap(); + /// Distinct (raw, blake3-coded) CIDs from a seed — no real hashing needed, + /// the tests only require uniqueness. Avoids a blake3 dep on fula-core. 
+ fn cid_of(seed: u8) -> Cid { + let digest = [seed; 32]; + let mh = cid::multihash::Multihash::<64>::wrap(0x1e, &digest).unwrap(); Cid::new_v1(0x55, mh) } @@ -70,7 +72,7 @@ mod tests { #[tokio::test] async fn second_master_building_on_a_stale_root_loses_the_cas() { let store = InMemoryRootStore::default(); - let (a, b, c) = (cid_of(b"root-a"), cid_of(b"root-b"), cid_of(b"root-c")); + let (a, b, c) = (cid_of(1), cid_of(2), cid_of(3)); assert_eq!( store.cas_root("owner", "bkt", &a, &b).await.unwrap(), @@ -92,7 +94,7 @@ mod tests { #[tokio::test] async fn buckets_and_owners_are_isolated() { let store = InMemoryRootStore::default(); - let (a, b) = (cid_of(b"a"), cid_of(b"b")); + let (a, b) = (cid_of(10), cid_of(11)); store.cas_root("o1", "bkt", &a, &b).await.unwrap(); assert_eq!(store.get_root("o2", "bkt").await.unwrap(), None); assert_eq!(store.get_root("o1", "other").await.unwrap(), None);