Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ name = "encrypted_upload_test"
path = "examples/encrypted_upload_test.rs"

[workspace.package]
version = "0.6.13"
version = "0.6.14"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/functionland/fula-api"
Expand Down
68 changes: 65 additions & 3 deletions crates/fula-client/src/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6856,7 +6856,7 @@ impl EncryptedClient {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
let result = self
.put_object_flat_deferred_locked(bucket, key, data, content_type, None)
.put_object_flat_deferred_locked(bucket, key, data, content_type, None, None)
.await?;
self.flush_forest_locked(bucket).await?;
Ok(result)
Expand All @@ -6881,7 +6881,40 @@ impl EncryptedClient {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
let result = self
.put_object_flat_deferred_locked(bucket, key, data, content_type, Some(progress))
.put_object_flat_deferred_locked(bucket, key, data, content_type, Some(progress), None)
.await?;
self.flush_forest_locked(bucket).await?;
Ok(result)
}

/// [`put_object_flat_with_progress`] with cooperative cancellation, for
/// web (and native) callers that need to abort an in-flight large upload.
/// `cancel` is checked between chunk PUTs: chunks already in flight
/// (≤ MAX_CONCURRENT_CHUNK_UPLOADS) finish, later chunks short-circuit, the
/// uploaded chunks are deleted, and the call returns
/// `ClientError::Cancelled`. NON-resumable: a cancelled upload restarts from
/// scratch (wasm resumable support lands separately). Mirrors the native
/// resumable path's `Arc<AtomicBool>` cancel semantics.
pub async fn put_object_flat_with_progress_cancellable(
&self,
bucket: &str,
key: &str,
data: impl Into<Bytes>,
content_type: Option<&str>,
progress: Arc<dyn Fn(u64, u64) + Send + Sync>,
cancel: Arc<std::sync::atomic::AtomicBool>,
) -> Result<PutObjectResult> {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
let result = self
.put_object_flat_deferred_locked(
bucket,
key,
data,
content_type,
Some(progress),
Some(cancel),
)
.await?;
self.flush_forest_locked(bucket).await?;
Ok(result)
Expand Down Expand Up @@ -6917,7 +6950,7 @@ impl EncryptedClient {
) -> Result<PutObjectResult> {
let lock = self.bucket_write_lock(bucket);
let _guard = lock.lock().await;
self.put_object_flat_deferred_locked(bucket, key, data, content_type, None)
self.put_object_flat_deferred_locked(bucket, key, data, content_type, None, None)
.await
}

Expand All @@ -6935,6 +6968,7 @@ impl EncryptedClient {
data: impl Into<Bytes>,
content_type: Option<&str>,
progress: Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
cancel: Option<Arc<std::sync::atomic::AtomicBool>>,
) -> Result<PutObjectResult> {
let data = data.into();
let original_size = data.len() as u64;
Expand Down Expand Up @@ -7037,6 +7071,7 @@ impl EncryptedClient {
&encrypted_meta,
kek_version,
progress,
cancel,
).await?
} else {
// SINGLE OBJECT: File is small enough for one block
Expand Down Expand Up @@ -7277,6 +7312,10 @@ impl EncryptedClient {
encrypted_meta: &EncryptedPrivateMetadata,
kek_version: u32,
progress: Option<Arc<dyn Fn(u64, u64) + Send + Sync>>,
// Cooperative cancel (web + native). `Some` = the chunk loop checks it
// between PUTs; `None` = uncancellable (existing callers). Mirrors the
// native resumable path's `Arc<AtomicBool>` flag.
cancel: Option<Arc<std::sync::atomic::AtomicBool>>,
) -> Result<(PutObjectResult, String, Option<cid::Cid>)> {
// Create chunked encoder with AAD binding chunks to storage key
let aad_prefix = format!("fula:v4:chunk:{}", storage_key);
Expand Down Expand Up @@ -7338,8 +7377,19 @@ impl EncryptedClient {
let chunk_key_ret = chunk_key.clone();
let progress_cb = progress.clone();
let uploaded_chunks = uploaded_chunks.clone();
let cancel = cancel.clone();

async move {
// Cooperative cancel (mirrors the native resumable path): a
// chunk that hasn't started yet short-circuits when the flag is
// set; chunks already in flight (≤ MAX_CONCURRENT_CHUNK_UPLOADS)
// run to completion, and the post-loop compensating-delete
// cleans up everything that did upload.
if let Some(ref c) = cancel {
if c.load(std::sync::atomic::Ordering::Relaxed) {
return Err(ClientError::Cancelled);
}
}
// FxFiles #50: the content-chunk PUT had no retry on EITHER
// target — the native blob-backend retry only wraps forest
// nodes, not content chunks. One sporadic chunk-PUT drop
Expand Down Expand Up @@ -7450,6 +7500,18 @@ impl EncryptedClient {
return Err(err);
}

// Cancelled after the chunks uploaded but before the index PUT: the
// chunks are unreferenced (no index object points at them yet), so
// delete them and abort — same cleanup as the failure path above.
if let Some(ref c) = cancel {
if c.load(std::sync::atomic::Ordering::Relaxed) {
for key in &uploaded_keys {
let _ = self.inner.delete_object(bucket, key).await;
}
return Err(ClientError::Cancelled);
}
}

// W.9.4-A2: stamp the per-chunk CID Vec into the metadata
// BEFORE serializing the index body. When walkable_v8 is off,
// chunk_cids is all-None and `populate_chunk_cids` writes an
Expand Down
47 changes: 47 additions & 0 deletions crates/fula-client/tests/chunk_put_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,50 @@ async fn chunked_put_reports_cumulative_progress() {
let max_done = evs.iter().map(|(d, _)| *d).max().unwrap();
assert_eq!(max_done, total, "progress must reach 100% (max cumulative == total)");
}

/// 0.6.14 wasm upload-cancel: a pre-set cancel flag must abort the chunked
/// upload with `ClientError::Cancelled` — every chunk short-circuits at the
/// closure-start check before its PUT (mirrors the native resumable cancel).
#[tokio::test]
async fn chunked_put_cancellable_aborts_when_flag_preset() {
use std::sync::atomic::AtomicBool;

let server = MockServer::start().await;
Mock::given(method("PUT")).respond_with(EtagResponder).mount(&server).await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(404))
.mount(&server)
.await;
Mock::given(method("HEAD"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;

let mut config = Config::new(&server.uri()).with_token("test-jwt");
config.walkable_v8_writer_enabled = true;
let secret = SecretKey::generate();
let client = EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret))
.expect("EncryptedClient::new");

// Multi-chunk file (2 MiB > 768 KB threshold).
let data = vec![0xABu8; 2 * 1024 * 1024];
// Pre-cancelled: the flag is already set before the upload starts.
let cancel = Arc::new(AtomicBool::new(true));
let progress: Arc<dyn Fn(u64, u64) + Send + Sync> = Arc::new(|_, _| {});

let result = client
.put_object_flat_with_progress_cancellable(
"videos-v8",
"/cancelled.bin",
Bytes::from(data),
Some("application/octet-stream"),
progress,
cancel,
)
.await;

assert!(
matches!(result, Err(fula_client::ClientError::Cancelled)),
"a pre-set cancel flag must abort the chunked upload with Cancelled",
);
}
31 changes: 31 additions & 0 deletions crates/fula-flutter/src/api/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,37 @@ pub async fn put_flat_with_progress(
Ok(result.into())
}

/// [`put_flat_with_progress`] with cooperative cancellation (web + native).
/// Non-resumable: triggering `cancel` aborts the in-flight upload (chunks
/// already in flight finish, later chunks short-circuit, uploaded chunks are
/// cleaned up) and the call returns a `Cancelled` error. A cancelled upload
/// restarts from scratch — wasm resumable support lands separately. This is
/// web's cancel path (the native resumable+manifest cancel is the
/// `*_from_path` family, which is native-only).
pub async fn put_flat_with_progress_cancellable(
client: &EncryptedClientHandle,
bucket: String,
path: String,
data: Vec<u8>,
content_type: Option<String>,
progress: &ProgressHandle,
cancel: &CancelHandle,
) -> anyhow::Result<PutResult> {
let cb = progress_cb(progress);
let guard = client.inner.write().await;
let result = guard
.put_object_flat_with_progress_cancellable(
&bucket,
&path,
Bytes::from(data),
content_type.as_deref(),
cb,
cancel.inner.clone(),
)
.await?;
Ok(result.into())
}

/// [`put_flat_resumable_from_path_cancellable`] with live progress (native).
#[cfg(not(target_arch = "wasm32"))]
pub async fn put_flat_resumable_from_path_with_progress(
Expand Down
Loading