Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

439 changes: 439 additions & 0 deletions crates/fula-client/src/encryption.rs

Large diffs are not rendered by default.

154 changes: 154 additions & 0 deletions crates/fula-client/tests/streaming_upload_e2e.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
//! E2E (real server): the PUSH-model streaming upload must produce a normally
//! downloadable, BYTE-EXACT object on the real gateway. Validates P2 of
//! docs/web-streaming-resumable-upload-plan.md against production — not just the
//! hermetic mock in streaming_upload_roundtrip.rs.
//!
//! Drives the exact sequence the FRB handle uses:
//! streaming_begin -> plan-only ChunkedEncoder (pass 1) -> streaming_finalize_plan
//! -> streaming_put_chunk loop (pass 2) -> streaming_finish
//! then downloads via get_object_flat and asserts byte-exact. The file is large
//! enough (~50 MB, ~200 chunks at 256 KB) that the index metadata exceeds the
//! gateway's 16 KB header budget, so this also exercises header_safe_enc_metadata
//! stripping + the body/forest fallback on the real server.
//!
//! `#[ignore]` — needs network + real credentials. Run (PowerShell, env loaded
//! from e2e-credentials.env):
//! cargo test -p fula-client --test streaming_upload_e2e --release -- --ignored --nocapture
//!
//! Required env: FULA_S3, FULA_JWT, FULA_TEST_PROVIDER, FULA_TEST_OAUTH_SUB,
//! FULA_TEST_EMAIL (the Mode A derivation triple).

#![cfg(not(target_arch = "wasm32"))]

use fula_client::{Config, EncryptedClient, EncryptionConfig};
use fula_crypto::chunked::ChunkedEncoder;
use fula_crypto::keys::SecretKey;

fn env(name: &str) -> String {
std::env::var(name).unwrap_or_else(|_| panic!("missing required env {name}"))
}

#[tokio::test]
#[ignore = "real-server; needs FULA_S3 + FULA_JWT + Mode A triple"]
async fn streaming_upload_large_file_roundtrips_on_real_server() {
let s3 = env("FULA_S3");
let jwt = env("FULA_JWT");
let input = format!(
"{}:{}:{}",
env("FULA_TEST_PROVIDER"),
env("FULA_TEST_OAUTH_SUB"),
env("FULA_TEST_EMAIL"),
);
let kek = fula_crypto::hashing::derive_key_argon2id("fula-files-v1", input.as_bytes());
let secret = SecretKey::from_bytes(&kek).expect("32-byte secret from Argon2id");

let mut config = Config::new(&s3).with_token(&jwt);
// Match FxFiles production: stamps chunk_cids into the index metadata.
config.walkable_v8_writer_enabled = true;
let client = EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret))
.expect("EncryptedClient::new");

let epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let bucket = format!("e2e-streaming-{epoch}-v8");
eprintln!("[streaming_e2e] BUCKET={bucket}");
if let Err(e) = client.create_bucket(&bucket).await {
eprintln!("[streaming_e2e] create_bucket({bucket}) -> {e} (continuing)");
}

// ~50 MB -> ~200 chunks at 256 KB -> index metadata over the 16 KB header
// budget (exercises header_safe_enc_metadata stripping + body fallback).
let size = 50 * 1024 * 1024;
let mut data = vec![0u8; size];
for (i, b) in data.iter_mut().enumerate() {
*b = ((i * 31 + 7) % 251) as u8;
}
let key_path = "/big-streamed.bin";

// ---- streaming upload (push model; mirrors the FRB handle) ----
eprintln!("[streaming_e2e] streaming_begin...");
let (storage_key, dek, wrapped_dek, kek_version) = client
.streaming_begin(&bucket, key_path)
.await
.expect("streaming_begin");

// pass 1: plan-only encoder (default 256 KB chunks, matching production),
// fed in arbitrary 1 MiB streaming slices.
let aad_prefix = format!("fula:v4:chunk:{}", storage_key);
let mut encoder =
ChunkedEncoder::with_aad(dek.clone(), aad_prefix.into_bytes()).into_plan_only();
eprintln!("[streaming_e2e] pass 1 (plan)...");
for slice in data.chunks(1024 * 1024) {
encoder.update(slice).expect("plan update");
}
let (chunked_metadata, private_meta, encrypted_meta) = client
.streaming_finalize_plan(encoder, &dek, &storage_key, key_path, Some("application/octet-stream"))
.expect("streaming_finalize_plan");

