Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ name = "encrypted_upload_test"
path = "examples/encrypted_upload_test.rs"

[workspace.package]
version = "0.6.10"
version = "0.6.11"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/functionland/fula-api"
Expand Down
163 changes: 161 additions & 2 deletions crates/fula-client/src/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6665,7 +6665,32 @@ impl EncryptedClient {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
let result = self
.put_object_flat_deferred_locked(bucket, key, data, content_type)
.put_object_flat_deferred_locked(bucket, key, data, content_type, None)
.await?;
self.flush_forest_locked(bucket).await?;
Ok(result)
}

/// Like [`put_object_flat`], but reports cumulative upload progress
/// (`bytes_uploaded`, `total_bytes`) via `progress` as each content chunk's
/// PUT completes — the real per-chunk progress FxFiles uses to drive an
/// upload percentage on web + native (the SDK previously only exposed a
/// completion event, leaving apps with a time-based estimate). The callback
/// is `Send + Sync` so it can be invoked from the concurrent chunk-upload
/// tasks on both native (multi-thread) and wasm. Non-chunked (small) files
/// emit a single 100% event on completion via the caller.
pub async fn put_object_flat_with_progress(
&self,
bucket: &str,
key: &str,
data: impl Into<Bytes>,
content_type: Option<&str>,
progress: Arc<dyn Fn(u64, u64) + Send + Sync>,
) -> Result<PutObjectResult> {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
let result = self
.put_object_flat_deferred_locked(bucket, key, data, content_type, Some(progress))
.await?;
self.flush_forest_locked(bucket).await?;
Ok(result)
Expand Down Expand Up @@ -6701,7 +6726,7 @@ impl EncryptedClient {
) -> Result<PutObjectResult> {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
self.put_object_flat_deferred_locked(bucket, key, data, content_type)
self.put_object_flat_deferred_locked(bucket, key, data, content_type, None)
.await
}

Expand All @@ -6718,6 +6743,7 @@ impl EncryptedClient {
key: &str,
data: impl Into<Bytes>,
content_type: Option<&str>,
progress: Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<PutObjectResult> {
let data = data.into();
let original_size = data.len() as u64;
Expand Down Expand Up @@ -6819,6 +6845,7 @@ impl EncryptedClient {
&wrapped_dek,
&encrypted_meta,
kek_version,
progress,
).await?
} else {
// SINGLE OBJECT: File is small enough for one block
Expand Down Expand Up @@ -7058,6 +7085,7 @@ impl EncryptedClient {
wrapped_dek: &EncryptedData,
encrypted_meta: &EncryptedPrivateMetadata,
kek_version: u32,
progress: Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<(PutObjectResult, String, Option<cid::Cid>)> {
// Create chunked encoder with AAD binding chunks to storage key
let aad_prefix = format!("fula:v4:chunk:{}", storage_key);
Expand Down Expand Up @@ -7088,6 +7116,12 @@ impl EncryptedClient {
// same code runs on wasm32 (where tokio has no multi-thread runtime).
use futures::StreamExt;
let pinning = self.pinning.clone();
// Real progress (FxFiles): cumulative bytes reported per completed
// chunk. Count-proportional to total so it lands exactly on
// `total_bytes` at the final chunk, regardless of completion order.
let total_bytes = data.len() as u64;
let num_chunks = (chunked_metadata.num_chunks as u64).max(1);
let uploaded_chunks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let futs = all_chunks.into_iter().map(|chunk| {
let chunk_key = ChunkedFileMetadata::chunk_key(storage_key, chunk.index);
let chunk_metadata = ObjectMetadata::new()
Expand All @@ -7111,6 +7145,8 @@ impl EncryptedClient {
let bucket = bucket.to_string();
let pinning = pinning.clone();
let chunk_key_ret = chunk_key.clone();
let progress_cb = progress.clone();
let uploaded_chunks = uploaded_chunks.clone();

async move {
// FxFiles #50: the content-chunk PUT had no retry on EITHER
Expand Down Expand Up @@ -7155,6 +7191,18 @@ impl EncryptedClient {
}
})
.await?;
// Real upload progress (FxFiles): emit cumulative bytes once
// this chunk's PUT (with retry) has succeeded. Count-
// proportional to the file size so the final chunk lands on
// exactly `total_bytes` (chunk plaintext sizes vary; the count
// is monotonic and exact at completion). Invoked from the
// concurrent upload tasks, hence the Send+Sync callback.
if let Some(ref cb) = progress_cb {
let done = uploaded_chunks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
cb(done.saturating_mul(total_bytes) / num_chunks, total_bytes);
}
// W.9.4-A2: verify master's etag-attested CID against
// the pre-computed BLAKE3(ciphertext). Mismatch
// soft-fails to None — chunk PUT succeeded, only the
Expand Down Expand Up @@ -7360,6 +7408,38 @@ impl EncryptedClient {
content_type: Option<&str>,
manifest_path: &std::path::Path,
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
) -> Result<PutObjectResult> {
// Thin wrapper preserving the pre-progress public signature
// (external test callers + the fula-flutter bridge depend on it).
// Delegates to the progress-aware body with no callback.
self.put_object_encrypted_resumable_with_cancel_and_progress(
bucket, key, data, content_type, manifest_path, cancel, None,
)
.await
}

/// Like [`put_object_encrypted_resumable_with_cancel`] but also reports
/// **cumulative byte progress** as each content chunk's PUT completes
/// (FxFiles upload-% — the SDK previously only exposed a time estimate).
///
/// `progress(bytes_uploaded, total_bytes)` is invoked from the chunk
/// upload tasks (up to `MAX_CONCURRENT_CHUNK_UPLOADS` concurrently), so
/// the callback must be `Send + Sync`. `bytes_uploaded` is a count-
/// proportional cumulative estimate (`completed_chunks · total / N`); it
/// reaches `total_bytes` when the last chunk's PUT returns, BEFORE the
/// index PUT + forest-flush tail — UIs that show a bar should cap at
/// <100% until their own completion signal. Pass `None` for `progress`
/// for behaviour identical to the wrapper above.
#[cfg(not(target_arch = "wasm32"))]
pub async fn put_object_encrypted_resumable_with_cancel_and_progress(
&self,
bucket: &str,
key: &str,
data: impl Into<Bytes>,
content_type: Option<&str>,
manifest_path: &std::path::Path,
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
progress: Option<std::sync::Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<PutObjectResult> {
// Issue #16 extension: serialize the resumable write path against
// other same-bucket writers (`put_object_flat`, `flush_forest`,
Expand Down Expand Up @@ -7473,6 +7553,13 @@ impl EncryptedClient {
let semaphore = Arc::new(tokio::sync::Semaphore::new(Self::MAX_CONCURRENT_CHUNK_UPLOADS));
let mut handles = Vec::with_capacity(all_chunks.len());

// FxFiles upload-%: count-proportional cumulative progress. Each
// completed chunk advances the bar by `total/N`; emitted from inside
// the spawned tasks so it reflects real concurrent completion order.
let progress_total_bytes = original_size;
let progress_num_chunks = (num_chunks_total as u64).max(1);
let uploaded_chunks = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));

for chunk in all_chunks {
let chunk_key = ChunkedFileMetadata::chunk_key(&storage_key, chunk.index);
let chunk_key_ret = chunk_key.clone();
Expand All @@ -7499,6 +7586,11 @@ impl EncryptedClient {
// each chunk's PUT can short-circuit cooperatively.
let cancel_for_task = cancel.clone();

// FxFiles upload-%: per-task progress handles (callback + shared
// counter); `progress_total_bytes`/`progress_num_chunks` are Copy.
let progress_cb = progress.clone();
let uploaded_chunks = uploaded_chunks.clone();

let handle = tokio::spawn(async move {
// Issue #18: check BEFORE acquiring the permit so cancelled
// chunks don't even hold a concurrency slot. Tasks already
Expand Down Expand Up @@ -7545,6 +7637,18 @@ impl EncryptedClient {
),
_ => None,
};
// FxFiles upload-%: advance cumulative progress now this
// chunk's PUT completed. Count-proportional so the final
// chunk's emit equals `total_bytes` (100%).
if let Some(ref cb) = progress_cb {
let done = uploaded_chunks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
cb(
done.saturating_mul(progress_total_bytes) / progress_num_chunks,
progress_total_bytes,
);
}
Ok::<(u32, String, Option<cid::Cid>), ClientError>((chunk_idx, chunk_key_ret, chunk_cid))
});
handles.push(handle);
Expand Down Expand Up @@ -7960,6 +8064,31 @@ impl EncryptedClient {
manifest_path: &std::path::Path,
data: &[u8],
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
) -> Result<PutObjectResult> {
// Thin wrapper preserving the pre-progress public signature
// (external test callers + the fula-flutter bridge depend on it).
self.resume_upload_with_cancel_and_progress(manifest_path, data, cancel, None)
.await
}

/// Like [`resume_upload_with_cancel`] but reports cumulative byte
/// progress as each re-uploaded chunk completes (FxFiles upload-%).
///
/// The cumulative counter is **seeded with the already-uploaded chunk
/// count** from the manifest, so the bar continues from where the
/// interrupted attempt stopped and still reaches `total_bytes` even
/// though only the remaining chunks are re-PUT. The callback runs from
/// the chunk tasks (must be `Send + Sync`); see
/// [`put_object_encrypted_resumable_with_cancel_and_progress`] for the
/// premature-100% / completion-cap note. A manifest with nothing left
/// (`remaining() == 0`) finalizes without emitting.
#[cfg(not(target_arch = "wasm32"))]
pub async fn resume_upload_with_cancel_and_progress(
&self,
manifest_path: &std::path::Path,
data: &[u8],
cancel: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
progress: Option<std::sync::Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<PutObjectResult> {
// Issue #16 + #17: acquire the per-bucket write mutex AFTER an
// initial load to identify the bucket (the name lives in the
Expand Down Expand Up @@ -8089,6 +8218,18 @@ impl EncryptedClient {
let semaphore = Arc::new(tokio::sync::Semaphore::new(Self::MAX_CONCURRENT_CHUNK_UPLOADS));
let mut handles = Vec::new();

// FxFiles upload-%: seed the cumulative counter with the chunks the
// interrupted attempt already uploaded so the bar continues from
// where it stopped and still reaches `total_bytes` (only the
// remaining chunks are re-PUT below). `total_size` read here, before
// the post-upload `populate_chunk_cids` mutation; it's a Copy u64.
let progress_total_bytes = chunked_meta.total_size;
let progress_num_chunks = (manifest.num_chunks as u64).max(1);
let progress_already_uploaded =
(manifest.num_chunks as u64).saturating_sub(manifest.remaining() as u64);
let uploaded_chunks =
std::sync::Arc::new(std::sync::atomic::AtomicU64::new(progress_already_uploaded));

for mc in &manifest.chunks {
if mc.uploaded {
continue;
Expand Down Expand Up @@ -8134,6 +8275,11 @@ impl EncryptedClient {
// each chunk's PUT can short-circuit cooperatively.
let cancel_for_task = cancel.clone();

// FxFiles upload-%: per-task progress handles (callback + shared
// counter); `progress_total_bytes`/`progress_num_chunks` are Copy.
let progress_cb = progress.clone();
let uploaded_chunks = uploaded_chunks.clone();

let handle = tokio::spawn(async move {
// Issue #18: check BEFORE acquiring the permit so cancelled
// chunks don't even hold a concurrency slot. Same semantics
Expand Down Expand Up @@ -8170,6 +8316,19 @@ impl EncryptedClient {
),
_ => None,
};
// FxFiles upload-%: advance cumulative progress (seeded with
// already-uploaded chunks) now this re-uploaded chunk's PUT
// completed. Reaches `total_bytes` when the last remaining
// chunk lands.
if let Some(ref cb) = progress_cb {
let done = uploaded_chunks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
cb(
done.saturating_mul(progress_total_bytes) / progress_num_chunks,
progress_total_bytes,
);
}
Ok::<(u32, String, Option<cid::Cid>), ClientError>((chunk_index, chunk_key_ret, chunk_cid))
});
handles.push(handle);
Expand Down
Loading
Loading