diff --git a/Cargo.lock b/Cargo.lock index 1c7c6b6..bcbacd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1736,7 +1736,7 @@ dependencies = [ [[package]] name = "fula-api" -version = "0.6.10" +version = "0.6.11" dependencies = [ "anyhow", "axum", @@ -1765,7 +1765,7 @@ dependencies = [ [[package]] name = "fula-blockstore" -version = "0.6.10" +version = "0.6.11" dependencies = [ "anyhow", "async-trait", @@ -1803,7 +1803,7 @@ dependencies = [ [[package]] name = "fula-cli" -version = "0.6.10" +version = "0.6.11" dependencies = [ "anyhow", "async-trait", @@ -1857,7 +1857,7 @@ dependencies = [ [[package]] name = "fula-client" -version = "0.6.10" +version = "0.6.11" dependencies = [ "anyhow", "async-trait", @@ -1900,7 +1900,7 @@ dependencies = [ [[package]] name = "fula-core" -version = "0.6.10" +version = "0.6.11" dependencies = [ "anyhow", "async-trait", @@ -1935,7 +1935,7 @@ dependencies = [ [[package]] name = "fula-crypto" -version = "0.6.10" +version = "0.6.11" dependencies = [ "aes-gcm", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index ba1fdd7..f4075d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ name = "encrypted_upload_test" path = "examples/encrypted_upload_test.rs" [workspace.package] -version = "0.6.10" +version = "0.6.11" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/functionland/fula-api" diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 8ac2517..0710aae 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -6665,7 +6665,32 @@ impl EncryptedClient { let lock = self.bucket_write_lock(bucket); let _guard = lock.lock().await; let result = self - .put_object_flat_deferred_locked(bucket, key, data, content_type) + .put_object_flat_deferred_locked(bucket, key, data, content_type, None) + .await?; + self.flush_forest_locked(bucket).await?; + Ok(result) + } + + /// Like [`put_object_flat`], but reports cumulative upload progress + /// (`bytes_uploaded`, `total_bytes`) via `progress` as each content chunk's + /// PUT completes — the real per-chunk progress FxFiles uses to drive an + /// upload percentage on web + native (the SDK previously only exposed a + /// completion event, leaving apps with a time-based estimate). The callback + /// is `Send + Sync` so it can be invoked from the concurrent chunk-upload + /// tasks on both native (multi-thread) and wasm. Non-chunked (small) files + /// emit a single 100% event on completion via the caller. + pub async fn put_object_flat_with_progress( + &self, + bucket: &str, + key: &str, + data: impl Into, + content_type: Option<&str>, + progress: Arc, + ) -> Result { + let lock = self.bucket_write_lock(bucket); + let _guard = lock.lock().await; + let result = self + .put_object_flat_deferred_locked(bucket, key, data, content_type, Some(progress)) .await?; self.flush_forest_locked(bucket).await?; Ok(result) @@ -6701,7 +6726,7 @@ impl EncryptedClient { ) -> Result { let lock = self.bucket_write_lock(bucket); let _guard = lock.lock().await; - self.put_object_flat_deferred_locked(bucket, key, data, content_type) + self.put_object_flat_deferred_locked(bucket, key, data, content_type, None) .await } @@ -6718,6 +6743,7 @@ impl EncryptedClient { key: &str, data: impl Into, content_type: Option<&str>, + progress: Option>, ) -> Result { let data = data.into(); let original_size = data.len() as u64; @@ -6819,6 +6845,7 @@ impl EncryptedClient { &wrapped_dek, &encrypted_meta, kek_version, + progress, ).await? } else { // SINGLE OBJECT: File is small enough for one block @@ -7058,6 +7085,7 @@ impl EncryptedClient { wrapped_dek: &EncryptedData, encrypted_meta: &EncryptedPrivateMetadata, kek_version: u32, + progress: Option>, ) -> Result<(PutObjectResult, String, Option)> { // Create chunked encoder with AAD binding chunks to storage key let aad_prefix = format!("fula:v4:chunk:{}", storage_key); @@ -7088,6 +7116,12 @@ impl EncryptedClient { // same code runs on wasm32 (where tokio has no multi-thread runtime). use futures::StreamExt; let pinning = self.pinning.clone(); + // Real progress (FxFiles): cumulative bytes reported per completed + // chunk. Count-proportional to total so it lands exactly on + // `total_bytes` at the final chunk, regardless of completion order. + let total_bytes = data.len() as u64; + let num_chunks = (chunked_metadata.num_chunks as u64).max(1); + let uploaded_chunks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let futs = all_chunks.into_iter().map(|chunk| { let chunk_key = ChunkedFileMetadata::chunk_key(storage_key, chunk.index); let chunk_metadata = ObjectMetadata::new() @@ -7111,6 +7145,8 @@ impl EncryptedClient { let bucket = bucket.to_string(); let pinning = pinning.clone(); let chunk_key_ret = chunk_key.clone(); + let progress_cb = progress.clone(); + let uploaded_chunks = uploaded_chunks.clone(); async move { // FxFiles #50: the content-chunk PUT had no retry on EITHER @@ -7155,6 +7191,18 @@ impl EncryptedClient { } }) .await?; + // Real upload progress (FxFiles): emit cumulative bytes once + // this chunk's PUT (with retry) has succeeded. Count- + // proportional to the file size so the final chunk lands on + // exactly `total_bytes` (chunk plaintext sizes vary; the count + // is monotonic and exact at completion). Invoked from the + // concurrent upload tasks, hence the Send+Sync callback. + if let Some(ref cb) = progress_cb { + let done = uploaded_chunks + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + + 1; + cb(done.saturating_mul(total_bytes) / num_chunks, total_bytes); + } // W.9.4-A2: verify master's etag-attested CID against // the pre-computed BLAKE3(ciphertext). Mismatch // soft-fails to None — chunk PUT succeeded, only the @@ -7360,6 +7408,38 @@ impl EncryptedClient { content_type: Option<&str>, manifest_path: &std::path::Path, cancel: Option>, + ) -> Result { + // Thin wrapper preserving the pre-progress public signature + // (external test callers + the fula-flutter bridge depend on it). + // Delegates to the progress-aware body with no callback. + self.put_object_encrypted_resumable_with_cancel_and_progress( + bucket, key, data, content_type, manifest_path, cancel, None, + ) + .await + } + + /// Like [`put_object_encrypted_resumable_with_cancel`] but also reports + /// **cumulative byte progress** as each content chunk's PUT completes + /// (FxFiles upload-% — the SDK previously only exposed a time estimate). + /// + /// `progress(bytes_uploaded, total_bytes)` is invoked from the chunk + /// upload tasks (up to `MAX_CONCURRENT_CHUNK_UPLOADS` concurrently), so + /// the callback must be `Send + Sync`. `bytes_uploaded` is a count- + /// proportional cumulative estimate (`completed_chunks · total / N`); it + /// reaches `total_bytes` when the last chunk's PUT returns, BEFORE the + /// index PUT + forest-flush tail — UIs that show a bar should cap at + /// <100% until their own completion signal. Pass `None` for `progress` + /// for behaviour identical to the wrapper above. + #[cfg(not(target_arch = "wasm32"))] + pub async fn put_object_encrypted_resumable_with_cancel_and_progress( + &self, + bucket: &str, + key: &str, + data: impl Into, + content_type: Option<&str>, + manifest_path: &std::path::Path, + cancel: Option>, + progress: Option>, ) -> Result { // Issue #16 extension: serialize the resumable write path against // other same-bucket writers (`put_object_flat`, `flush_forest`, @@ -7473,6 +7553,13 @@ impl EncryptedClient { let semaphore = Arc::new(tokio::sync::Semaphore::new(Self::MAX_CONCURRENT_CHUNK_UPLOADS)); let mut handles = Vec::with_capacity(all_chunks.len()); + // FxFiles upload-%: count-proportional cumulative progress. Each + // completed chunk advances the bar by `total/N`; emitted from inside + // the spawned tasks so it reflects real concurrent completion order. + let progress_total_bytes = original_size; + let progress_num_chunks = (num_chunks_total as u64).max(1); + let uploaded_chunks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + for chunk in all_chunks { let chunk_key = ChunkedFileMetadata::chunk_key(&storage_key, chunk.index); let chunk_key_ret = chunk_key.clone(); @@ -7499,6 +7586,11 @@ impl EncryptedClient { // each chunk's PUT can short-circuit cooperatively. let cancel_for_task = cancel.clone(); + // FxFiles upload-%: per-task progress handles (callback + shared + // counter); `progress_total_bytes`/`progress_num_chunks` are Copy. + let progress_cb = progress.clone(); + let uploaded_chunks = uploaded_chunks.clone(); + let handle = tokio::spawn(async move { // Issue #18: check BEFORE acquiring the permit so cancelled // chunks don't even hold a concurrency slot. Tasks already @@ -7545,6 +7637,18 @@ impl EncryptedClient { ), _ => None, }; + // FxFiles upload-%: advance cumulative progress now this + // chunk's PUT completed. Count-proportional so the final + // chunk's emit equals `total_bytes` (100%). + if let Some(ref cb) = progress_cb { + let done = uploaded_chunks + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + + 1; + cb( + done.saturating_mul(progress_total_bytes) / progress_num_chunks, + progress_total_bytes, + ); + } Ok::<(u32, String, Option), ClientError>((chunk_idx, chunk_key_ret, chunk_cid)) }); handles.push(handle); @@ -7960,6 +8064,31 @@ impl EncryptedClient { manifest_path: &std::path::Path, data: &[u8], cancel: Option>, + ) -> Result { + // Thin wrapper preserving the pre-progress public signature + // (external test callers + the fula-flutter bridge depend on it). + self.resume_upload_with_cancel_and_progress(manifest_path, data, cancel, None) + .await + } + + /// Like [`resume_upload_with_cancel`] but reports cumulative byte + /// progress as each re-uploaded chunk completes (FxFiles upload-%). + /// + /// The cumulative counter is **seeded with the already-uploaded chunk + /// count** from the manifest, so the bar continues from where the + /// interrupted attempt stopped and still reaches `total_bytes` even + /// though only the remaining chunks are re-PUT. The callback runs from + /// the chunk tasks (must be `Send + Sync`); see + /// [`put_object_encrypted_resumable_with_cancel_and_progress`] for the + /// premature-100% / completion-cap note. A manifest with nothing left + /// (`remaining() == 0`) finalizes without emitting. + #[cfg(not(target_arch = "wasm32"))] + pub async fn resume_upload_with_cancel_and_progress( + &self, + manifest_path: &std::path::Path, + data: &[u8], + cancel: Option>, + progress: Option>, ) -> Result { // Issue #16 + #17: acquire the per-bucket write mutex AFTER an // initial load to identify the bucket (the name lives in the @@ -8089,6 +8218,18 @@ impl EncryptedClient { let semaphore = Arc::new(tokio::sync::Semaphore::new(Self::MAX_CONCURRENT_CHUNK_UPLOADS)); let mut handles = Vec::new(); + // FxFiles upload-%: seed the cumulative counter with the chunks the + // interrupted attempt already uploaded so the bar continues from + // where it stopped and still reaches `total_bytes` (only the + // remaining chunks are re-PUT below). `total_size` read here, before + // the post-upload `populate_chunk_cids` mutation; it's a Copy u64. + let progress_total_bytes = chunked_meta.total_size; + let progress_num_chunks = (manifest.num_chunks as u64).max(1); + let progress_already_uploaded = + (manifest.num_chunks as u64).saturating_sub(manifest.remaining() as u64); + let uploaded_chunks = + std::sync::Arc::new(std::sync::atomic::AtomicU64::new(progress_already_uploaded)); + for mc in &manifest.chunks { if mc.uploaded { continue; @@ -8134,6 +8275,11 @@ impl EncryptedClient { // each chunk's PUT can short-circuit cooperatively. let cancel_for_task = cancel.clone(); + // FxFiles upload-%: per-task progress handles (callback + shared + // counter); `progress_total_bytes`/`progress_num_chunks` are Copy. + let progress_cb = progress.clone(); + let uploaded_chunks = uploaded_chunks.clone(); + let handle = tokio::spawn(async move { // Issue #18: check BEFORE acquiring the permit so cancelled // chunks don't even hold a concurrency slot. Same semantics @@ -8170,6 +8316,19 @@ impl EncryptedClient { ), _ => None, }; + // FxFiles upload-%: advance cumulative progress (seeded with + // already-uploaded chunks) now this re-uploaded chunk's PUT + // completed. Reaches `total_bytes` when the last remaining + // chunk lands. + if let Some(ref cb) = progress_cb { + let done = uploaded_chunks + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + + 1; + cb( + done.saturating_mul(progress_total_bytes) / progress_num_chunks, + progress_total_bytes, + ); + } Ok::<(u32, String, Option), ClientError>((chunk_index, chunk_key_ret, chunk_cid)) }); handles.push(handle); diff --git a/crates/fula-client/tests/chunk_put_progress.rs b/crates/fula-client/tests/chunk_put_progress.rs new file mode 100644 index 0000000..0f78a72 --- /dev/null +++ b/crates/fula-client/tests/chunk_put_progress.rs @@ -0,0 +1,87 @@ +//! TDD for chunk-granular upload progress (FxFiles "show upload %"). +//! +//! A chunked `put_object_flat_with_progress` must report **cumulative** +//! bytes_uploaded as each chunk completes, with `total` = the file size and a +//! final event equal to `total` (100%). This is the real progress the SDK +//! previously didn't expose (apps only had a time-based estimate). RED before +//! the progress callback exists; GREEN after. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::method; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// 200 + matching ETag for every PUT so the W.9.3/W.9.4 self-verify accepts it. +struct EtagResponder; +impl Respond for EtagResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + ResponseTemplate::new(200).insert_header("ETag", blake3_raw_cid(&req.body).to_string()) + } +} + +#[tokio::test] +async fn chunked_put_reports_cumulative_progress() { + let server = MockServer::start().await; + Mock::given(method("PUT")).respond_with(EtagResponder).mount(&server).await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let mut config = Config::new(&server.uri()).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + let secret = SecretKey::generate(); + let client = EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new"); + + // 2 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ ~3 chunks ⇒ multiple progress events. + let data = vec![0xCDu8; 2 * 1024 * 1024]; + let total = data.len() as u64; + + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let ev = events.clone(); + let progress: Arc = Arc::new(move |done, tot| { + ev.lock().unwrap().push((done, tot)); + }); + + client + .put_object_flat_with_progress( + "videos-v8", + "/promo.mp4", + Bytes::from(data), + Some("video/mp4"), + progress, + ) + .await + .expect("chunked upload must succeed"); + + let evs = events.lock().unwrap().clone(); + assert!(!evs.is_empty(), "progress must be reported at least once"); + assert!( + evs.iter().all(|(_, t)| *t == total), + "every event's total must equal the file size ({total})" + ); + for (done, _) in &evs { + assert!(*done <= total, "cumulative bytes must not exceed total"); + } + // Under buffer_unordered(16) the completion order != push order, so assert + // the MAX reported cumulative reaches total (not that the last-pushed does). + let max_done = evs.iter().map(|(d, _)| *d).max().unwrap(); + assert_eq!(max_done, total, "progress must reach 100% (max cumulative == total)"); +} diff --git a/crates/fula-client/tests/resumable_put_progress.rs b/crates/fula-client/tests/resumable_put_progress.rs new file mode 100644 index 0000000..57a1ee3 --- /dev/null +++ b/crates/fula-client/tests/resumable_put_progress.rs @@ -0,0 +1,217 @@ +//! Chunk-granular upload progress on the NATIVE resumable paths. +//! +//! The web path (`put_object_flat_with_progress`) is covered by +//! `chunk_put_progress.rs`. These tests exercise the separate (mirrored) +//! native chunk loops: +//! * `put_object_encrypted_resumable_with_cancel_and_progress` (fresh) +//! * `resume_upload_with_cancel_and_progress` (resume) +//! The resume test is the important one: it proves the progress counter is +//! **seeded with the already-uploaded chunk count**, so the bar continues +//! from where an interrupted attempt stopped and still reaches 100% even +//! though only the *remaining* chunks are re-uploaded. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use tempfile::TempDir; +use wiremock::matchers::method; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// 200 + matching ETag so the W.9.3/W.9.4 self-verify accepts the PUT. +fn ok_with_etag(req: &Request) -> ResponseTemplate { + ResponseTemplate::new(200).insert_header("ETag", blake3_raw_cid(&req.body).to_string()) +} + +/// Every PUT succeeds (fresh-path test). +struct EtagResponder; +impl Respond for EtagResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + ok_with_etag(req) + } +} + +/// Two-phase PUT responder for the resume test, with NO dependency on the +/// (encoder-determined) chunk count: +/// * While `phase2 == false`: the first `ok_budget` PUTs succeed and the +/// rest fail with a NON-transient 403 (so the SDK doesn't retry them). +/// A phase-1 upload therefore lands EXACTLY `ok_budget` chunks and then +/// errors, leaving a real partial manifest on disk. +/// * Once the test flips `phase2 = true`: every PUT succeeds, so the +/// resume re-uploads the remaining chunks + finalize + forest flush. +struct PhasedResponder { + seen: AtomicUsize, + phase2: Arc, + ok_budget: usize, +} +impl Respond for PhasedResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + if self.phase2.load(Ordering::Acquire) { + return ok_with_etag(req); + } + let n = self.seen.fetch_add(1, Ordering::SeqCst); + if n < self.ok_budget { + ok_with_etag(req) + } else { + ResponseTemplate::new(403) + } + } +} + +fn make_client(uri: &str, secret: SecretKey) -> EncryptedClient { + let mut config = Config::new(uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new") +} + +type Events = Arc>>; +fn progress_collector() -> (Events, Arc) { + let events: Events = Arc::new(Mutex::new(Vec::new())); + let ev = events.clone(); + let cb: Arc = Arc::new(move |done, tot| { + ev.lock().unwrap().push((done, tot)); + }); + (events, cb) +} + +/// Fresh resumable upload reports cumulative progress reaching 100%. +#[tokio::test] +async fn resumable_fresh_reports_cumulative_progress() { + let server = MockServer::start().await; + Mock::given(method("PUT")).respond_with(EtagResponder).mount(&server).await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let client = make_client(&server.uri(), SecretKey::generate()); + // 2 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ multiple chunks ⇒ multiple events. + let data = vec![0xABu8; 2 * 1024 * 1024]; + let total = data.len() as u64; + + let manifest_dir = TempDir::new().expect("manifest tempdir"); + let manifest_path = manifest_dir.path().join("fresh.manifest"); + + let (events, cb) = progress_collector(); + client + .put_object_encrypted_resumable_with_cancel_and_progress( + "videos-v8", + "/fresh.mp4", + Bytes::from(data), + Some("video/mp4"), + &manifest_path, + None, // no cancel + Some(cb), // progress + ) + .await + .expect("fresh resumable upload must succeed"); + + let evs = events.lock().unwrap().clone(); + assert!(!evs.is_empty(), "progress must be reported at least once"); + assert!( + evs.iter().all(|(_, t)| *t == total), + "every event's total must equal the file size ({total})" + ); + for (done, _) in &evs { + assert!(*done <= total, "cumulative bytes must not exceed total"); + } + // buffer/spawn completion order != push order, so assert the MAX reached + // total (not the last-pushed). + let max_done = evs.iter().map(|(d, _)| *d).max().unwrap(); + assert_eq!(max_done, total, "fresh progress must reach 100% (max == total)"); +} + +/// Resume seeds the cumulative counter with already-uploaded chunks, so the +/// bar continues from where it stopped and still reaches 100%. +#[tokio::test] +async fn resume_reports_seeded_cumulative_progress() { + let secret = SecretKey::generate(); + let data = vec![0xCDu8; 2 * 1024 * 1024]; + let total = data.len() as u64; + + let manifest_dir = TempDir::new().expect("manifest tempdir"); + let manifest_path = manifest_dir.path().join("resume.manifest"); + + let phase2 = Arc::new(AtomicBool::new(false)); + let server = MockServer::start().await; + Mock::given(method("PUT")) + .respond_with(PhasedResponder { + seen: AtomicUsize::new(0), + phase2: phase2.clone(), + ok_budget: 1, // exactly one chunk lands in phase 1 + }) + .mount(&server) + .await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let client = make_client(&server.uri(), secret); + + // ── Phase 1: only one chunk is allowed to land, the rest 403 ⇒ the + // upload fails and a real partial manifest persists. + let phase1 = client + .put_object_encrypted_resumable_with_cancel( + "videos-v8", + "/resume.mp4", + Bytes::from(data.clone()), + Some("video/mp4"), + &manifest_path, + None, + ) + .await; + assert!(phase1.is_err(), "phase 1 must fail (only one chunk allowed through)"); + assert!( + manifest_path.exists(), + "a partial manifest must persist after the failed attempt" + ); + + // ── Phase 2: let everything through; resume re-uploads ONLY the + // remaining chunks. A correctly seeded counter still reaches `total`. + phase2.store(true, Ordering::Release); + + let (events, cb) = progress_collector(); + client + .resume_upload_with_cancel_and_progress(&manifest_path, &data, None, Some(cb)) + .await + .expect("resume must succeed"); + + let evs = events.lock().unwrap().clone(); + assert!(!evs.is_empty(), "resume must report progress for re-uploaded chunks"); + assert!( + evs.iter().all(|(_, t)| *t == total), + "every event's total must equal the file size ({total})" + ); + let min_done = evs.iter().map(|(d, _)| *d).min().unwrap(); + let max_done = evs.iter().map(|(d, _)| *d).max().unwrap(); + assert!(min_done > 0, "a seeded counter never emits a zero-based event"); + // DECISIVE seed proof: phase 1 already landed one chunk, so the resume + // re-uploads strictly fewer than N chunks. An UNSEEDED counter could + // only reach (remaining/N)·total < total. Reaching `total` is possible + // ONLY if the counter was seeded with the already-uploaded chunk count. + assert_eq!( + max_done, total, + "seeded resume progress must reach 100% (max == total)" + ); +} diff --git a/crates/fula-client/tests/upload_progress_real_server_e2e.rs b/crates/fula-client/tests/upload_progress_real_server_e2e.rs new file mode 100644 index 0000000..fb0f8d3 --- /dev/null +++ b/crates/fula-client/tests/upload_progress_real_server_e2e.rs @@ -0,0 +1,172 @@ +//! Real-server E2E for chunk-granular upload progress (FxFiles "show upload +//! %"). Uploads a LARGE (multi-chunk) encrypted file against the LIVE master +//! and asserts the SDK emits cumulative byte progress that reaches 100%, on +//! BOTH the web path (`put_object_flat_with_progress`) and the native +//! resumable path (`put_object_encrypted_resumable_with_cancel_and_progress`). +//! Each upload is round-tripped byte-for-byte so progress can't pass while +//! the upload itself is broken. +//! +//! `#[ignore]` — needs network + real credentials. Run (PowerShell, env from +//! `e2e-credentials.env`): +//! +//! ```powershell +//! cargo test -p fula-client --test upload_progress_real_server_e2e --release ` +//! -- --ignored --nocapture +//! ``` +//! +//! Required env: `FULA_S3`, `FULA_JWT`, `FULA_TEST_PROVIDER`, +//! `FULA_TEST_OAUTH_SUB`, `FULA_TEST_EMAIL` (the Mode A derivation triple — +//! Argon2id("fula-files-v1", "{provider}:{sub}:{email}"), auth_service.dart +//! byte-for-byte). Fresh, uniquely-named buckets keep the test self-contained. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::{Arc, Mutex}; + +fn env(name: &str) -> String { + std::env::var(name).unwrap_or_else(|_| panic!("missing required env {name}")) +} + +fn make_client() -> EncryptedClient { + let s3 = env("FULA_S3"); + let jwt = env("FULA_JWT"); + let input = format!( + "{}:{}:{}", + env("FULA_TEST_PROVIDER"), + env("FULA_TEST_OAUTH_SUB"), + env("FULA_TEST_EMAIL"), + ); + // Mode A secret = Argon2id("fula-files-v1", "{provider}:{sub}:{email}"). + let key = fula_crypto::hashing::derive_key_argon2id("fula-files-v1", input.as_bytes()); + let secret = SecretKey::from_bytes(&key).expect("32-byte secret from Argon2id"); + + let mut config = Config::new(&s3).with_token(&jwt); + config.walkable_v8_writer_enabled = true; // match FxFiles production + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new") +} + +fn fresh_bucket(tag: &str) -> String { + let epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + format!("e2e-progress-{tag}-{epoch}-v8") +} + +fn payload(size: usize) -> Vec { + let mut data = vec![0u8; size]; + for (i, b) in data.iter_mut().enumerate() { + *b = (i % 251) as u8; // non-trivial pattern => meaningful round-trip + } + data +} + +fn progress_collector() -> (Arc>>, Arc) { + let events: Arc>> = Arc::new(Mutex::new(Vec::new())); + let ev = events.clone(); + let cb: Arc = + Arc::new(move |done, tot| ev.lock().unwrap().push((done, tot))); + (events, cb) +} + +fn assert_progress_reaches_100(evs: &[(u64, u64)], total: u64) { + assert!(!evs.is_empty(), "progress must be reported at least once"); + assert!( + evs.iter().all(|(_, t)| *t == total), + "every event's total must equal the file size ({total})" + ); + for (done, _) in evs { + assert!(*done <= total, "cumulative bytes must not exceed total"); + } + let max_done = evs.iter().map(|(d, _)| *d).max().unwrap(); + assert_eq!(max_done, total, "progress must reach 100% (max cumulative == total)"); +} + +/// Web path: `put_object_flat_with_progress` (what FxFiles web uses). +#[tokio::test] +#[ignore = "real-server E2E; needs FULA_S3 + FULA_JWT + Mode A triple"] +async fn web_path_progress_reaches_100_against_real_master() { + let client = make_client(); + let bucket = fresh_bucket("web"); + if let Err(e) = client.create_bucket(&bucket).await { + eprintln!("[progress e2e] create_bucket({bucket}) returned {e} — continuing"); + } + + // 3 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ multi-chunk concurrent upload. + let data = payload(3 * 1024 * 1024); + let total = data.len() as u64; + let key_path = "/promo-web-progress.bin"; + + let (events, cb) = progress_collector(); + client + .put_object_flat_with_progress( + &bucket, + key_path, + Bytes::from(data.clone()), + Some("application/octet-stream"), + cb, + ) + .await + .expect("web-path chunked upload must succeed against the live master"); + + let evs = events.lock().unwrap().clone(); + eprintln!("[progress e2e] web path: {} events, max={}", evs.len(), + evs.iter().map(|(d, _)| *d).max().unwrap_or(0)); + assert_progress_reaches_100(&evs, total); + + let got = client.get_object_flat(&bucket, key_path).await.expect("download"); + assert_eq!(&got[..], &data[..], "round-trip bytes must match"); + + let _ = client.delete_object_flat(&bucket, key_path).await; + let _ = client.delete_bucket(&bucket).await; + eprintln!("[progress e2e] web path OK (bucket {bucket})"); +} + +/// Native path: `put_object_encrypted_resumable_with_cancel_and_progress` +/// (what FxFiles native uses, via the resumable bridge). +#[tokio::test] +#[ignore = "real-server E2E; needs FULA_S3 + FULA_JWT + Mode A triple"] +async fn native_resumable_progress_reaches_100_against_real_master() { + let client = make_client(); + let bucket = fresh_bucket("native"); + if let Err(e) = client.create_bucket(&bucket).await { + eprintln!("[progress e2e] create_bucket({bucket}) returned {e} — continuing"); + } + + let data = payload(3 * 1024 * 1024); + let total = data.len() as u64; + let key_path = "/promo-native-progress.bin"; + + let manifest_dir = tempfile::TempDir::new().expect("manifest tempdir"); + let manifest_path = manifest_dir.path().join("e2e.manifest"); + + let (events, cb) = progress_collector(); + client + .put_object_encrypted_resumable_with_cancel_and_progress( + &bucket, + key_path, + Bytes::from(data.clone()), + Some("application/octet-stream"), + &manifest_path, + None, // no cancel + Some(cb), // progress + ) + .await + .expect("native resumable chunked upload must succeed against the live master"); + + let evs = events.lock().unwrap().clone(); + eprintln!("[progress e2e] native path: {} events, max={}", evs.len(), + evs.iter().map(|(d, _)| *d).max().unwrap_or(0)); + assert_progress_reaches_100(&evs, total); + + let got = client.get_object_flat(&bucket, key_path).await.expect("download"); + assert_eq!(&got[..], &data[..], "round-trip bytes must match"); + + let _ = client.delete_object_flat(&bucket, key_path).await; + let _ = client.delete_bucket(&bucket).await; + eprintln!("[progress e2e] native path OK (bucket {bucket})"); +} diff --git a/crates/fula-flutter/src/api/forest.rs b/crates/fula-flutter/src/api/forest.rs index a26b52f..29edbe0 100644 --- a/crates/fula-flutter/src/api/forest.rs +++ b/crates/fula-flutter/src/api/forest.rs @@ -545,6 +545,180 @@ pub async fn get_file_size(_file_path: String) -> anyhow::Result { anyhow::bail!("get_file_size is not supported on WASM; use the browser File API in Dart") } +// ============================================================================ +// Upload progress (polling) — real per-chunk percentage for FxFiles +// ============================================================================ +// +// The SDK reports cumulative byte progress via an `Arc` +// callback invoked as each content chunk's PUT completes (web + native). +// Rather than a Dart `Stream` (FRB `StreamSink`), which has never been +// exercised through this repo's external codegen/publish pipeline, progress +// is surfaced with the SAME opaque-handle + polling pattern already proven +// for `MasterHealthEvent` and `CancelHandle`: the app creates a +// [`ProgressHandle`], passes it to a `_with_progress` upload, and reads the +// latest value on a timer via [`poll_progress`] while the upload future is +// pending. The handle's `Arc` lifecycle is the cleanup — no global registry, +// no key collisions, no leak on early-return/panic. + +struct ProgressState { + /// Highest cumulative bytes reported so far (monotonic via `fetch_max`). + uploaded: std::sync::atomic::AtomicU64, + /// Total bytes of the upload (constant once the first chunk reports). + total: std::sync::atomic::AtomicU64, +} + +/// Opaque handle to an upload's live progress. Cheap to clone (clones the +/// inner `Arc`); the upload tasks and the poller observe the same counters. +#[derive(Clone)] +pub struct ProgressHandle { + inner: std::sync::Arc, +} + +/// Create a fresh progress handle (0 / 0). Pass it to a `_with_progress` +/// upload function and read it via [`poll_progress`]. +pub async fn create_progress_handle() -> ProgressHandle { + ProgressHandle { + inner: std::sync::Arc::new(ProgressState { + uploaded: std::sync::atomic::AtomicU64::new(0), + total: std::sync::atomic::AtomicU64::new(0), + }), + } +} + +/// Read the latest progress for an in-flight (or finished) upload. Safe to +/// call any time, from any thread; returns 0% before the first chunk lands. +/// +/// **100% caveat:** cumulative bytes reach `total` when the LAST content +/// chunk's PUT returns — BEFORE the index PUT + forest-flush tail. UIs that +/// show a determinate bar should cap at <100% until their own completion +/// signal (the upload future resolving), mirroring the mobile clamp. +pub async fn poll_progress(handle: &ProgressHandle) -> UploadProgress { + let uploaded = handle.inner.uploaded.load(std::sync::atomic::Ordering::Relaxed); + let total = handle.inner.total.load(std::sync::atomic::Ordering::Relaxed); + UploadProgress::new(uploaded, total, 0, 0) +} + +/// Build the SDK progress callback that feeds a [`ProgressHandle`]. Private +/// (not an FRB binding). `fetch_max` keeps the displayed value monotonic +/// even though up to 16 concurrent chunk tasks report cumulative values in +/// nondeterministic order; `total` is the same on every call. +fn progress_cb(handle: &ProgressHandle) -> std::sync::Arc { + let p = handle.inner.clone(); + std::sync::Arc::new(move |done, tot| { + // Store `total` first so a poll landing mid-callback never reads a + // nonzero `uploaded` against a still-zero `total` (transient 0% blip). + p.total.store(tot, std::sync::atomic::Ordering::Relaxed); + p.uploaded.fetch_max(done, std::sync::atomic::Ordering::Relaxed); + }) +} + +/// [`put_flat`] with live progress (web + native). Drive a percentage by +/// polling [`poll_progress`] on the passed [`ProgressHandle`] while this +/// future runs. +pub async fn put_flat_with_progress( + client: &EncryptedClientHandle, + bucket: String, + path: String, + data: Vec, + content_type: Option, + progress: &ProgressHandle, +) -> anyhow::Result { + let cb = progress_cb(progress); + let guard = client.inner.write().await; + let result = guard + .put_object_flat_with_progress(&bucket, &path, Bytes::from(data), content_type.as_deref(), cb) + .await?; + Ok(result.into()) +} + +/// [`put_flat_resumable_from_path_cancellable`] with live progress (native). +#[cfg(not(target_arch = "wasm32"))] +pub async fn put_flat_resumable_from_path_with_progress( + client: &EncryptedClientHandle, + bucket: String, + path: String, + file_path: String, + manifest_path: String, + content_type: Option, + cancel: &CancelHandle, + progress: &ProgressHandle, +) -> anyhow::Result { + let data = tokio::fs::read(&file_path) + .await + .with_context(|| format!("Failed to read file: {}", file_path))?; + let cb = progress_cb(progress); + let guard = client.inner.read().await; + let manifest = std::path::PathBuf::from(manifest_path); + let result = guard + .put_object_encrypted_resumable_with_cancel_and_progress( + &bucket, + &path, + Bytes::from(data), + content_type.as_deref(), + &manifest, + Some(cancel.inner.clone()), + Some(cb), + ) + .await?; + Ok(result.into()) +} + +#[cfg(target_arch = "wasm32")] +pub async fn put_flat_resumable_from_path_with_progress( + _client: &EncryptedClientHandle, + _bucket: String, + _path: String, + _file_path: String, + _manifest_path: String, + _content_type: Option, + _cancel: &CancelHandle, + _progress: &ProgressHandle, +) -> anyhow::Result { + anyhow::bail!( + "put_flat_resumable_from_path_with_progress is not supported on WASM; \ + use put_flat_with_progress from WASM" + ) +} + +/// [`resume_flat_upload_from_path_cancellable`] with live progress (native). +/// Progress continues from the chunks the interrupted attempt already +/// uploaded (the SDK seeds the counter), so the bar resumes mid-way. +#[cfg(not(target_arch = "wasm32"))] +pub async fn resume_flat_upload_from_path_with_progress( + client: &EncryptedClientHandle, + manifest_path: String, + file_path: String, + cancel: &CancelHandle, + progress: &ProgressHandle, +) -> anyhow::Result { + let data = tokio::fs::read(&file_path) + .await + .with_context(|| format!("Failed to read file: {}", file_path))?; + let cb = progress_cb(progress); + let guard = client.inner.read().await; + let manifest = std::path::PathBuf::from(manifest_path); + let result = guard + .resume_upload_with_cancel_and_progress( + &manifest, + &data, + Some(cancel.inner.clone()), + Some(cb), + ) + .await?; + Ok(result.into()) +} + +#[cfg(target_arch = "wasm32")] +pub async fn resume_flat_upload_from_path_with_progress( + _client: &EncryptedClientHandle, + _manifest_path: String, + _file_path: String, + _cancel: &CancelHandle, + _progress: &ProgressHandle, +) -> anyhow::Result { + anyhow::bail!("resume_flat_upload_from_path_with_progress is not supported on WASM") +} + // ============================================================================ // Subtree Operations (for Sharing) // ============================================================================