let cs = chunked_metadata.chunk_size as usize;
let num_chunks = chunked_metadata.num_chunks as usize;
eprintln!("[streaming_e2e] {num_chunks} chunks @ {cs} bytes; pass 2 (upload)...");
assert!(
num_chunks > 120,
"need >120 chunks to exceed the 16 KB header budget (got {num_chunks})"
);

// pass 2: upload each chunk from its committed nonce.
let mut chunk_cids = vec![None; num_chunks];
for i in 0..num_chunks {
let start = i * cs;
let end = ((i + 1) * cs).min(data.len());
let (_chunk_key, cid) = client
.streaming_put_chunk(
&bucket,
&storage_key,
&chunked_metadata,
i as u32,
&data[start..end],
&dek,
)
.await
.unwrap_or_else(|e| panic!("streaming_put_chunk[{i}] failed: {e}"));
chunk_cids[i] = cid;
if i % 50 == 0 {
eprintln!("[streaming_e2e] uploaded chunk {i}/{num_chunks}");
}
}

eprintln!("[streaming_e2e] streaming_finish...");
client
.streaming_finish(
&bucket,
key_path,
&storage_key,
&wrapped_dek,
&encrypted_meta,
kek_version,
&private_meta,
chunked_metadata,
chunk_cids,
)
.await
.expect("streaming_finish");
eprintln!("[streaming_e2e] upload OK");

// ---- download + verify byte-exact (the gate) ----
eprintln!("[streaming_e2e] downloading via get_object_flat...");
let downloaded = client
.get_object_flat(&bucket, key_path)
.await
.expect("get_object_flat");
assert_eq!(downloaded.len(), data.len(), "round-trip length mismatch");
assert_eq!(
downloaded.as_ref(),
data.as_slice(),
"streaming upload -> download MUST be byte-exact on the real gateway"
);
eprintln!(
"[streaming_e2e] OK: {} bytes ({} chunks) round-tripped byte-exact via streaming on the real gateway",
data.len(),
num_chunks
);
}
187 changes: 187 additions & 0 deletions crates/fula-client/tests/streaming_upload_roundtrip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
//! Hermetic round-trip test for the PUSH-model streaming upload (P2 of
//! docs/web-streaming-resumable-upload-plan.md).
//!
//! Drives the streaming methods exactly as the FRB handle will —
//! streaming_begin
//! -> plan-only ChunkedEncoder (pass 1, fed arbitrary streaming slices)
//! -> streaming_finalize_plan
//! -> streaming_put_chunk loop (pass 2, encrypt-from-stored-nonce)
//! -> streaming_finish (index + forest register + flush)
//! — against a STATEFUL wiremock that stores PUT bodies and serves them back on
//! GET, then downloads via `get_object_flat` and asserts BYTE-EXACT recovery.
//!
//! This is the P2 gate: it proves the streaming path produces a normally
//! downloadable, decryptable object. No network / no credentials.

#![cfg(not(target_arch = "wasm32"))]

use cid::multihash::Multihash;
use cid::Cid;
use fula_client::{Config, EncryptedClient, EncryptionConfig};
use fula_crypto::chunked::ChunkedEncoder;
use fula_crypto::keys::SecretKey;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};

/// BLAKE3-raw CID of `data`, returned as the ETag so the SDK's walkable-v8
/// post-PUT self-verify accepts the response.
fn blake3_raw_cid(data: &[u8]) -> Cid {
let h = blake3::hash(data);
let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap");
Cid::new_v1(0x55, mh)
}

type Store = Arc<Mutex<HashMap<String, Vec<u8>>>>;

/// PUT: store the body keyed by URL path; ETag = BLAKE3-raw CID(body).
struct StorePut {
store: Store,
}
impl Respond for StorePut {
fn respond(&self, req: &Request) -> ResponseTemplate {
let path = req.url.path().to_string();
let cid = blake3_raw_cid(&req.body);
self.store.lock().unwrap().insert(path, req.body.clone());
ResponseTemplate::new(200).insert_header("ETag", cid.to_string())
}
}

