Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ name = "encrypted_upload_test"
path = "examples/encrypted_upload_test.rs"

[workspace.package]
version = "0.6.9"
version = "0.6.10"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/functionland/fula-api"
Expand Down
4 changes: 4 additions & 0 deletions crates/fula-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ reqwest = { version = "0.12", default-features = false, features = ["json"] }
uuid = { version = "1.11", features = ["v4", "serde", "js"] }
# MultipartAbortGuard Drop impl spawns the abort future on wasm
wasm-bindgen-futures = "0.4"
# Backoff sleep for retry_idempotent on wasm — the browser has no tokio
# reactor. `futures` feature exposes TimeoutFuture (a setTimeout-backed Future)
# so the per-chunk upload retry (FxFiles #50) actually sleeps between attempts.
gloo-timers = { version = "0.3", features = ["futures"] }
# Walkable-v8 (W.9.2): `S3BlobBackend::put` parses master's PUT-response
# ETag into a `Cid` on every target so the cross-platform CID-stamping
# seam stays symmetric. cid 0.11 is wasm-compatible; default-features
Expand Down
59 changes: 42 additions & 17 deletions crates/fula-client/src/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7113,23 +7113,48 @@ impl EncryptedClient {
let chunk_key_ret = chunk_key.clone();

async move {
let put_result = if let Some(ref pin) = pinning {
client.put_object_with_metadata_and_pinning(
&bucket,
&chunk_key,
chunk.ciphertext,
Some(chunk_metadata),
&pin.endpoint,
&pin.token,
).await?
} else {
client.put_object_with_metadata(
&bucket,
&chunk_key,
chunk.ciphertext,
Some(chunk_metadata),
).await?
};
// FxFiles #50: the content-chunk PUT had no retry on EITHER
// target — the native blob-backend retry only wraps forest
// nodes, not content chunks. One sporadic chunk-PUT drop
// (ERR_CONNECTION_CLOSED / transient 5xx under the 16-wide
// concurrent burst) thus failed the whole large upload (worst
// on the web, where the retry was wasm-gated). Retry each chunk
// PUT on transient errors — safe because the chunk key is
// content-addressed (idempotent). Per-attempt clones are cheap
// (Arc / Bytes / small structs).
let ciphertext = bytes::Bytes::from(chunk.ciphertext);
let put_result = crate::multipart::retry_idempotent(4, || {
let client = client.clone();
let bucket = bucket.clone();
let chunk_key = chunk_key.clone();
let pinning = pinning.clone();
let chunk_metadata = chunk_metadata.clone();
let body = ciphertext.clone();
async move {
if let Some(ref pin) = pinning {
client
.put_object_with_metadata_and_pinning(
&bucket,
&chunk_key,
body,
Some(chunk_metadata),
&pin.endpoint,
&pin.token,
)
.await
} else {
client
.put_object_with_metadata(
&bucket,
&chunk_key,
body,
Some(chunk_metadata),
)
.await
}
}
})
.await?;
// W.9.4-A2: verify master's etag-attested CID against
// the pre-computed BLAKE3(ciphertext). Mismatch
// soft-fails to None — chunk PUT succeeded, only the
Expand Down
73 changes: 39 additions & 34 deletions crates/fula-client/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,54 +415,59 @@ pub async fn upload_large_file(
/// an orphan upload) or `complete_upload` (separately handled — a completed
/// upload replies `NoSuchUpload` on retry, which callers must treat as
/// success-equivalent). (NEW-F3.)
async fn retry_idempotent<F, Fut, T>(max_attempts: usize, mut op: F) -> Result<T>
pub(crate) async fn retry_idempotent<F, Fut, T>(max_attempts: usize, mut op: F) -> Result<T>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
// WASM target has no reliable sleep primitive available from this crate
// (no tokio reactor / gloo-timers dep), so a retry loop would hammer the
// server in a tight cycle — worse than no retry. Collapse to
// single-attempt on WASM; native callers get the backoff path.
#[cfg(target_arch = "wasm32")]
{
let _ = max_attempts;
return op().await;
}
#[cfg(not(target_arch = "wasm32"))]
{
let mut attempt = 0usize;
loop {
attempt += 1;
match op().await {
Ok(v) => return Ok(v),
Err(e) if attempt >= max_attempts || !is_transient(&e) => return Err(e),
Err(e) => {
let backoff_ms = std::cmp::min(5_000u64, 100u64 * (1u64 << (attempt as u32 - 1)));
tracing::debug!(
attempt,
max_attempts,
backoff_ms,
error = %e,
"multipart: transient error, retrying"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
// Unified retry for native AND wasm — the only per-target difference is the
// sleep primitive (see [`retry_backoff_sleep`]). wasm previously collapsed
// to a single attempt for lack of a sleep dep, which left the chunked
// content-upload path (`encryption.rs::put_object_chunked_internal`) with
// NO retry on the web: one sporadic chunk-PUT drop (ERR_CONNECTION_CLOSED /
// transient 5xx under the 16-wide concurrent burst) failed the whole large
// upload while small (1-chunk) files were fine (FxFiles #50). gloo-timers
// gives wasm a real backoff so the retry actually fires there.
let mut attempt = 0usize;
loop {
attempt += 1;
match op().await {
Ok(v) => return Ok(v),
Err(e) if attempt >= max_attempts || !is_transient(&e) => return Err(e),
Err(e) => {
let backoff_ms = std::cmp::min(5_000u64, 100u64 * (1u64 << (attempt as u32 - 1)));
tracing::debug!(
attempt,
max_attempts,
backoff_ms,
error = %e,
"retry_idempotent: transient error, retrying"
);
retry_backoff_sleep(backoff_ms).await;
}
}
}
}

/// Backoff sleep for [`retry_idempotent`]. tokio on native; gloo-timers on
/// wasm32 — the browser has no tokio reactor and the wasm build's tokio
/// feature set excludes `tokio::time`.
async fn retry_backoff_sleep(ms: u64) {
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(Duration::from_millis(ms)).await;
#[cfg(target_arch = "wasm32")]
gloo_timers::future::TimeoutFuture::new(ms as u32).await;
}

/// Classify a ClientError as retry-safe. Network-level failures (connect,
/// timeout, decode) are always transient. S3-reported statuses 429/500/502/503/504
/// are transient; everything else (auth, 4xx policy errors, malformed response)
/// is not.
///
/// Native-only: `retry_idempotent` short-circuits to a single attempt on
/// wasm32 (no sleep primitive), so the classifier has no caller there.
/// Shared with `S3BlobBackend::{get, put}` in `encryption.rs` so the
/// blob-backend retry loop matches the same transient set.
#[cfg(not(target_arch = "wasm32"))]
/// Used on BOTH targets now: `retry_idempotent` retries on wasm too (FxFiles
/// #50), and it is shared with `S3BlobBackend::{get, put}` in `encryption.rs`
/// so the blob-backend retry loop matches the same transient set. The body is
/// wasm-aware — reqwest's `is_connect` is native-only (see the cfg inside).
pub(crate) fn is_transient(err: &ClientError) -> bool {
match err {
ClientError::Http(e) => {
Expand Down
113 changes: 113 additions & 0 deletions crates/fula-client/tests/chunk_put_retries_transient.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Regression test (FxFiles issue #50 follow-up): content-chunk PUTs must be
//! retried on a transient failure — on web AND native.
//!
//! The chunked content-upload path
//! `put_object_flat` → `put_object_chunked_internal`
//! → `FulaClient::put_object_with_metadata` → `request()`
//! does a single `send()` with NO retry on either target. The native
//! blob-backend retry loop (`S3BlobBackend::put`) only wraps the
//! `__fula_forest_v7_nodes/` class, and `retry_idempotent` only wraps the
//! S3-multipart path — neither covers the per-file content chunks. So a single
//! sporadic chunk-PUT drop (`ERR_CONNECTION_CLOSED` / 503 under the 16-wide
//! concurrent burst) fails the whole large upload.
//!
//! This drives a >768 KB (therefore chunked) encrypted upload through a
//! wiremock master that injects exactly ONE 503 on the first content/chunk PUT
//! and asserts the upload still succeeds. RED before the fix (no chunk retry),
//! GREEN after wrapping the chunk PUT in `retry_idempotent`.

#![cfg(not(target_arch = "wasm32"))]

use bytes::Bytes;
use cid::multihash::Multihash;
use cid::Cid;
use fula_client::{Config, EncryptedClient, EncryptionConfig};
use fula_crypto::keys::SecretKey;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};

/// BLAKE3-raw CID of the body, returned as the ETag so the SDK's W.9.3 /
/// W.9.4 self-verify accepts the (retried) PUT response.
fn blake3_raw_cid(data: &[u8]) -> Cid {
let h = blake3::hash(data);
let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap");
Cid::new_v1(0x55, mh)
}

/// Injects ONE 503 on the first CONTENT PUT (the content chunks upload before
/// the per-file index object, and forest blobs live under `__fula_forest_*`,
/// so the first non-forest PUT is a chunk). Everything else gets 200 + a
/// matching ETag.
struct FlakeFirstChunkPut {
injected: Arc<AtomicBool>,
injected_count: Arc<AtomicUsize>,
}

impl Respond for FlakeFirstChunkPut {
fn respond(&self, req: &Request) -> ResponseTemplate {
let is_forest = req.url.path().contains("__fula_forest_");
if !is_forest && !self.injected.swap(true, Ordering::SeqCst) {
self.injected_count.fetch_add(1, Ordering::SeqCst);
return ResponseTemplate::new(503);
}
let cid = blake3_raw_cid(&req.body);
ResponseTemplate::new(200).insert_header("ETag", cid.to_string())
}
}

#[tokio::test]
async fn chunked_upload_survives_one_transient_chunk_put() {
let server = MockServer::start().await;
let injected = Arc::new(AtomicBool::new(false));
let injected_count = Arc::new(AtomicUsize::new(0));

Mock::given(method("PUT"))
.respond_with(FlakeFirstChunkPut {
injected: injected.clone(),
injected_count: injected_count.clone(),
})
.mount(&server)
.await;
// Fresh bucket → 404 GET → cold v7 forest bootstrap.
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(404))
.mount(&server)
.await;
// The pre-fix failure path best-effort DELETEs already-uploaded chunks.
Mock::given(method("DELETE"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;
Mock::given(method("HEAD"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;

let mut config = Config::new(&server.uri()).with_token("test-jwt");
config.walkable_v8_writer_enabled = true;
let secret = SecretKey::generate();
let enc_config = EncryptionConfig::from_secret_key(secret);
let client = EncryptedClient::new(config, enc_config).expect("EncryptedClient::new");

// 2 MiB > CHUNKED_THRESHOLD (768 KB) ⇒ chunked upload: several concurrent
// chunk PUTs, one of which gets the injected 503.
let data = vec![0xABu8; 2 * 1024 * 1024];

let result = client
.put_object_flat("videos-v8", "/promo.mp4", Bytes::from(data), Some("video/mp4"))
.await;

assert_eq!(
injected_count.load(Ordering::SeqCst),
1,
"test must inject exactly one transient chunk 503"
);
assert!(
result.is_ok(),
"chunked upload must survive one transient chunk-PUT 503 via per-chunk \
retry; got error: {:?}",
result.err()
);
}
Loading
Loading