From 9d13d9863938baa007937b75998f3a156e0619cd Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Tue, 16 Jun 2026 20:46:12 -0400 Subject: [PATCH] fix(client): retry transient content-chunk PUTs on wasm + native (0.6.10) Large (chunked) uploads could fail on a single sporadic chunk-PUT drop (ERR_CONNECTION_CLOSED / transient 5xx under the 16-wide concurrent burst). The content-chunk PUT path (put_object_chunked_internal -> put_object_with_ metadata -> request()) had NO retry on either target: the blob-backend retry only wraps __fula_forest_v7_nodes/, and retry_idempotent only wrapped the S3 multipart path AND collapsed to a single attempt on wasm for lack of a sleep primitive. Small (1-chunk) files almost never hit a drop; large ones did, worst on the web (FxFiles #50). - multipart::retry_idempotent: unify the loop across targets; add a wasm backoff sleep via gloo-timers (TimeoutFuture); make it pub(crate). - is_transient: un-gate for wasm (body already wasm-aware) so the unified retry can classify transient errors there. - put_object_chunked_internal: wrap each chunk PUT in retry_idempotent (4 attempts, exponential backoff capped at 5s). Safe: chunk keys are content addressed (idempotent). Per-attempt clones are cheap (Arc / Bytes / small structs). - add gloo-timers (wasm32 target, futures feature); bump workspace version 0.6.9 -> 0.6.10. Tests: - chunk_put_retries_transient (new): RED before / GREEN after -- wiremock injects one transient chunk 503 and asserts the chunked upload survives via the per-chunk retry. - chunk_retry_real_server_e2e (new, #[ignore]): 3 MiB chunked upload + byte-identical round-trip against the live master (Mode A creds). - full fula-client suite green (208 lib + 4 blob-backend retry); cargo check --target wasm32-unknown-unknown clean. Scope note: the per-file INDEX-object PUT (written after the chunks) is the same idempotent, currently-unretried pattern. Left out of scope here (the reported failure and the test both target the content chunks); a one-line follow-up can wrap it in the same retry_idempotent. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 13 +- Cargo.toml | 2 +- crates/fula-client/Cargo.toml | 4 + crates/fula-client/src/encryption.rs | 59 ++++++--- crates/fula-client/src/multipart.rs | 73 +++++------ .../tests/chunk_put_retries_transient.rs | 113 ++++++++++++++++++ .../tests/chunk_retry_real_server_e2e.rs | 93 ++++++++++++++ 7 files changed, 299 insertions(+), 58 deletions(-) create mode 100644 crates/fula-client/tests/chunk_put_retries_transient.rs create mode 100644 crates/fula-client/tests/chunk_retry_real_server_e2e.rs diff --git a/Cargo.lock b/Cargo.lock index 2aa7788..a61f0f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1736,7 +1736,7 @@ dependencies = [ [[package]] name = "fula-api" -version = "0.6.9" +version = "0.6.10" dependencies = [ "anyhow", "axum", @@ -1765,7 +1765,7 @@ dependencies = [ [[package]] name = "fula-blockstore" -version = "0.6.9" +version = "0.6.10" dependencies = [ "anyhow", "async-trait", @@ -1803,7 +1803,7 @@ dependencies = [ [[package]] name = "fula-cli" -version = "0.6.9" +version = "0.6.10" dependencies = [ "anyhow", "async-trait", @@ -1857,7 +1857,7 @@ dependencies = [ [[package]] name = "fula-client" -version = "0.6.9" +version = "0.6.10" dependencies = [ "anyhow", "async-trait", @@ -1871,6 +1871,7 @@ dependencies = [ "fs2", "fula-crypto", "futures", + "gloo-timers", "hex", "mime_guess", "parking_lot", @@ -1899,7 +1900,7 @@ dependencies = [ [[package]] name = "fula-core" -version = "0.6.9" +version = "0.6.10" dependencies = [ "anyhow", "async-trait", @@ -1934,7 +1935,7 @@ dependencies = [ [[package]] name = "fula-crypto" -version = "0.6.9" +version = "0.6.10" dependencies = [ "aes-gcm", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 4e45609..ba1fdd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ name = "encrypted_upload_test" path = "examples/encrypted_upload_test.rs" [workspace.package] -version = "0.6.9" +version = "0.6.10" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/functionland/fula-api" diff --git a/crates/fula-client/Cargo.toml b/crates/fula-client/Cargo.toml index 09603de..5c1e7f4 100644 --- a/crates/fula-client/Cargo.toml +++ b/crates/fula-client/Cargo.toml @@ -83,6 +83,10 @@ reqwest = { version = "0.12", default-features = false, features = ["json"] } uuid = { version = "1.11", features = ["v4", "serde", "js"] } # MultipartAbortGuard Drop impl spawns the abort future on wasm wasm-bindgen-futures = "0.4" +# Backoff sleep for retry_idempotent on wasm — the browser has no tokio +# reactor. `futures` feature exposes TimeoutFuture (a setTimeout-backed Future) +# so the per-chunk upload retry (FxFiles #50) actually sleeps between attempts. +gloo-timers = { version = "0.3", features = ["futures"] } # Walkable-v8 (W.9.2): `S3BlobBackend::put` parses master's PUT-response # ETag into a `Cid` on every target so the cross-platform CID-stamping # seam stays symmetric. cid 0.11 is wasm-compatible; default-features diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 522aa0b..8ac2517 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7113,23 +7113,48 @@ impl EncryptedClient { let chunk_key_ret = chunk_key.clone(); async move { - let put_result = if let Some(ref pin) = pinning { - client.put_object_with_metadata_and_pinning( - &bucket, - &chunk_key, - chunk.ciphertext, - Some(chunk_metadata), - &pin.endpoint, - &pin.token, - ).await? - } else { - client.put_object_with_metadata( - &bucket, - &chunk_key, - chunk.ciphertext, - Some(chunk_metadata), - ).await? - }; + // FxFiles #50: the content-chunk PUT had no retry on EITHER + // target — the native blob-backend retry only wraps forest + // nodes, not content chunks. One sporadic chunk-PUT drop + // (ERR_CONNECTION_CLOSED / transient 5xx under the 16-wide + // concurrent burst) thus failed the whole large upload (worst + // on the web, where the retry was wasm-gated). Retry each chunk + // PUT on transient errors — safe because the chunk key is + // content-addressed (idempotent). Per-attempt clones are cheap + // (Arc / Bytes / small structs). + let ciphertext = bytes::Bytes::from(chunk.ciphertext); + let put_result = crate::multipart::retry_idempotent(4, || { + let client = client.clone(); + let bucket = bucket.clone(); + let chunk_key = chunk_key.clone(); + let pinning = pinning.clone(); + let chunk_metadata = chunk_metadata.clone(); + let body = ciphertext.clone(); + async move { + if let Some(ref pin) = pinning { + client + .put_object_with_metadata_and_pinning( + &bucket, + &chunk_key, + body, + Some(chunk_metadata), + &pin.endpoint, + &pin.token, + ) + .await + } else { + client + .put_object_with_metadata( + &bucket, + &chunk_key, + body, + Some(chunk_metadata), + ) + .await + } + } + }) + .await?; // W.9.4-A2: verify master's etag-attested CID against // the pre-computed BLAKE3(ciphertext). Mismatch // soft-fails to None — chunk PUT succeeded, only the diff --git a/crates/fula-client/src/multipart.rs b/crates/fula-client/src/multipart.rs index 8cebc56..296e3d5 100644 --- a/crates/fula-client/src/multipart.rs +++ b/crates/fula-client/src/multipart.rs @@ -415,54 +415,59 @@ pub async fn upload_large_file( /// an orphan upload) or `complete_upload` (separately handled — a completed /// upload replies `NoSuchUpload` on retry, which callers must treat as /// success-equivalent). (NEW-F3.) -async fn retry_idempotent(max_attempts: usize, mut op: F) -> Result +pub(crate) async fn retry_idempotent(max_attempts: usize, mut op: F) -> Result where F: FnMut() -> Fut, Fut: std::future::Future>, { - // WASM target has no reliable sleep primitive available from this crate - // (no tokio reactor / gloo-timers dep), so a retry loop would hammer the - // server in a tight cycle — worse than no retry. Collapse to - // single-attempt on WASM; native callers get the backoff path. - #[cfg(target_arch = "wasm32")] - { - let _ = max_attempts; - return op().await; - } - #[cfg(not(target_arch = "wasm32"))] - { - let mut attempt = 0usize; - loop { - attempt += 1; - match op().await { - Ok(v) => return Ok(v), - Err(e) if attempt >= max_attempts || !is_transient(&e) => return Err(e), - Err(e) => { - let backoff_ms = std::cmp::min(5_000u64, 100u64 * (1u64 << (attempt as u32 - 1))); - tracing::debug!( - attempt, - max_attempts, - backoff_ms, - error = %e, - "multipart: transient error, retrying" - ); - tokio::time::sleep(Duration::from_millis(backoff_ms)).await; - } + // Unified retry for native AND wasm — the only per-target difference is the + // sleep primitive (see [`retry_backoff_sleep`]). wasm previously collapsed + // to a single attempt for lack of a sleep dep, which left the chunked + // content-upload path (`encryption.rs::put_object_chunked_internal`) with + // NO retry on the web: one sporadic chunk-PUT drop (ERR_CONNECTION_CLOSED / + // transient 5xx under the 16-wide concurrent burst) failed the whole large + // upload while small (1-chunk) files were fine (FxFiles #50). gloo-timers + // gives wasm a real backoff so the retry actually fires there. + let mut attempt = 0usize; + loop { + attempt += 1; + match op().await { + Ok(v) => return Ok(v), + Err(e) if attempt >= max_attempts || !is_transient(&e) => return Err(e), + Err(e) => { + let backoff_ms = std::cmp::min(5_000u64, 100u64 * (1u64 << (attempt as u32 - 1))); + tracing::debug!( + attempt, + max_attempts, + backoff_ms, + error = %e, + "retry_idempotent: transient error, retrying" + ); + retry_backoff_sleep(backoff_ms).await; } } } } +/// Backoff sleep for [`retry_idempotent`]. tokio on native; gloo-timers on +/// wasm32 — the browser has no tokio reactor and the wasm build's tokio +/// feature set excludes `tokio::time`. +async fn retry_backoff_sleep(ms: u64) { + #[cfg(not(target_arch = "wasm32"))] + tokio::time::sleep(Duration::from_millis(ms)).await; + #[cfg(target_arch = "wasm32")] + gloo_timers::future::TimeoutFuture::new(ms as u32).await; +} + /// Classify a ClientError as retry-safe. Network-level failures (connect, /// timeout, decode) are always transient. S3-reported statuses 429/500/502/503/504 /// are transient; everything else (auth, 4xx policy errors, malformed response) /// is not. /// -/// Native-only: `retry_idempotent` short-circuits to a single attempt on -/// wasm32 (no sleep primitive), so the classifier has no caller there. -/// Shared with `S3BlobBackend::{get, put}` in `encryption.rs` so the -/// blob-backend retry loop matches the same transient set. -#[cfg(not(target_arch = "wasm32"))] +/// Used on BOTH targets now: `retry_idempotent` retries on wasm too (FxFiles +/// #50), and it is shared with `S3BlobBackend::{get, put}` in `encryption.rs` +/// so the blob-backend retry loop matches the same transient set. The body is +/// wasm-aware — reqwest's `is_connect` is native-only (see the cfg inside). pub(crate) fn is_transient(err: &ClientError) -> bool { match err { ClientError::Http(e) => { diff --git a/crates/fula-client/tests/chunk_put_retries_transient.rs b/crates/fula-client/tests/chunk_put_retries_transient.rs new file mode 100644 index 0000000..c027d49 --- /dev/null +++ b/crates/fula-client/tests/chunk_put_retries_transient.rs @@ -0,0 +1,113 @@ +//! Regression test (FxFiles issue #50 follow-up): content-chunk PUTs must be +//! retried on a transient failure — on web AND native. +//! +//! The chunked content-upload path +//! `put_object_flat` → `put_object_chunked_internal` +//! → `FulaClient::put_object_with_metadata` → `request()` +//! does a single `send()` with NO retry on either target. The native +//! blob-backend retry loop (`S3BlobBackend::put`) only wraps the +//! `__fula_forest_v7_nodes/` class, and `retry_idempotent` only wraps the +//! S3-multipart path — neither covers the per-file content chunks. So a single +//! sporadic chunk-PUT drop (`ERR_CONNECTION_CLOSED` / 503 under the 16-wide +//! concurrent burst) fails the whole large upload. +//! +//! This drives a >768 KB (therefore chunked) encrypted upload through a +//! wiremock master that injects exactly ONE 503 on the first content/chunk PUT +//! and asserts the upload still succeeds. RED before the fix (no chunk retry), +//! GREEN after wrapping the chunk PUT in `retry_idempotent`. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use wiremock::matchers::method; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +/// BLAKE3-raw CID of the body, returned as the ETag so the SDK's W.9.3 / +/// W.9.4 self-verify accepts the (retried) PUT response. +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// Injects ONE 503 on the first CONTENT PUT (the content chunks upload before +/// the per-file index object, and forest blobs live under `__fula_forest_*`, +/// so the first non-forest PUT is a chunk). Everything else gets 200 + a +/// matching ETag. +struct FlakeFirstChunkPut { + injected: Arc, + injected_count: Arc, +} + +impl Respond for FlakeFirstChunkPut { + fn respond(&self, req: &Request) -> ResponseTemplate { + let is_forest = req.url.path().contains("__fula_forest_"); + if !is_forest && !self.injected.swap(true, Ordering::SeqCst) { + self.injected_count.fetch_add(1, Ordering::SeqCst); + return ResponseTemplate::new(503); + } + let cid = blake3_raw_cid(&req.body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +#[tokio::test] +async fn chunked_upload_survives_one_transient_chunk_put() { + let server = MockServer::start().await; + let injected = Arc::new(AtomicBool::new(false)); + let injected_count = Arc::new(AtomicUsize::new(0)); + + Mock::given(method("PUT")) + .respond_with(FlakeFirstChunkPut { + injected: injected.clone(), + injected_count: injected_count.clone(), + }) + .mount(&server) + .await; + // Fresh bucket → 404 GET → cold v7 forest bootstrap. + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + // The pre-fix failure path best-effort DELETEs already-uploaded chunks. + Mock::given(method("DELETE")) + .respond_with(ResponseTemplate::new(204)) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let mut config = Config::new(&server.uri()).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + let secret = SecretKey::generate(); + let enc_config = EncryptionConfig::from_secret_key(secret); + let client = EncryptedClient::new(config, enc_config).expect("EncryptedClient::new"); + + // 2 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ chunked upload: several concurrent + // chunk PUTs, one of which gets the injected 503. + let data = vec![0xABu8; 2 * 1024 * 1024]; + + let result = client + .put_object_flat("videos-v8", "/promo.mp4", Bytes::from(data), Some("video/mp4")) + .await; + + assert_eq!( + injected_count.load(Ordering::SeqCst), + 1, + "test must inject exactly one transient chunk 503" + ); + assert!( + result.is_ok(), + "chunked upload must survive one transient chunk-PUT 503 via per-chunk \ + retry; got error: {:?}", + result.err() + ); +} diff --git a/crates/fula-client/tests/chunk_retry_real_server_e2e.rs b/crates/fula-client/tests/chunk_retry_real_server_e2e.rs new file mode 100644 index 0000000..66a68af --- /dev/null +++ b/crates/fula-client/tests/chunk_retry_real_server_e2e.rs @@ -0,0 +1,93 @@ +//! Real-server E2E (FxFiles #50): a LARGE (chunked) encrypted upload + round +//! trip against the LIVE master, exercising the per-chunk upload path end to +//! end with a real account's credentials. Confirms the SDK still uploads and +//! reads back a multi-chunk file after the per-chunk-retry change. +//! +//! `#[ignore]` — needs network + real credentials. Run (PowerShell, env from +//! `e2e-credentials.env`): +//! +//! ```powershell +//! cargo test -p fula-client --test chunk_retry_real_server_e2e --release ` +//! -- --ignored --nocapture +//! ``` +//! +//! Required env: `FULA_S3`, `FULA_JWT`, `FULA_TEST_PROVIDER`, +//! `FULA_TEST_OAUTH_SUB`, `FULA_TEST_EMAIL` (the Mode A derivation triple — +//! Argon2id("fula-files-v1", "{provider}:{sub}:{email}"), auth_service.dart +//! byte-for-byte). A fresh, uniquely-named bucket is created so the test is +//! self-contained and never touches the account's real data. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; + +fn env(name: &str) -> String { + std::env::var(name).unwrap_or_else(|_| panic!("missing required env {name}")) +} + +#[tokio::test] +#[ignore = "real-server E2E; needs FULA_S3 + FULA_JWT + Mode A triple"] +async fn large_chunked_upload_roundtrips_against_real_master() { + let s3 = env("FULA_S3"); + let jwt = env("FULA_JWT"); + let input = format!( + "{}:{}:{}", + env("FULA_TEST_PROVIDER"), + env("FULA_TEST_OAUTH_SUB"), + env("FULA_TEST_EMAIL"), + ); + + // Mode A secret = Argon2id("fula-files-v1", "{provider}:{sub}:{email}"). + let key = fula_crypto::hashing::derive_key_argon2id("fula-files-v1", input.as_bytes()); + let secret = SecretKey::from_bytes(&key).expect("32-byte secret from Argon2id"); + + let mut config = Config::new(&s3).with_token(&jwt); + config.walkable_v8_writer_enabled = true; // match FxFiles production + let client = EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new"); + + // Fresh uniquely-named bucket → clean forest, never collides with real data. + let epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let bucket = format!("e2e-chunkretry-{epoch}-v8"); + // Best-effort (the gateway may auto-create on first write; tolerate exists). + if let Err(e) = client.create_bucket(&bucket).await { + eprintln!("[#50 e2e] create_bucket({bucket}) returned {e} — continuing"); + } + + // 3 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ multi-chunk concurrent upload — the + // exact path that drops on the web without the per-chunk retry. + let mut data = vec![0u8; 3 * 1024 * 1024]; + for (i, b) in data.iter_mut().enumerate() { + *b = (i % 251) as u8; // non-trivial pattern so the round-trip is meaningful + } + let key_path = "/promo-video-e2e.bin"; + + client + .put_object_flat( + &bucket, + key_path, + Bytes::from(data.clone()), + Some("application/octet-stream"), + ) + .await + .expect("large chunked upload must succeed against the live master"); + + // Round-trip: download + verify byte-identical (chunks stored + reassembled). + let got = client + .get_object_flat(&bucket, key_path) + .await + .expect("download must succeed"); + assert_eq!(got.len(), data.len(), "downloaded size mismatch"); + assert_eq!(&got[..], &data[..], "downloaded bytes must equal uploaded"); + + // Best-effort cleanup so the test account doesn't accrue orphans. + let _ = client.delete_object_flat(&bucket, key_path).await; + let _ = client.delete_bucket(&bucket).await; + + eprintln!("[#50 e2e] OK: 3 MiB chunked upload + round-trip against {s3} (bucket {bucket})"); +}