/// GET: serve the stored body (200 + matching ETag) or 404.
struct ServeGet {
store: Store,
}
impl Respond for ServeGet {
fn respond(&self, req: &Request) -> ResponseTemplate {
match self.store.lock().unwrap().get(req.url.path()) {
Some(body) => ResponseTemplate::new(200)
.insert_header("ETag", blake3_raw_cid(body).to_string())
.set_body_bytes(body.clone()),
None => ResponseTemplate::new(404),
}
}
}

/// HEAD: 200 if stored, else 404 (cold-bootstrap probes a fresh bucket).
struct HeadProbe {
store: Store,
}
impl Respond for HeadProbe {
fn respond(&self, req: &Request) -> ResponseTemplate {
if self.store.lock().unwrap().contains_key(req.url.path()) {
ResponseTemplate::new(200)
} else {
ResponseTemplate::new(404)
}
}
}

#[tokio::test]
async fn streaming_upload_roundtrip_byte_exact() {
let server = MockServer::start().await;
let store: Store = Arc::new(Mutex::new(HashMap::new()));
Mock::given(method("PUT"))
.respond_with(StorePut { store: store.clone() })
.mount(&server)
.await;
Mock::given(method("GET"))
.respond_with(ServeGet { store: store.clone() })
.mount(&server)
.await;
Mock::given(method("HEAD"))
.respond_with(HeadProbe { store: store.clone() })
.mount(&server)
.await;
Mock::given(method("DELETE"))
.respond_with(ResponseTemplate::new(204))
.mount(&server)
.await;

let mut config = Config::new(&server.uri()).with_token("test-jwt");
config.walkable_v8_writer_enabled = true;
let secret = SecretKey::generate();
let enc_config = EncryptionConfig::from_secret_key(secret);
let client = EncryptedClient::new(config, enc_config).expect("EncryptedClient::new");

let bucket = "videos-v8";
let key = "/promo.mp4";
// Multi-chunk with a sub-chunk tail; non-trivial byte pattern.
let mut data = vec![0u8; 700 * 1024 + 123];
for (i, b) in data.iter_mut().enumerate() {
*b = (i % 251) as u8;
}

// ---- streaming upload (push model; mirrors the FRB handle) ----
let (storage_key, dek, wrapped_dek, kek_version) =
client.streaming_begin(bucket, key).await.expect("streaming_begin");

// pass 1: plan-only encoder, fed arbitrary streaming slices.
let aad_prefix = format!("fula:v4:chunk:{}", storage_key);
let mut encoder = ChunkedEncoder::with_aad_and_chunk_size(
dek.clone(),
aad_prefix.into_bytes(),
64 * 1024,
)
.into_plan_only();
for slice in data.chunks(100 * 1024) {
encoder.update(slice).expect("plan update");
}
let (chunked_metadata, private_meta, encrypted_meta) = client
.streaming_finalize_plan(encoder, &dek, &storage_key, key, Some("video/mp4"))
.expect("streaming_finalize_plan");

let cs = chunked_metadata.chunk_size as usize;
let num_chunks = chunked_metadata.num_chunks as usize;
assert!(num_chunks > 1, "test needs a multi-chunk file (got {num_chunks})");

// pass 2: upload each chunk from its committed nonce.
let mut chunk_cids = vec![None; num_chunks];
for i in 0..num_chunks {
let start = i * cs;
let end = ((i + 1) * cs).min(data.len());
let (_chunk_key, cid) = client
.streaming_put_chunk(
bucket,
&storage_key,
&chunked_metadata,
i as u32,
&data[start..end],
&dek,
)
.await
.expect("streaming_put_chunk");
chunk_cids[i] = cid;
}

client
.streaming_finish(
bucket,
key,
&storage_key,
&wrapped_dek,
&encrypted_meta,
kek_version,
&private_meta,
chunked_metadata,
chunk_cids,
)
.await
.expect("streaming_finish");

// ---- download + verify byte-exact ----
let downloaded = client
.get_object_flat(bucket, key)
.await
.expect("get_object_flat");
assert_eq!(
downloaded.len(),
data.len(),
"round-trip length mismatch"
);
assert_eq!(
downloaded.as_ref(),
data.as_slice(),
"streaming upload -> download must be byte-exact"
);
}
Loading
Loading