From 28ded2a9ed870fa6bd2b0ce99df81ec354bfde12 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 12:32:01 -0400 Subject: [PATCH 1/6] Streaming upload P1: plan-mode ChunkedEncoder (no-AEAD pass 1) + plan doc Foundation for web streaming + resumable large-file uploads (see docs/web-streaming-resumable-upload-plan.md). Adds a plan-only mode to ChunkedEncoder (into_plan_only): it still generates the per-chunk nonce, feeds the plaintext to the BAO + content hashers, and advances the chunk count, but skips the AEAD encrypt and retains NO ciphertext. The chunking / BAO / nonce code is shared verbatim with the encrypting path, so plan-mode and a full encode produce identical root_hash / content_hash / num_chunks for the same input (the random per-chunk nonces differ, which is fine). This is pass 1 of the streaming upload: commit the integrity root + nonce list without holding ciphertext, then pass 2 re-encrypts each chunk from its stored nonce (deterministic AEAD => identical ciphertext => idempotent PUT). Tests (fula-crypto): - test_plan_mode_matches_full_encode: root/content_hash/num_chunks parity vs a full encode; plan-mode retains no ciphertext. - test_plan_mode_nonces_reencrypt_and_decrypt_roundtrip: commit nonces -> re-encrypt each chunk from its stored nonce -> decode byte-exact. This is the resume-safety core (deterministic re-encryption from committed nonces). Full fula-crypto suite green (452 passed); wasm32 build green via fula-flutter. No change to the encrypting path's behavior (existing encode/decode tests pass). Co-Authored-By: Claude Opus 4.8 --- crates/fula-crypto/src/chunked.rs | 130 ++++++++++- docs/web-streaming-resumable-upload-plan.md | 236 ++++++++++++++++++++ 2 files changed, 358 insertions(+), 8 deletions(-) create mode 100644 docs/web-streaming-resumable-upload-plan.md diff --git a/crates/fula-crypto/src/chunked.rs b/crates/fula-crypto/src/chunked.rs index 23f4ab3..0cb5742 100644 --- a/crates/fula-crypto/src/chunked.rs +++ b/crates/fula-crypto/src/chunked.rs @@ -239,6 +239,15 @@ pub struct ChunkedEncoder { current_chunk: Vec, bytes_processed: u64, aad_prefix: Option>, + /// Plan-only mode (streaming pass 1): still generate the per-chunk nonce and + /// feed the plaintext to the BAO + content hashers and advance the chunk + /// count, but skip the AEAD encrypt and retain NO ciphertext. The committed + /// integrity root + nonce list stay valid for pass 2, which re-encrypts each + /// chunk from its stored nonce (deterministic AEAD => identical ciphertext). + /// Default false (normal encode). The chunking/BAO/nonce code is shared with + /// the encrypting path verbatim, so plan-mode and full-encode produce the + /// SAME root, content hash and chunk count for the same input. + plan_only: bool, } /// An encrypted chunk ready for upload @@ -270,6 +279,7 @@ impl ChunkedEncoder { current_chunk: Vec::with_capacity(chunk_size), bytes_processed: 0, aad_prefix: None, + plan_only: false, } } @@ -292,6 +302,15 @@ impl ChunkedEncoder { } } + /// Switch this encoder into plan-only mode (no AEAD; see the `plan_only` + /// field). Chainable: `ChunkedEncoder::with_aad(dek, prefix).into_plan_only()`. + /// Pass 1 of the streaming/resumable upload uses this to commit the nonce + /// list + integrity root without holding ciphertext. + pub fn into_plan_only(mut self) -> Self { + self.plan_only = true; + self + } + /// Feed data into the encoder /// /// Returns any complete chunks that are ready for upload. @@ -365,18 +384,26 @@ impl ChunkedEncoder { // Generate a unique nonce for this chunk let nonce = Nonce::generate(); - // Encrypt the chunk, with AAD if prefix is set - let aead = Aead::new_default(&self.dek); - let ciphertext = if let Some(ref prefix) = self.aad_prefix { - let aad = format!("{}:{}", String::from_utf8_lossy(prefix), chunk_index).into_bytes(); - aead.encrypt_with_aad(&nonce, &self.current_chunk, &aad)? + // Encrypt the chunk (with AAD if a prefix is set) — UNLESS plan-only, + // where pass 1 commits the nonce + integrity root without retaining + // ciphertext and pass 2 re-encrypts from the stored nonce. The nonce is + // still generated above so the committed nonce list is complete. + let ciphertext = if self.plan_only { + Bytes::new() } else { - aead.encrypt(&nonce, &self.current_chunk)? + let aead = Aead::new_default(&self.dek); + let ct = if let Some(ref prefix) = self.aad_prefix { + let aad = format!("{}:{}", String::from_utf8_lossy(prefix), chunk_index).into_bytes(); + aead.encrypt_with_aad(&nonce, &self.current_chunk, &aad)? + } else { + aead.encrypt(&nonce, &self.current_chunk)? + }; + Bytes::from(ct) }; - + let chunk = EncryptedChunk { index: chunk_index, - ciphertext: Bytes::from(ciphertext), + ciphertext, nonce: nonce.clone(), }; @@ -883,6 +910,93 @@ mod tests { ); } + /// Plan-only mode (streaming pass 1) must produce the SAME integrity root, + /// content hash and chunk count as a full encode of the same input — they + /// are all plaintext-derived, so encryption can't change them. (Per-chunk + /// nonces are random and therefore differ between the two encoders; that is + /// expected and harmless.) This is what lets pass 1 commit the manifest + /// without ciphertext and have pass 2's re-encrypt match the committed root. + #[test] + fn test_plan_mode_matches_full_encode() { + let dek = DekKey::generate(); + let cs = MIN_CHUNK_SIZE; + let original = b"plan vs full parity ".repeat(5000); // multi-chunk + partial tail + let prefix: &[u8] = b"fula:v4:chunk:teststore/key"; + + let mut full = ChunkedEncoder::with_aad_and_chunk_size(dek.clone(), prefix, cs); + full.update(&original).unwrap(); + let full_ch = full.content_hash_hex(); + let (_fc, full_meta, _fob) = full.finalize().unwrap(); + + let mut plan = + ChunkedEncoder::with_aad_and_chunk_size(dek.clone(), prefix, cs).into_plan_only(); + let ready = plan.update(&original).unwrap(); + assert!( + ready.iter().all(|c| c.ciphertext.is_empty()), + "plan-only mode must not retain ciphertext" + ); + let plan_ch = plan.content_hash_hex(); + let (final_chunk, plan_meta, _pob) = plan.finalize().unwrap(); + assert!( + final_chunk.as_ref().map_or(true, |c| c.ciphertext.is_empty()), + "plan-only final chunk must not retain ciphertext" + ); + + assert_eq!(full_meta.root_hash, plan_meta.root_hash, "BAO root must match"); + assert_eq!(full_ch, plan_ch, "content hash must match"); + assert_eq!( + full_meta.num_chunks, plan_meta.num_chunks, + "chunk count must match" + ); + assert!(full_meta.num_chunks > 1, "test needs a multi-chunk input"); + assert_eq!( + plan_meta.chunk_nonces.len(), + plan_meta.num_chunks as usize, + "plan-only must commit one nonce per chunk" + ); + } + + /// Load-bearing correctness test for the whole streaming scheme: pass 1 + /// (plan-only) commits the nonces + metadata WITHOUT ciphertext, then pass 2 + /// re-encrypts each chunk from its STORED nonce (deterministic AEAD), and the + /// committed metadata decrypts the result back to the exact original. Proves + /// commit-then-re-encrypt round-trips — the foundation of resume safety. + #[test] + fn test_plan_mode_nonces_reencrypt_and_decrypt_roundtrip() { + let dek = DekKey::generate(); + let cs = MIN_CHUNK_SIZE; + let original = b"pass1 commits nonces; pass2 re-encrypts then decode. ".repeat(3000); + let prefix: &[u8] = b"fula:v4:chunk:teststore/key"; + + // PASS 1 — plan-only: commit nonces + metadata, retain no ciphertext. + let mut planner = + ChunkedEncoder::with_aad_and_chunk_size(dek.clone(), prefix, cs).into_plan_only(); + planner.update(&original).unwrap(); + let (_final, metadata, _ob) = planner.finalize().unwrap(); + let num_chunks = metadata.num_chunks as usize; + assert!(num_chunks > 1); + + // PASS 2 (simulated): re-encrypt each chunk from its stored nonce + the + // same AAD the encoder uses, exactly as the streaming uploader will. + let mut decoder = ChunkedDecoder::with_aad(dek.clone(), metadata.clone(), prefix); + for i in 0..num_chunks { + let start = i * cs; + let end = ((i + 1) * cs).min(original.len()); + let pt = &original[start..end]; + let nonce = metadata.get_chunk_nonce(i as u32).unwrap(); + let aead = Aead::new_default(&dek); + let aad = format!("{}:{}", String::from_utf8_lossy(prefix), i).into_bytes(); + let ct = aead.encrypt_with_aad(&nonce, pt, &aad).unwrap(); + decoder.decrypt_chunk(i as u32, &ct).unwrap(); + } + let recovered = decoder.finalize().unwrap(); + assert_eq!( + recovered.as_ref(), + original.as_slice(), + "commit-then-re-encrypt-from-stored-nonce must round-trip byte-exact" + ); + } + #[test] fn test_chunk_key_generation() { let base = "abc123/file.txt"; diff --git a/docs/web-streaming-resumable-upload-plan.md b/docs/web-streaming-resumable-upload-plan.md new file mode 100644 index 0000000..264dcae --- /dev/null +++ b/docs/web-streaming-resumable-upload-plan.md @@ -0,0 +1,236 @@ +# Web Streaming + Resumable Large-File Upload — Plan + +Status: **PLAN (not started)** · Owner: SDK (`fula-api`) + FxFiles web · Created 2026-06-18 +Delivery: **one `fula-api` PR delivered in phases P1–P5** (streaming AND resume together), +then a separate FxFiles consumer PR (P6, needs the published SDK). +Build on branch `feat/wasm-resumable-upload` (groundwork commit `468543e` already lifts +`UploadManifest`/`ManifestChunk` onto wasm + un-gates `decrypt_resumable_private_meta`). + +--- + +## 0. Problem & goal + +Large files OOM low-RAM web tabs. Root cause: the wasm upload path materializes the +**entire ciphertext** before the first PUT (`fula-client/src/encryption.rs:7325`, +`put_object_chunked_internal` → `encoder.update(data)` → `all_chunks` Vec → `buffer_unordered`). +Peak memory ≈ 2× file size. The interim "surface the failure" UX shipped (FxFiles v1.11.4.6, +PR #59); **this plan removes the cap** so multi-hundred-MB / GB files upload from a 1–2 GB phone. + +Goal: stream the file through the encrypt+PUT loop so **peak memory is independent of file +size**, AND make the upload **resumable** after interruption — under hard guarantees: + +| Requirement (user) | Where addressed | +|---|---| +| 1. No silent corruption / chunk mixup | §4 INV-4, §3 AAD binding, download-side `bao_root`+`content_hash` verify; tests §8 | +| 2. Properly unit + e2e tested | §8 (round-trip, interrupt→resume, mixup-detector, nonce-reuse guard, e2e) | +| 3. Smooth error handling, no corruption on interruption | §2 two-pass atomicity, §6 error matrix, per-chunk idempotent retry | +| 4. Resume + security + no unattended ghost nodes | §2 resume, §4 INV-1/2/3/6/7, §5 orphan lifecycle | + +--- + +## 1. Verified facts (the design rests on these — re-confirm if the SDK changes) + +- **BAO root + `content_hash` are computed over PLAINTEXT**, not ciphertext + (`fula-crypto/src/chunked.rs:313-317`: `bao_encoder.update(data)` + `content_hasher.update(data)` + where `data` is the plaintext). ⇒ **Pass 1 needs no encryption.** +- `ChunkedEncoder::content_hash_hex()` = `blake3(plaintext)` (`chunked.rs:325`). ⇒ cheap resume-verify. +- Per-chunk nonce is **random** (`Nonce::generate()`, `chunked.rs:366`). ⇒ must persist; reuse only after content-verify. +- AAD = `"fula:v4:chunk:{storage_key}:{index}"` (`chunked.rs:371`). ⇒ cryptographic anti-mixup binding. +- Chunks are **pinned per-chunk on PUT** (`encryption.rs:7411`, `put_object_with_metadata_and_pinning`). + ⇒ abandoned uploads leave *pinned* orphans → active cleanup required (§5). +- Per-chunk PUT **retry already exists** (`encryption.rs:7403`, `retry_idempotent(4, …)`, FxFiles #50). ⇒ reuse in pass 2. +- Post-PUT **CID verify exists** (`encryption.rs:7447`, walkable_v8 `verify_etag_against_expected_cid`). ⇒ wire into the "uploaded" flag. +- Native already has **file-based resumable** (`UploadManifest`, stored-nonce reuse, BAO root check, WAL), + all `#[cfg(not(wasm32))]`. wasm mirrors it **minus filesystem + WAL**. +- **Gateway auto-GC is OFF** (one-off manual GC only — see memory `project_fula_v7_migration_gone_fix`). + This is a *load-bearing assumption* for §5; the orphan design must not silently depend on it (and doesn't — it cleans actively). + +**Security design validated by independent adversarial review (Codex / GPT-5.5):** the nonce-reuse +argument is sound *iff* §4's invariants are enforced; the dangerous edge is reuse ordering on resume (INV-1). + +--- + +## 2. Architecture — two-pass, where the upload pass *is* the resume path + +A page reload loses the browser `Blob` reference (browsers won't persist file handles; iOS Safari +has no File System Access API). So **in-session** resume (network blip, no reload) is seamless; +**cross-reload** resume requires the user to re-select the same file, gated by INV-1. + +``` +ChunkSource (Dart Blob.slice → bytes, pulled lazily by Rust) ── §10 de-risk first (P1) + │ + PASS 1 — plan/commit (hash-only, NO encryption; ~1 chunk in memory) + │ chunk plaintext → generate nonce/chunk → feed BAO + content_hash + │ finalize → root_hash + content_hash + nonces[] + │ ⇒ commit immutable manifest (§3). Interrupted? nothing uploaded → RESTART pass 1. + ▼ + PASS 2 — upload (re-encrypt with STORED nonce; ~concurrency×chunk in memory) + │ for each not-yet-uploaded chunk i: AES-GCM(dek, nonce_i, plaintext_i, AAD=.../i) + │ deterministic ⇒ identical ciphertext ⇒ idempotent content-addressed PUT (retry per chunk) + │ post-PUT CID verify → mark uploaded[i]; poll ManifestHandle → Dart persists to IndexedDB + │ Interrupted? manifest has uploaded[] → RESUME pass 2 (reuse stored nonces, INV-1). + ▼ + FINALIZE — write index/metadata (header_safe, 0.6.13) + forest entry. COMMIT POINT. +``` + +**Fresh upload = pass 1 + pass 2-from-0. Resume = reuse manifest + pass 2-from-K.** +Resume never re-runs BAO mid-stream → no fragile blake3-state serialization. +Pass 2's re-encrypt-from-stored-nonce is *exactly* the existing native resume logic, applied to a Blob source. + +**Why two-pass (not single-pass):** because BAO is over plaintext, pass 1 is nearly free, so two-pass +buys a clean separation of *sequential* BAO/commit from *concurrent, idempotent* upload — far easier to +prove correct than entangling a sequential BAO advance with out-of-order concurrent PUTs. (Single-pass +was considered and rejected on this basis, not on a false "avoids blake3 serialization" claim.) + +**Concurrency:** 2–4 in-flight chunks on low-end (`WebDeviceClass.lowEnd`), up to 16 on desktop. + +--- + +## 3. The manifest (split: immutable commitment + mutable progress) + +Persisted to **IndexedDB by Dart** (the SDK exposes the bytes via a pollable `ManifestHandle`, mirroring +`ProgressHandle`/`CancelHandle` — see memory `reference_fula_frb_polling_not_streamsink`). **No raw DEK is ever persisted** — only `wrapped_dek`. + +**Immutable commitment block** (frozen the instant pass 1 finalizes; the *only* thing that gates nonce reuse): +`algorithm_version`, `storage_key`, `bucket`, `total_size`, `chunk_size`, `num_chunks` +(`== ceil(total_size/chunk_size)`), `nonces[]` (len `== num_chunks`, base64), `bao_root`, +`content_hash`, `wrapped_dek`, `kek_version`, `upload_id`, `created_at`. + +**Mutable progress block:** `uploaded[]` (per-chunk flag = "PUT succeeded AND post-PUT CID verified"), +optional `chunk_cids[]` (offline-walk hints, recomputed in pass 2). + +--- + +## 4. Security & integrity invariants (HARD — each maps to a test in §8) + +- **INV-1 (nonce-reuse gate — the catastrophic one).** No stored nonce may encrypt until the byte + stream is *proven identical* to the manifest. **Cross-reload: fully recompute `content_hash` + + `total_size` + `num_chunks` and match the manifest BEFORE any AES-GCM call.** Never interleave + hashing with encrypting. (In-session reuse is safe because the `Blob` is the same immutable source.) +- **INV-2 (atomic commit).** Pass 1 publishes the commitment block only after *all* its fields are + finalized from one stream; reject if `nonces.length != num_chunks` or `num_chunks != ceil(total_size/chunk_size)`. +- **INV-3 (per-chunk precondition).** Before encrypting chunk `i`: `0 ≤ i < num_chunks`; `nonce[i]` + present and 96-bit; AAD uses `manifest.storage_key` and exact `i`; slice offset `== i*chunk_size`; + slice length `== min(chunk_size, total_size - offset)`. Reject extra/short/zero-length reads. +- **INV-4 (anti-mixup).** AAD binds ciphertext to `(storage_key, index)`; each chunk has its own S3 key; + progress is indexed by `chunk_index` (not completion order). Download verifies `bao_root` + `content_hash` + ⇒ any scramble/truncation **fails loudly, never silently wrong**. +- **INV-5 (empty / tiny files).** Define explicitly: `num_chunks = 0`, no nonces, no AES-GCM chunks; single-chunk and partial-final-chunk paths covered by tests. +- **INV-6 (concurrent writer).** A **per-`storage_key` lock** (Web Locks API `navigator.locks`, with an + IndexedDB lease + TTL fallback for older browsers) prevents two tabs writing the same chunk keys with + *different* DEKs (→ mixed object). Residual mixed state is **caught by download verify** (not silent, not + a key leak). *Alternative considered (Codex):* `upload_id`-scoped temporary chunk keys + atomic promote at + finalize — more robust but changes the chunk-key scheme (touches reads + native); **deferred** unless cross-tab proves real. +- **INV-7 (manifest integrity).** Pass 2 revalidates the manifest's internal consistency before use and + unwraps only the expected `wrapped_dek`/`kek_version`. *Optional defense-in-depth:* MAC the manifest under + a KEK-derived key to detect IndexedDB tampering/rollback (can't recover the DEK regardless, since it's wrapped). +- **INV-8 (stable DEK across resume).** Reuse `wrapped_dek` from the manifest, re-unwrap via the user's KEK; never mint a new DEK on resume. + +--- + +## 5. Orphan / ghost-node lifecycle (requirement 4) + +Decision: **keep per-chunk pinning** (so chunks survive mid-upload regardless of the gateway's GC config — +no dependence on auto-GC being off) **+ active cleanup**, so every uploaded chunk ends in exactly one of: + +- **(a) referenced** by a finalized file (finalize writes index/forest; pinned chunks are reachable), or +- **(b) deleted on cancel** — the existing cancel compensating-delete (unpin + delete uploaded chunks) + drop manifest, or +- **(c) deleted by the startup sweep** — on app start, scan IndexedDB manifests; any `created_at` older than + TTL (e.g. 24–48 h) and not completed → unpin + delete its uploaded chunk keys (identifiable via `upload_id`), then drop the manifest. + +⇒ **no unattended ghost nodes**, and no reliance on server GC. + +--- + +## 6. Error-handling matrix (requirement 3) + +| Event | Behavior | +|---|---| +| Transient chunk PUT fail (`ERR_CONNECTION_CLOSED`/5xx) | per-chunk `retry_idempotent` (exists, `encryption.rs:7403`). **Never restart the whole upload** (the original bug). | +| Index/metadata PUT fail (large header) | `header_safe` strip + retry (0.6.13, exists). | +| Network drop mid-upload (in-session, Blob retained) | pause; resume pass 2 on reconnect. | +| Tab close / reload | manifest in IndexedDB; cross-reload resume = re-pick + INV-1 verify + pass 2. | +| Pass 1 fail | restart pass 1 (nothing committed or uploaded). | +| Finalize fail | retry finalize (idempotent; chunks already present + pinned). | +| Post-PUT CID verify mismatch | do **not** mark uploaded; re-PUT that chunk. | +| User cancel | compensating-delete + drop manifest (§5b). | +| Different file on resume (INV-1 mismatch) | refuse; discard manifest; fresh upload with new nonces. | + +--- + +## 7. Phasing — one `fula-api` PR; each phase compiles native + wasm and has a test gate + +- **P1 — ChunkSource + FRB pull-callback (de-risk the boundary first).** + Rust `ChunkSource` trait (`async fn chunk(index,offset,len) -> Bytes`); FRB binding so Dart passes a + `Blob.slice → arrayBuffer → bytes` callback. **Gate:** Rust pulls N slices via the Dart callback and + reassembles `== original`, on wasm. *If the async Rust→Dart callback proves brittle on wasm, fall back to + the PUSH model* (Dart feeds chunks into a stateful handle) — this phase picks the one that works. + Files: `fula-flutter/src/api/forest.rs`, new `chunk_source` module, `fula-client` trait. + +- **P2 — Plan-mode encoder + streaming fresh upload (the OOM fix; no resume yet).** + `fula-crypto`: add a `ChunkedEncoder` **plan mode** (generate nonce + feed BAO + `content_hash`, **skip AEAD**) + sharing the chunking/nonce/BAO code with the real path. `fula-client`: `put_object_flat_streaming_wasm` + (pass 1 + pass 2 + finalize) using `ChunkSource`, bounded concurrency, cancel, `header_safe`. + **Gates:** (a) plan-mode parity — `root_hash`/`content_hash`/`num_chunks` identical to the full encoder for + the same input; (b) round-trip byte-exact for a multi-chunk file; (c) parity vs the whole-buffer path. + **Milestone: large uploads work, memory-bounded.** + +- **P3 — Manifest + resume.** + `UploadManifest` commitment/progress split (extends `468543e`); `ManifestHandle` (pollable); resume entrypoint + enforcing **INV-1** (content-verify before nonce reuse). **Gates:** interrupt@K → resume → byte-exact; + nonce-reuse guard (changed file → refuse); empty / single-chunk / partial-final (INV-5). + +- **P4 — Orphan lifecycle + error hardening + concurrent-writer lock.** + Cancel compensating-delete; startup sweep (§5c); per-`storage_key` lock (INV-6); wire post-PUT CID verify + into the `uploaded[]` flag. **Gates:** cancel deletes chunks; abandoned sweep deletes orphans; concurrent + same-key writer rejected; torn-manifest rejected (INV-2). + +- **P5 — FRB surface + version + full suite + review → publish.** + Wire `put_flat_streaming` + resume + `ManifestHandle` into FRB; bump workspace version (**0.7.0** — significant + feature); `cargo check` native + wasm + flutter; full test suite; final adversarial review of the resume + + concurrent-writer code (codex). User publishes to pub.dev + the wasm release asset. + +- **P6 — FxFiles consumer PR (separate repo, post-publish).** + Re-pin `fula_client`; `tools/sync-wasm-pkg.ps1` for `web/pkg`; `WebUploadManager`: Blob `ChunkSource`, + manifest → IndexedDB, resume on failure/reload, Sync Queue resumable rows, device-class concurrency; version + bump; deploy. **Web e2e gate** (real browser, low-RAM device-class override): 200 MB+ upload (no OOM), + interrupt → resume → exact, cancel cleans up. + +--- + +## 8. Test plan (requirements 1, 2, 3) + +- **fula-crypto unit:** plan-mode parity (root/content_hash/nonce-count); empty/single/partial-final chunking; AAD binding. +- **fula-client unit:** round-trip byte-exact; interrupt→resume byte-exact; **nonce-reuse guard** (changed + content → refuse, INV-1); **mixup-detector** (deliberately swap a chunk → download fails, INV-4); + concurrent-writer rejected (INV-6); cancel → chunks deleted; abandoned sweep → orphans deleted; + **manifest atomicity** (torn/short manifest rejected, INV-2/3). +- **fula-client e2e** (real server, gated like existing `*_e2e.rs`): large multi-chunk upload → download exact; + mid-flight interrupt → resume → exact; cross-reload (simulated: drop in-memory state, reload manifest, re-feed source). +- **FxFiles web e2e (P6):** real browser on a low-RAM profile; 200 MB+ no-OOM; interrupt→resume; cancel cleanup. + +--- + +## 9. Memory budget + +- Pass 1: 1 chunk buffer + BAO/blake3 hashers (negligible). +- Pass 2: `concurrency × (plaintext slice + ciphertext)` ≈ `2 × C × chunk_size`. C=3, chunk=4 MB ⇒ **~24 MB, + independent of file size** (vs current ~2× file size). 2 GB on a 1 GB phone becomes feasible. + +--- + +## 10. Open verification items (resolve at P1/P2 start) + +1. **FRB async Rust→Dart callback on wasm** — confirm it works (P1 *is* this de-risk); fallback = push model. +2. **Plan-mode parity** — prove `root_hash` identical before relying on pass-1/pass-2 determinism. +3. **`chunk_size` is fixed & manifest-bound across SDK versions** — INV-2/INV-3 determinism depends on it + (add `algorithm_version` to the manifest and refuse resume across incompatible versions). + +--- + +## 11. Non-goals / deferred + +- File System Access API persistent handles (cross-reload resume without re-pick) — iOS Safari unsupported. +- `upload_id`-staging chunk keys instead of a per-key lock (INV-6 alt) — bigger change (key scheme + reads); deferred. +- Native path changes — native already has file-based resumable; this work is wasm/web. The fula-crypto + plan-mode is shared but purely additive. From c3b74a0bccbdea862770ab6da5827f6d55e16434 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 12:43:56 -0400 Subject: [PATCH 2/6] P2 (wip): streaming_put_chunk -- pass-2 per-chunk encrypt-from-stored-nonce + PUT First verifiable piece of the streaming upload core (see docs/web-streaming-resumable-upload-plan.md + task #44 notes). Adds EncryptedClient::streaming_put_chunk: encrypts ONE caller-supplied (pushed) plaintext chunk with the nonce committed in pass 1, then PUTs it -- mirroring the native resume re-encrypt (~8520) and the chunk-PUT closure in put_object_chunked_internal (transient retry_idempotent, pinning, post-PUT CID self-verify). Deterministic AES-GCM => identical ciphertext => idempotent content-addressed PUT, safe to retry or repeat on resume. AAD binds ciphertext to (storage_key, chunk_index). Compiles native (dead-code warning expected -- wired up by the streaming session + FRB handle in following commits). No change to existing paths. Co-Authored-By: Claude Opus 4.8 --- crates/fula-client/src/encryption.rs | 101 +++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 475d3c2..d9f9beb 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7627,6 +7627,107 @@ impl EncryptedClient { Ok((result, index_body, index_cid)) } + /// Streaming upload, pass 2 — encrypt ONE chunk from its PRE-COMMITTED + /// nonce and PUT it. + /// + /// The plaintext is supplied by the caller (pushed from Dart, sliced from a + /// browser Blob) instead of sliced from a whole-file buffer, so peak memory + /// stays bounded regardless of file size. The chunk is encrypted with the + /// nonce committed during pass 1 (NEVER regenerated); because AES-GCM is + /// deterministic for a fixed (key, nonce, plaintext, AAD), the ciphertext — + /// hence its content-addressed key and CID — is identical to what pass 1 + /// would have produced, so the PUT is idempotent and safe to retry or to + /// repeat on resume. The AAD binds the ciphertext to (storage_key, + /// chunk_index): the anti-mixup guarantee. This mirrors the native resume + /// re-encrypt (`resume_upload_with_cancel_and_progress`) and the chunk-PUT + /// closure in `put_object_chunked_internal` (transient retry, pinning, + /// post-PUT CID self-verify), but for a single pushed chunk. + async fn streaming_put_chunk( + &self, + bucket: &str, + storage_key: &str, + chunked_meta: &ChunkedFileMetadata, + chunk_index: u32, + plaintext: &[u8], + dek: &fula_crypto::keys::DekKey, + walkable_v8: bool, + ) -> Result<(String, Option)> { + // Re-encrypt with the committed nonce. Identical to the native resume + // path's per-chunk encrypt (see ~8520): same AAD prefix, same nonce + // source, same AEAD. Reusing a nonce here is safe because it only ever + // encrypts this exact (dek, plaintext, AAD) triple. + let aad_prefix = format!("fula:v4:chunk:{}", storage_key); + let nonce = chunked_meta + .get_chunk_nonce(chunk_index) + .map_err(ClientError::Encryption)?; + let aead = Aead::new_default(dek); + let aad = format!("{}:{}", aad_prefix, chunk_index).into_bytes(); + let ciphertext = Bytes::from( + aead.encrypt_with_aad(&nonce, plaintext, &aad) + .map_err(ClientError::Encryption)?, + ); + + let chunk_key = ChunkedFileMetadata::chunk_key(storage_key, chunk_index); + let chunk_metadata = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk_index.to_string()); + + // Pre-compute the expected CID before `ciphertext` moves into the PUT. + let expected_chunk_cid = if walkable_v8 { + Some(crate::walkable_v8::local_blake3_raw_cid(&ciphertext)) + } else { + None + }; + + // Per-chunk PUT with transient retry (FxFiles #50). The chunk key is + // content-addressed, so retrying — or re-PUTting the same chunk on a + // resume — is idempotent. + let pinning = self.pinning.clone(); + let inner = self.inner.clone(); + let put_result = crate::multipart::retry_idempotent(4, || { + let client = inner.clone(); + let bucket = bucket.to_string(); + let chunk_key = chunk_key.clone(); + let pinning = pinning.clone(); + let chunk_metadata = chunk_metadata.clone(); + let body = ciphertext.clone(); + async move { + if let Some(ref pin) = pinning { + client + .put_object_with_metadata_and_pinning( + &bucket, + &chunk_key, + body, + Some(chunk_metadata), + &pin.endpoint, + &pin.token, + ) + .await + } else { + client + .put_object_with_metadata(&bucket, &chunk_key, body, Some(chunk_metadata)) + .await + } + } + }) + .await?; + + // Post-PUT CID self-verify (walkable-v8): confirms the stored bytes + // match what we encrypted. None when the flag is off or verify fails. + let chunk_cid = match (walkable_v8, expected_chunk_cid) { + (true, Some(expected)) => crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, + expected, + bucket, + &chunk_key, + ), + _ => None, + }; + + Ok((chunk_key, chunk_cid)) + } + /// Upload an object with resumable chunked encoding. /// /// Like `put_object_encrypted_with_type`, but writes a manifest file at From 1fa2d6d0c6f398460df7eeadcfcc307b459fe6f9 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 12:58:47 -0400 Subject: [PATCH 3/6] P2 (wip): fula-client streaming upload core (begin / finalize_plan / put_chunk / finish) Push-model streaming-upload methods on EncryptedClient (pub for the FRB handle + integration tests), completing the fula-client side of the OOM fix: - streaming_begin: prelude (ensure_forest_loaded, generate DEK, derive flat storage_key incl. the v7 shard-salt path, HPKE-wrap DEK, KEK version). Read-only vs live state; mirrors the head of put_object_flat_deferred_locked. - streaming_finalize_plan: end of pass 1 -- finalize the plan-only encoder, build PrivateMetadata (size + content_hash from the pushed plaintext) + encrypted metadata; returns ChunkedFileMetadata (nonces + BAO root) + the metas. - streaming_put_chunk: pass-2 per-chunk encrypt-from-stored-nonce + PUT. - streaming_finish: index PUT (header-safe) + forest register + flush, under the per-bucket write lock. register_streaming_upload_in_forest mirrors the wasm-proven upsert in put_object_flat_deferred_locked (v7/monolithic, WAL auto-skipped on wasm, orphan cleanup of the prior upload). Peak memory is bounded by what the caller holds in flight, not file size (pass 1 holds ~1 chunk; pass 2 only the pushed chunk). Compiles native + wasm (via fula-flutter). Not yet FRB-wired or end-to-end tested -- the stateful-mock round-trip test (P2 gate) + the FRB handle follow. Co-Authored-By: Claude Opus 4.8 --- crates/fula-client/src/encryption.rs | 340 ++++++++++++++++++++++++++- 1 file changed, 339 insertions(+), 1 deletion(-) diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index d9f9beb..f52569a 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7642,7 +7642,7 @@ impl EncryptedClient { /// re-encrypt (`resume_upload_with_cancel_and_progress`) and the chunk-PUT /// closure in `put_object_chunked_internal` (transient retry, pinning, /// post-PUT CID self-verify), but for a single pushed chunk. - async fn streaming_put_chunk( + pub async fn streaming_put_chunk( &self, bucket: &str, storage_key: &str, @@ -7728,6 +7728,344 @@ impl EncryptedClient { Ok((chunk_key, chunk_cid)) } + /// Streaming upload, prelude — the one-time setup before any chunks are + /// pushed: ensure the forest is loaded, generate the per-file DEK, derive + /// the flat storage key (monolithic inline; v7 reads `shard_salt` behind + /// the forest RwLock without holding the DashMap guard), HPKE-wrap the DEK, + /// and read the KEK version. Mirrors the head of + /// `put_object_flat_deferred_locked` (~6976-7043). The remaining metadata + /// (`private_meta` / `encrypted_meta`) is built at finalize, once pass 1 has + /// computed the total size + content hash from the pushed plaintext. + /// + /// Read-only w.r.t. live write state (the forest load is idempotent), and + /// deliberately a separate path so the non-streaming uploader is untouched. + /// Returns `(storage_key, dek, wrapped_dek, kek_version)`. + pub async fn streaming_begin( + &self, + bucket: &str, + key: &str, + ) -> Result<(String, fula_crypto::keys::DekKey, EncryptedData, u32)> { + self.ensure_forest_loaded(bucket).await?; + let dek = self.encryption.key_manager.generate_dek(); + + enum KeyPath { + Ready(String), + V7(Arc>), + } + let path = { + let cache_entry = self.forest_cache.get(bucket).ok_or_else(|| { + ClientError::Encryption(fula_crypto::CryptoError::Encryption( + "forest not loaded".to_string(), + )) + })?; + match cache_entry.value() { + ForestCacheEntry::Monolithic { forest, .. } => { + KeyPath::Ready(forest.generate_key(key, &dek)) + } + ForestCacheEntry::ShardedHamt { forest, .. } => KeyPath::V7(forest.clone()), + } + }; + let storage_key = match path { + KeyPath::Ready(sk) => sk, + KeyPath::V7(forest_arc) => { + let salt = { + let guard = forest_arc.read().await; + guard.manifest().shard_salt().to_vec() + }; + generate_flat_key(key, &dek, &salt) + } + }; + + let encryptor = Encryptor::new(self.encryption.public_key()); + let wrapped_dek = encryptor + .encrypt_dek(&dek) + .map_err(ClientError::Encryption)?; + let kek_version = self.encryption.key_manager.version(); + + Ok((storage_key, dek, wrapped_dek, kek_version)) + } + + /// Streaming upload, end of pass 1 — finalize the plan-mode encoder and + /// build the per-file metadata now that the total size + content hash are + /// known from the pushed plaintext. Returns the committed + /// `ChunkedFileMetadata` (nonces + BAO root), the `PrivateMetadata`, and its + /// AEAD-encrypted form. The caller keeps these for pass 2 + `streaming_finish` + /// (and, in P3, derives the persisted resume manifest from them). `encoder` + /// MUST be the plan-only encoder built with AAD prefix + /// `fula:v4:chunk:{storage_key}` and the same `dek` from `streaming_begin`. + pub fn streaming_finalize_plan( + &self, + encoder: ChunkedEncoder, + dek: &fula_crypto::keys::DekKey, + storage_key: &str, + key: &str, + content_type: Option<&str>, + ) -> Result<(ChunkedFileMetadata, PrivateMetadata, EncryptedPrivateMetadata)> { + // Content hash + size must be read BEFORE finalize() consumes the encoder. + let content_hash = encoder.content_hash_hex(); + let total_size = encoder.bytes_processed(); + let (_final_chunk, chunked_metadata, _outboard) = + encoder.finalize().map_err(ClientError::Encryption)?; + + let private_meta = PrivateMetadata::new(key, total_size) + .with_content_type(content_type.unwrap_or("application/octet-stream")) + .with_content_hash(content_hash); + let meta_aad = EncryptedPrivateMetadata::aad_v2(storage_key); + let encrypted_meta = EncryptedPrivateMetadata::encrypt_v2(&private_meta, dek, &meta_aad) + .map_err(ClientError::Encryption)?; + + Ok((chunked_metadata, private_meta, encrypted_meta)) + } + + /// Streaming upload, finalize ("commit") — write the index/metadata object, + /// register the file in the encrypted forest, and flush to master. After + /// this the file is listable; before it, the uploaded chunks are + /// unreferenced (an abandoned/cancelled upload leaves them collectable — see + /// the plan's orphan lifecycle). Mirrors the index-write tail of + /// `put_object_chunked_internal` (~7515-7627) + the wasm-proven forest + /// registration of `put_object_flat_deferred_locked` (~7154-7286). The + /// per-bucket write lock + post-register `flush_forest` give durability in + /// place of the native-only WAL. + #[allow(clippy::too_many_arguments)] + pub async fn streaming_finish( + &self, + bucket: &str, + key: &str, + storage_key: &str, + wrapped_dek: &EncryptedData, + encrypted_meta: &EncryptedPrivateMetadata, + kek_version: u32, + private_meta: &PrivateMetadata, + mut chunked_metadata: ChunkedFileMetadata, + chunk_cids: Vec>, + ) -> Result { + let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + + // W.9.4-A2: stamp per-chunk CID hints. Only when at least one is Some, + // so the wire stays byte-identical to v0.5 output when the flag is off. + if walkable_v8 && chunk_cids.iter().any(|c| c.is_some()) { + chunked_metadata.populate_chunk_cids(chunk_cids); + } + + // Index body = the FULL metadata JSON; the header copy is shrunk via + // `header_safe_enc_metadata` so a large chunked file's metadata can't + // blow the gateway header limit (0.6.13). + let enc_metadata = serde_json::json!({ + "version": 4, + "algorithm": "AES-256-GCM", + "wrapped_key": serde_json::to_value(wrapped_dek).unwrap(), + "kek_version": kek_version, + "metadata_privacy": true, + "obfuscation_mode": "flat", + "private_metadata": encrypted_meta.to_json().map_err(ClientError::Encryption)?, + "chunked": serde_json::to_value(&chunked_metadata).unwrap(), + }); + let index_body = enc_metadata.to_string(); + let metadata = ObjectMetadata::new() + .with_content_type("application/json") + .with_metadata("x-fula-encrypted", "true") + .with_metadata("x-fula-chunked", "true") + .with_metadata("x-fula-encryption", &header_safe_enc_metadata(&index_body)); + + let expected_index_cid = if walkable_v8 { + Some(crate::walkable_v8::local_blake3_raw_cid(index_body.as_bytes())) + } else { + None + }; + + // Index PUT with transient retry (idempotent, content-addressed). + let pinning = self.pinning.clone(); + let inner = self.inner.clone(); + let index_result = crate::multipart::retry_idempotent(4, || { + let client = inner.clone(); + let pinning = pinning.clone(); + let body = Bytes::from(index_body.clone()); + let metadata = metadata.clone(); + let bucket = bucket.to_string(); + let storage_key = storage_key.to_string(); + async move { + if let Some(ref pin) = pinning { + client + .put_object_with_metadata_and_pinning( + &bucket, + &storage_key, + body, + Some(metadata), + &pin.endpoint, + &pin.token, + ) + .await + } else { + client + .put_object_with_metadata(&bucket, &storage_key, body, Some(metadata)) + .await + } + } + }) + .await?; + + let index_cid = match (walkable_v8, expected_index_cid) { + (true, Some(expected)) => crate::walkable_v8::verify_etag_against_expected_cid( + &index_result.etag, + expected, + bucket, + storage_key, + ), + _ => None, + }; + + // Commit to the forest under the per-bucket write lock (mirrors the + // non-streaming path), then flush so a fresh client load sees the file. + let lock = self.bucket_write_lock(bucket); + let _guard = lock.lock().await; + self.ensure_forest_loaded(bucket).await?; + self.register_streaming_upload_in_forest( + bucket, + key, + storage_key, + index_cid, + &index_body, + private_meta, + ) + .await?; + self.flush_forest_locked(bucket).await?; + + Ok(index_result) + } + + /// Forest registration for a streaming chunked upload. Mirrors the + /// wasm-proven upsert in `put_object_flat_deferred_locked` (~7154-7286): + /// build the encrypted `ForestFileEntry`, stamp the index CID + metadata, + /// upsert into the v7 (sharded-HAMT) or monolithic forest, append to the + /// WAL on native (auto-skipped on wasm), and best-effort clean up the + /// previous upload's orphaned objects. Caller MUST hold the per-bucket + /// write lock and have `ensure_forest_loaded`. + async fn register_streaming_upload_in_forest( + &self, + bucket: &str, + key: &str, + storage_key: &str, + index_cid: Option, + index_metadata_json: &str, + private_meta: &PrivateMetadata, + ) -> Result<()> { + let mut forest_entry = + ForestFileEntry::from_metadata(private_meta, storage_key.to_string()); + forest_entry.mark_encrypted(); + forest_entry.storage_cid = index_cid; + forest_entry + .user_metadata + .insert("x-fula-encrypted".to_string(), "true".to_string()); + forest_entry + .user_metadata + .insert("x-fula-encryption".to_string(), index_metadata_json.to_string()); + forest_entry + .user_metadata + .insert("x-fula-chunked".to_string(), "true".to_string()); + + let now = chrono::Utc::now().timestamp(); + #[cfg(not(target_arch = "wasm32"))] + let wal_entry_clone = forest_entry.clone(); + + let is_v7 = self.is_forest_sharded_hamt(bucket); + let old_storage_key: Option = if is_v7 { + let forest_arc = { + let cache_entry = self.forest_cache.get(bucket).ok_or_else(|| { + ClientError::Encryption(fula_crypto::CryptoError::Decryption(format!( + "forest cache missing for bucket {} during streaming-upload registration", + bucket, + ))) + })?; + match cache_entry.value() { + ForestCacheEntry::ShardedHamt { forest, .. } => forest.clone(), + _ => unreachable!("is_forest_sharded_hamt guard above"), + } + }; + let backend: Arc = + Arc::new(S3BlobBackend::new(self.inner.clone(), bucket.to_string())); + let old = { + let mut guard = forest_arc.write().await; + let prior = guard + .get_file(key, &backend) + .await + .map_err(ClientError::Encryption)?; + debug_assert!( + forest_entry.encrypted, + "v7 upsert invariant violated: streaming entry for {} has encrypted=false", + forest_entry.path + ); + guard + .upsert_file(forest_entry, &backend) + .await + .map_err(ClientError::Encryption)?; + prior.map(|e| e.storage_key) + }; + if let Some(mut cache_entry) = self.forest_cache.get_mut(bucket) { + if let ForestCacheEntry::ShardedHamt { loaded_at, .. } = cache_entry.value_mut() { + *loaded_at = now; + } + } + old + } else { + let (mut forest, prior_etag, prior_seq) = { + let cache_entry = self.forest_cache.get(bucket).ok_or_else(|| { + ClientError::Encryption(fula_crypto::CryptoError::Decryption(format!( + "forest cache missing for bucket {} during streaming-upload registration", + bucket, + ))) + })?; + match cache_entry.value() { + ForestCacheEntry::Monolithic { + forest, + index_etag, + last_sequence, + .. + } => (forest.clone(), index_etag.clone(), *last_sequence), + ForestCacheEntry::ShardedHamt { .. } => unreachable!("is_v7 handled above"), + } + }; + let old = forest.get_storage_key(key).map(|s| s.to_string()); + forest.upsert_file(forest_entry); + self.forest_cache.insert( + bucket.to_string(), + ForestCacheEntry::Monolithic { + forest, + loaded_at: now, + dirty: true, + index_etag: prior_etag, + last_sequence: prior_seq, + }, + ); + old + }; + + #[cfg(not(target_arch = "wasm32"))] + { + let wal_mac = wal::derive_mac_key(&self.encryption.key_manager, bucket); + if let Err(e) = wal::append( + bucket, + &wal_mac, + WalEntry::Insert { + key: key.to_string(), + entry: wal_entry_clone, + }, + ) { + tracing::warn!(%bucket, error = %e, "WAL append failed (streaming-upload register); continuing"); + } + } + + // Best-effort cleanup of the orphaned previous upload (per-upload random + // DEKs make a shared key astronomically unlikely; the guard is cheap). + if let Some(old_key) = old_storage_key { + if old_key != storage_key { + let num_chunks = self.get_chunked_num_chunks(bucket, &old_key).await; + self.cleanup_orphaned_storage(bucket, &old_key, num_chunks).await; + } + } + + Ok(()) + } + /// Upload an object with resumable chunked encoding. /// /// Like `put_object_encrypted_with_type`, but writes a manifest file at From ba21a6ecbd9d1f39c8370b486e934e920508cad0 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 13:03:56 -0400 Subject: [PATCH 4/6] P2: streaming upload round-trip test (the P2 gate) -- PASSES byte-exact Hermetic test driving the streaming methods exactly as the FRB handle will (streaming_begin -> plan-only encoder -> streaming_finalize_plan -> streaming_put_chunk loop -> streaming_finish) against a STATEFUL wiremock that stores PUT bodies and serves them on GET, then downloads via get_object_flat and asserts BYTE-EXACT recovery. No network / no credentials -- runs in CI. Proves the streaming path produces a normally downloadable, decryptable object: pass-1 commits nonces without ciphertext, pass-2 re-encrypts each chunk from its stored nonce, the index + forest register + flush land correctly, and the standard download reconstructs the exact bytes (incl. the 0.6.13 body-fallback recovering chunk nonces). walkable-v8 post-PUT CID self-verify also exercised. Co-Authored-By: Claude Opus 4.8 --- .../tests/streaming_upload_roundtrip.rs | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 crates/fula-client/tests/streaming_upload_roundtrip.rs diff --git a/crates/fula-client/tests/streaming_upload_roundtrip.rs b/crates/fula-client/tests/streaming_upload_roundtrip.rs new file mode 100644 index 0000000..430a6a3 --- /dev/null +++ b/crates/fula-client/tests/streaming_upload_roundtrip.rs @@ -0,0 +1,188 @@ +//! Hermetic round-trip test for the PUSH-model streaming upload (P2 of +//! docs/web-streaming-resumable-upload-plan.md). +//! +//! Drives the streaming methods exactly as the FRB handle will — +//! streaming_begin +//! -> plan-only ChunkedEncoder (pass 1, fed arbitrary streaming slices) +//! -> streaming_finalize_plan +//! -> streaming_put_chunk loop (pass 2, encrypt-from-stored-nonce) +//! -> streaming_finish (index + forest register + flush) +//! — against a STATEFUL wiremock that stores PUT bodies and serves them back on +//! GET, then downloads via `get_object_flat` and asserts BYTE-EXACT recovery. +//! +//! This is the P2 gate: it proves the streaming path produces a normally +//! downloadable, decryptable object. No network / no credentials. + +#![cfg(not(target_arch = "wasm32"))] + +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::chunked::ChunkedEncoder; +use fula_crypto::keys::SecretKey; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::method; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +/// BLAKE3-raw CID of `data`, returned as the ETag so the SDK's walkable-v8 +/// post-PUT self-verify accepts the response. +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +type Store = Arc>>>; + +/// PUT: store the body keyed by URL path; ETag = BLAKE3-raw CID(body). +struct StorePut { + store: Store, +} +impl Respond for StorePut { + fn respond(&self, req: &Request) -> ResponseTemplate { + let path = req.url.path().to_string(); + let cid = blake3_raw_cid(&req.body); + self.store.lock().unwrap().insert(path, req.body.clone()); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +/// GET: serve the stored body (200 + matching ETag) or 404. +struct ServeGet { + store: Store, +} +impl Respond for ServeGet { + fn respond(&self, req: &Request) -> ResponseTemplate { + match self.store.lock().unwrap().get(req.url.path()) { + Some(body) => ResponseTemplate::new(200) + .insert_header("ETag", blake3_raw_cid(body).to_string()) + .set_body_bytes(body.clone()), + None => ResponseTemplate::new(404), + } + } +} + +/// HEAD: 200 if stored, else 404 (cold-bootstrap probes a fresh bucket). +struct HeadProbe { + store: Store, +} +impl Respond for HeadProbe { + fn respond(&self, req: &Request) -> ResponseTemplate { + if self.store.lock().unwrap().contains_key(req.url.path()) { + ResponseTemplate::new(200) + } else { + ResponseTemplate::new(404) + } + } +} + +#[tokio::test] +async fn streaming_upload_roundtrip_byte_exact() { + let server = MockServer::start().await; + let store: Store = Arc::new(Mutex::new(HashMap::new())); + Mock::given(method("PUT")) + .respond_with(StorePut { store: store.clone() }) + .mount(&server) + .await; + Mock::given(method("GET")) + .respond_with(ServeGet { store: store.clone() }) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(HeadProbe { store: store.clone() }) + .mount(&server) + .await; + Mock::given(method("DELETE")) + .respond_with(ResponseTemplate::new(204)) + .mount(&server) + .await; + + let mut config = Config::new(&server.uri()).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + let secret = SecretKey::generate(); + let enc_config = EncryptionConfig::from_secret_key(secret); + let client = EncryptedClient::new(config, enc_config).expect("EncryptedClient::new"); + + let bucket = "videos-v8"; + let key = "/promo.mp4"; + // Multi-chunk with a sub-chunk tail; non-trivial byte pattern. + let mut data = vec![0u8; 700 * 1024 + 123]; + for (i, b) in data.iter_mut().enumerate() { + *b = (i % 251) as u8; + } + + // ---- streaming upload (push model; mirrors the FRB handle) ---- + let (storage_key, dek, wrapped_dek, kek_version) = + client.streaming_begin(bucket, key).await.expect("streaming_begin"); + + // pass 1: plan-only encoder, fed arbitrary streaming slices. + let aad_prefix = format!("fula:v4:chunk:{}", storage_key); + let mut encoder = ChunkedEncoder::with_aad_and_chunk_size( + dek.clone(), + aad_prefix.into_bytes(), + 64 * 1024, + ) + .into_plan_only(); + for slice in data.chunks(100 * 1024) { + encoder.update(slice).expect("plan update"); + } + let (chunked_metadata, private_meta, encrypted_meta) = client + .streaming_finalize_plan(encoder, &dek, &storage_key, key, Some("video/mp4")) + .expect("streaming_finalize_plan"); + + let cs = chunked_metadata.chunk_size as usize; + let num_chunks = chunked_metadata.num_chunks as usize; + assert!(num_chunks > 1, "test needs a multi-chunk file (got {num_chunks})"); + + // pass 2: upload each chunk from its committed nonce. + let mut chunk_cids = vec![None; num_chunks]; + for i in 0..num_chunks { + let start = i * cs; + let end = ((i + 1) * cs).min(data.len()); + let (_chunk_key, cid) = client + .streaming_put_chunk( + bucket, + &storage_key, + &chunked_metadata, + i as u32, + &data[start..end], + &dek, + true, + ) + .await + .expect("streaming_put_chunk"); + chunk_cids[i] = cid; + } + + client + .streaming_finish( + bucket, + key, + &storage_key, + &wrapped_dek, + &encrypted_meta, + kek_version, + &private_meta, + chunked_metadata, + chunk_cids, + ) + .await + .expect("streaming_finish"); + + // ---- download + verify byte-exact ---- + let downloaded = client + .get_object_flat(bucket, key) + .await + .expect("get_object_flat"); + assert_eq!( + downloaded.len(), + data.len(), + "round-trip length mismatch" + ); + assert_eq!( + downloaded.as_ref(), + data.as_slice(), + "streaming upload -> download must be byte-exact" + ); +} From 517c292521233912a6017e2e74ed3e727d809ca7 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 13:12:53 -0400 Subject: [PATCH 5/6] P2 (done): FRB push-model streaming-upload handle + finalize the core Adds the FRB handle that exposes the streaming upload to Dart (fula-flutter forest.rs), completing P2 of the web streaming + resumable upload plan: - StreamingUploadHandle (opaque) + StreamingPlanInfo (num_chunks, chunk_size). - streaming_upload_begin / _plan_chunk / _finalize_plan / _upload_chunk / _finish. Dart slices the file from a Blob and drives the two passes; the handle never holds the whole file. The std::sync::Mutex is held only for brief sync critical sections (never across an .await), so concurrent _upload_chunk calls for distinct indices run their PUTs in parallel (Dart bounds concurrency). Pure Dart->Rust calls + handle state -- no Rust->Dart callback. - streaming_put_chunk now reads walkable_v8 from config (one fewer param). - cid added as a direct fula-flutter dep (handle stores per-chunk CID hints). Verified: fula-flutter compiles native (the Send+Sync gate for FRB opaques) AND wasm32; the fula-client round-trip test still passes byte-exact. The handle drives the exact sequence that test proves end-to-end. Dart bindings are FRB-codegen'd at publish; the live wasm path is validated at P6 (browser e2e). Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 1 + crates/fula-client/src/encryption.rs | 2 +- .../tests/streaming_upload_roundtrip.rs | 1 - crates/fula-flutter/Cargo.toml | 2 + crates/fula-flutter/src/api/forest.rs | 287 ++++++++++++++++++ 5 files changed, 291 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eca5c3d..4bdb3c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1986,6 +1986,7 @@ dependencies = [ "async-lock", "bytes", "chrono", + "cid 0.11.1", "flutter_rust_bridge", "fula-client", "fula-crypto", diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index f52569a..a8b7ca6 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7650,8 +7650,8 @@ impl EncryptedClient { chunk_index: u32, plaintext: &[u8], dek: &fula_crypto::keys::DekKey, - walkable_v8: bool, ) -> Result<(String, Option)> { + let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; // Re-encrypt with the committed nonce. Identical to the native resume // path's per-chunk encrypt (see ~8520): same AAD prefix, same nonce // source, same AEAD. Reusing a nonce here is safe because it only ever diff --git a/crates/fula-client/tests/streaming_upload_roundtrip.rs b/crates/fula-client/tests/streaming_upload_roundtrip.rs index 430a6a3..90403f9 100644 --- a/crates/fula-client/tests/streaming_upload_roundtrip.rs +++ b/crates/fula-client/tests/streaming_upload_roundtrip.rs @@ -148,7 +148,6 @@ async fn streaming_upload_roundtrip_byte_exact() { i as u32, &data[start..end], &dek, - true, ) .await .expect("streaming_put_chunk"); diff --git a/crates/fula-flutter/Cargo.toml b/crates/fula-flutter/Cargo.toml index d2cb0ff..529756d 100644 --- a/crates/fula-flutter/Cargo.toml +++ b/crates/fula-flutter/Cargo.toml @@ -38,6 +38,8 @@ bytes = { workspace = true } # Thread safety parking_lot = { workspace = true } +# Streaming-upload handle stores per-chunk CID hints (Vec>). +cid = { workspace = true } # Platform-specific dependencies [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/fula-flutter/src/api/forest.rs b/crates/fula-flutter/src/api/forest.rs index c6537e5..a465e69 100644 --- a/crates/fula-flutter/src/api/forest.rs +++ b/crates/fula-flutter/src/api/forest.rs @@ -662,6 +662,293 @@ pub async fn put_flat_with_progress_cancellable( Ok(result.into()) } +// ============================================================================ +// Streaming upload (web OOM fix) — PUSH model. +// docs/web-streaming-resumable-upload-plan.md (P2). Dart slices the file from a +// Blob and drives the two passes; this never holds the whole file. Pass 1 feeds +// a plan-only encoder (commit nonces + integrity root, no ciphertext); pass 2 +// re-encrypts each chunk from its committed nonce and PUTs it; finish writes the +// index + registers the forest. Uses the proven `put_flat`/handle rails — no +// Rust→Dart callback (FRB on wasm surfaces push via handles, not callbacks). +// ============================================================================ + +/// Returned by `streaming_upload_finalize_plan` so Dart knows how to slice +/// pass 2 (chunk `i` = file bytes `[i*chunk_size .. min((i+1)*chunk_size, len)]`). +pub struct StreamingPlanInfo { + pub num_chunks: u32, + pub chunk_size: u32, +} + +enum StreamingPhase { + Planning { + encoder: fula_crypto::chunked::ChunkedEncoder, + content_type: Option, + }, + Uploading { + chunked_metadata: std::sync::Arc, + private_meta: fula_crypto::private_metadata::PrivateMetadata, + encrypted_meta: fula_crypto::private_metadata::EncryptedPrivateMetadata, + chunk_cids: Vec>, + }, + Done, +} + +struct StreamingUploadInner { + client: EncryptedClientHandle, + bucket: String, + key: String, + storage_key: String, + dek: fula_crypto::keys::DekKey, + wrapped_dek: fula_crypto::hpke::EncryptedData, + kek_version: u32, + phase: StreamingPhase, +} + +/// Opaque handle to an in-progress streaming upload. The `std::sync::Mutex` is +/// only ever held for brief synchronous critical sections (never across an +/// `.await`), so concurrent `streaming_upload_chunk` calls for distinct indices +/// run their PUTs in parallel. +pub struct StreamingUploadHandle { + inner: std::sync::Arc>, +} + +/// Begin a streaming upload: run the prelude (DEK, storage_key, wrapped DEK, +/// KEK version) and create the plan-only encoder. Drive the returned handle via +/// `streaming_upload_plan_chunk` (×N, in order) → `streaming_upload_finalize_plan` +/// → `streaming_upload_chunk` (×N) → `streaming_upload_finish`. +pub async fn streaming_upload_begin( + client: &EncryptedClientHandle, + bucket: String, + key: String, + content_type: Option, +) -> anyhow::Result { + let (storage_key, dek, wrapped_dek, kek_version) = { + let guard = client.inner.read().await; + guard.streaming_begin(&bucket, &key).await? + }; + let aad_prefix = format!("fula:v4:chunk:{}", storage_key); + let encoder = + fula_crypto::chunked::ChunkedEncoder::with_aad(dek.clone(), aad_prefix.into_bytes()) + .into_plan_only(); + Ok(StreamingUploadHandle { + inner: std::sync::Arc::new(std::sync::Mutex::new(StreamingUploadInner { + client: client.clone(), + bucket, + key, + storage_key, + dek, + wrapped_dek, + kek_version, + phase: StreamingPhase::Planning { + encoder, + content_type, + }, + })), + }) +} + +/// Pass 1 — feed one plaintext slice (pushed from Dart) into the plan-only +/// encoder. MUST be called sequentially in file order (BAO is order-sensitive). +pub async fn streaming_upload_plan_chunk( + handle: &StreamingUploadHandle, + bytes: Vec, +) -> anyhow::Result<()> { + let mut g = handle.inner.lock().unwrap(); + match &mut g.phase { + StreamingPhase::Planning { encoder, .. } => { + encoder + .update(&bytes) + .map_err(|e| anyhow::anyhow!("streaming plan chunk failed: {e}"))?; + Ok(()) + } + _ => anyhow::bail!("streaming_upload_plan_chunk: not in planning phase"), + } +} + +/// End of pass 1 — finalize the plan, commit per-file metadata, and return the +/// chunk count + chunk size so Dart can slice pass 2. +pub async fn streaming_upload_finalize_plan( + handle: &StreamingUploadHandle, +) -> anyhow::Result { + // Region 1: take the encoder out (sync, no await held). + let (encoder, content_type, dek, storage_key, key, client) = { + let mut g = handle.inner.lock().unwrap(); + let (encoder, content_type) = + match std::mem::replace(&mut g.phase, StreamingPhase::Done) { + StreamingPhase::Planning { + encoder, + content_type, + } => (encoder, content_type), + other => { + g.phase = other; + anyhow::bail!("streaming_upload_finalize_plan: not in planning phase"); + } + }; + ( + encoder, + content_type, + g.dek.clone(), + g.storage_key.clone(), + g.key.clone(), + g.client.clone(), + ) + }; + // streaming_finalize_plan is synchronous; the client read guard is only for + // method access. No handle lock held here. + let (chunked_metadata, private_meta, encrypted_meta) = { + let guard = client.inner.read().await; + guard.streaming_finalize_plan( + encoder, + &dek, + &storage_key, + &key, + content_type.as_deref(), + )? + }; + let num_chunks = chunked_metadata.num_chunks; + let chunk_size = chunked_metadata.chunk_size; + // Region 2: install the uploading phase. + { + let mut g = handle.inner.lock().unwrap(); + g.phase = StreamingPhase::Uploading { + chunked_metadata: std::sync::Arc::new(chunked_metadata), + private_meta, + encrypted_meta, + chunk_cids: vec![None; num_chunks as usize], + }; + } + Ok(StreamingPlanInfo { + num_chunks, + chunk_size, + }) +} + +/// Pass 2 — encrypt + upload one chunk (pushed from Dart) using its committed +/// nonce. Safe to call concurrently for distinct indices (Dart bounds the +/// concurrency) and idempotent, so safe to retry. +pub async fn streaming_upload_chunk( + handle: &StreamingUploadHandle, + chunk_index: u32, + bytes: Vec, +) -> anyhow::Result<()> { + let (client, bucket, storage_key, dek, chunked_metadata) = { + let g = handle.inner.lock().unwrap(); + match &g.phase { + StreamingPhase::Uploading { + chunked_metadata, .. + } => ( + g.client.clone(), + g.bucket.clone(), + g.storage_key.clone(), + g.dek.clone(), + chunked_metadata.clone(), + ), + _ => anyhow::bail!("streaming_upload_chunk: not in uploading phase"), + } + }; + let (_chunk_key, cid) = { + let guard = client.inner.read().await; + guard + .streaming_put_chunk( + &bucket, + &storage_key, + &chunked_metadata, + chunk_index, + &bytes, + &dek, + ) + .await? + }; + { + let mut g = handle.inner.lock().unwrap(); + if let StreamingPhase::Uploading { chunk_cids, .. } = &mut g.phase { + if let Some(slot) = chunk_cids.get_mut(chunk_index as usize) { + *slot = cid; + } + } + } + Ok(()) +} + +/// Finalize ("commit") — write the index object, register the file in the +/// encrypted forest, and flush. After this the file is listable; before it the +/// uploaded chunks are unreferenced. +pub async fn streaming_upload_finish( + handle: &StreamingUploadHandle, +) -> anyhow::Result { + #[allow(clippy::type_complexity)] + let ( + client, + bucket, + key, + storage_key, + wrapped_dek, + encrypted_meta, + kek_version, + private_meta, + chunked_metadata, + chunk_cids, + ): ( + EncryptedClientHandle, + String, + String, + String, + fula_crypto::hpke::EncryptedData, + fula_crypto::private_metadata::EncryptedPrivateMetadata, + u32, + fula_crypto::private_metadata::PrivateMetadata, + std::sync::Arc, + Vec>, + ) = { + let mut g = handle.inner.lock().unwrap(); + let inner = &mut *g; + match std::mem::replace(&mut inner.phase, StreamingPhase::Done) { + StreamingPhase::Uploading { + chunked_metadata, + private_meta, + encrypted_meta, + chunk_cids, + } => ( + inner.client.clone(), + inner.bucket.clone(), + inner.key.clone(), + inner.storage_key.clone(), + inner.wrapped_dek.clone(), + encrypted_meta, + inner.kek_version, + private_meta, + chunked_metadata, + chunk_cids, + ), + other => { + inner.phase = other; + anyhow::bail!("streaming_upload_finish: not in uploading phase"); + } + } + }; + // Sole owner now that the phase moved out (any racing upload_chunk Arc clone + // is transient); clone as a fallback if one is still in flight. + let chunked_metadata = std::sync::Arc::try_unwrap(chunked_metadata) + .unwrap_or_else(|arc| (*arc).clone()); + let result = { + let guard = client.inner.read().await; + guard + .streaming_finish( + &bucket, + &key, + &storage_key, + &wrapped_dek, + &encrypted_meta, + kek_version, + &private_meta, + chunked_metadata, + chunk_cids, + ) + .await? + }; + Ok(result.into()) +} + /// [`put_flat_resumable_from_path_cancellable`] with live progress (native). #[cfg(not(target_arch = "wasm32"))] pub async fn put_flat_resumable_from_path_with_progress( From 5362135862dbc9b8cd4efe70f48084d37dfac545 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 18 Jun 2026 13:45:07 -0400 Subject: [PATCH 6/6] P2: real-server streaming upload e2e -- PASSES byte-exact (50MB / 200 chunks) Real-gateway e2e (#[ignore]; run with the Mode A creds): drives the streaming sequence (begin -> plan-only encoder -> finalize_plan -> put_chunk loop -> finish) for a 50 MB / 200-chunk file, downloads via get_object_flat, asserts byte-exact. 200 chunks @ 256 KB pushes the index metadata past the 16 KB header budget, so it also exercises header_safe_enc_metadata stripping + body/forest fallback on the real server. Verified on the production gateway: 52,428,800 bytes round-tripped byte-exact in 108s. Complements the hermetic streaming_upload_roundtrip.rs (mock). Co-Authored-By: Claude Opus 4.8 --- .../fula-client/tests/streaming_upload_e2e.rs | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 crates/fula-client/tests/streaming_upload_e2e.rs diff --git a/crates/fula-client/tests/streaming_upload_e2e.rs b/crates/fula-client/tests/streaming_upload_e2e.rs new file mode 100644 index 0000000..f72483c --- /dev/null +++ b/crates/fula-client/tests/streaming_upload_e2e.rs @@ -0,0 +1,154 @@ +//! E2E (real server): the PUSH-model streaming upload must produce a normally +//! downloadable, BYTE-EXACT object on the real gateway. Validates P2 of +//! docs/web-streaming-resumable-upload-plan.md against production — not just the +//! hermetic mock in streaming_upload_roundtrip.rs. +//! +//! Drives the exact sequence the FRB handle uses: +//! streaming_begin -> plan-only ChunkedEncoder (pass 1) -> streaming_finalize_plan +//! -> streaming_put_chunk loop (pass 2) -> streaming_finish +//! then downloads via get_object_flat and asserts byte-exact. The file is large +//! enough (~50 MB, ~200 chunks at 256 KB) that the index metadata exceeds the +//! gateway's 16 KB header budget, so this also exercises header_safe_enc_metadata +//! stripping + the body/forest fallback on the real server. +//! +//! `#[ignore]` — needs network + real credentials. Run (PowerShell, env loaded +//! from e2e-credentials.env): +//! cargo test -p fula-client --test streaming_upload_e2e --release -- --ignored --nocapture +//! +//! Required env: FULA_S3, FULA_JWT, FULA_TEST_PROVIDER, FULA_TEST_OAUTH_SUB, +//! FULA_TEST_EMAIL (the Mode A derivation triple). + +#![cfg(not(target_arch = "wasm32"))] + +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::chunked::ChunkedEncoder; +use fula_crypto::keys::SecretKey; + +fn env(name: &str) -> String { + std::env::var(name).unwrap_or_else(|_| panic!("missing required env {name}")) +} + +#[tokio::test] +#[ignore = "real-server; needs FULA_S3 + FULA_JWT + Mode A triple"] +async fn streaming_upload_large_file_roundtrips_on_real_server() { + let s3 = env("FULA_S3"); + let jwt = env("FULA_JWT"); + let input = format!( + "{}:{}:{}", + env("FULA_TEST_PROVIDER"), + env("FULA_TEST_OAUTH_SUB"), + env("FULA_TEST_EMAIL"), + ); + let kek = fula_crypto::hashing::derive_key_argon2id("fula-files-v1", input.as_bytes()); + let secret = SecretKey::from_bytes(&kek).expect("32-byte secret from Argon2id"); + + let mut config = Config::new(&s3).with_token(&jwt); + // Match FxFiles production: stamps chunk_cids into the index metadata. + config.walkable_v8_writer_enabled = true; + let client = EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new"); + + let epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + let bucket = format!("e2e-streaming-{epoch}-v8"); + eprintln!("[streaming_e2e] BUCKET={bucket}"); + if let Err(e) = client.create_bucket(&bucket).await { + eprintln!("[streaming_e2e] create_bucket({bucket}) -> {e} (continuing)"); + } + + // ~50 MB -> ~200 chunks at 256 KB -> index metadata over the 16 KB header + // budget (exercises header_safe_enc_metadata stripping + body fallback). + let size = 50 * 1024 * 1024; + let mut data = vec![0u8; size]; + for (i, b) in data.iter_mut().enumerate() { + *b = ((i * 31 + 7) % 251) as u8; + } + let key_path = "/big-streamed.bin"; + + // ---- streaming upload (push model; mirrors the FRB handle) ---- + eprintln!("[streaming_e2e] streaming_begin..."); + let (storage_key, dek, wrapped_dek, kek_version) = client + .streaming_begin(&bucket, key_path) + .await + .expect("streaming_begin"); + + // pass 1: plan-only encoder (default 256 KB chunks, matching production), + // fed in arbitrary 1 MiB streaming slices. + let aad_prefix = format!("fula:v4:chunk:{}", storage_key); + let mut encoder = + ChunkedEncoder::with_aad(dek.clone(), aad_prefix.into_bytes()).into_plan_only(); + eprintln!("[streaming_e2e] pass 1 (plan)..."); + for slice in data.chunks(1024 * 1024) { + encoder.update(slice).expect("plan update"); + } + let (chunked_metadata, private_meta, encrypted_meta) = client + .streaming_finalize_plan(encoder, &dek, &storage_key, key_path, Some("application/octet-stream")) + .expect("streaming_finalize_plan"); + + let cs = chunked_metadata.chunk_size as usize; + let num_chunks = chunked_metadata.num_chunks as usize; + eprintln!("[streaming_e2e] {num_chunks} chunks @ {cs} bytes; pass 2 (upload)..."); + assert!( + num_chunks > 120, + "need >120 chunks to exceed the 16 KB header budget (got {num_chunks})" + ); + + // pass 2: upload each chunk from its committed nonce. + let mut chunk_cids = vec![None; num_chunks]; + for i in 0..num_chunks { + let start = i * cs; + let end = ((i + 1) * cs).min(data.len()); + let (_chunk_key, cid) = client + .streaming_put_chunk( + &bucket, + &storage_key, + &chunked_metadata, + i as u32, + &data[start..end], + &dek, + ) + .await + .unwrap_or_else(|e| panic!("streaming_put_chunk[{i}] failed: {e}")); + chunk_cids[i] = cid; + if i % 50 == 0 { + eprintln!("[streaming_e2e] uploaded chunk {i}/{num_chunks}"); + } + } + + eprintln!("[streaming_e2e] streaming_finish..."); + client + .streaming_finish( + &bucket, + key_path, + &storage_key, + &wrapped_dek, + &encrypted_meta, + kek_version, + &private_meta, + chunked_metadata, + chunk_cids, + ) + .await + .expect("streaming_finish"); + eprintln!("[streaming_e2e] upload OK"); + + // ---- download + verify byte-exact (the gate) ---- + eprintln!("[streaming_e2e] downloading via get_object_flat..."); + let downloaded = client + .get_object_flat(&bucket, key_path) + .await + .expect("get_object_flat"); + assert_eq!(downloaded.len(), data.len(), "round-trip length mismatch"); + assert_eq!( + downloaded.as_ref(), + data.as_slice(), + "streaming upload -> download MUST be byte-exact on the real gateway" + ); + eprintln!( + "[streaming_e2e] OK: {} bytes ({} chunks) round-tripped byte-exact via streaming on the real gateway", + data.len(), + num_chunks + ); +}