diff --git a/Cargo.lock b/Cargo.lock index 2aa7788..3cf2b43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,6 +1457,20 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "ecdsa" +version = "0.16.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" +dependencies = [ + "der", + "digest", + "elliptic-curve", + "rfc6979", + "signature", + "spki", +] + [[package]] name = "ed25519" version = "2.2.3" @@ -1504,6 +1518,7 @@ dependencies = [ "generic-array", "group", "hkdf", + "pkcs8", "rand_core 0.6.4", "sec1", "subtle", @@ -1827,6 +1842,7 @@ dependencies = [ "hex", "hyper 1.8.1", "jsonwebtoken", + "k256", "mime", "mime_guess", "oauth2", @@ -1839,6 +1855,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "sha3", "sqlx", "tempfile", "test-log", @@ -2982,6 +2999,20 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "k256" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b" +dependencies = [ + "cfg-if", + "ecdsa", + "elliptic-curve", + "once_cell", + "sha2", + "signature", +] + [[package]] name = "keccak" version = "0.1.5" @@ -4527,6 +4558,16 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "rfc6979" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" +dependencies = [ + "hmac", + "subtle", +] + [[package]] name = "ring" version = "0.17.14" @@ -4739,6 +4780,7 @@ dependencies = [ "base16ct", "der", "generic-array", + "pkcs8", "subtle", "zeroize", ] diff --git a/crates/fula-cli/Cargo.toml b/crates/fula-cli/Cargo.toml index 870e97f..3952dd8 100644 --- a/crates/fula-cli/Cargo.toml +++ b/crates/fula-cli/Cargo.toml @@ -90,6 +90,11 @@ reqwest = { workspace = true } # block-by-cid). Same workspace sqlx the project already pins; used ONLY when # FULA_PINS_DATABASE_URL is configured. sqlx = { workspace = true } +# FM-4 (Phase 2.5): EIP-712 wallet-signature auth — secp256k1 signature +# recovery (RustCrypto; pure Rust) + keccak256. Only exercised behind +# FULA_EIP712_AUTH; chosen over ethers/alloy to keep the dependency minimal. +k256 = { version = "0.13", features = ["ecdsa"] } +sha3 = "0.10" [dev-dependencies] criterion = { workspace = true } diff --git a/crates/fula-cli/src/auth_eip712.rs b/crates/fula-cli/src/auth_eip712.rs new file mode 100644 index 0000000..445eb83 --- /dev/null +++ b/crates/fula-cli/src/auth_eip712.rs @@ -0,0 +1,299 @@ +//! FM-4 (Phase 2.5) — portable identity: EIP-712 wallet-signature auth. +//! +//! Federated masters must authenticate the SAME user identically without +//! sharing master-local secrets. A wallet signature is self-certifying: +//! any master can verify it offline (no JWT secret, no session table), so +//! one identity works everywhere — and it is the same wallet the staking / +//! MasterRegistry phases key on. +//! +//! Token format (sent as the Bearer value; the prefix routes it): +//! +//! fula-eip712.. +//! +//! payload JSON: {"wallet":"0x..","iat":,"exp":} +//! +//! The signature is an EIP-712 typed signature over: +//! domain = { name: "Fula Gateway", version: "1" } (chain-agnostic) +//! message = FulaAuth { wallet: address, iat: uint256, exp: uint256 } +//! +//! Verification: recover the secp256k1 signer from the digest, require it to +//! equal `payload.wallet`, require `iat - 300 ≤ now ≤ exp` and a lifetime of +//! at most 24 h. The resulting session's user id IS the lowercase wallet +//! address — every master derives the same hashed user id from it. +//! +//! Scope: fixed to `storage:*`. Replay within the validity window is +//! accepted (bearer-token semantics, same as a JWT); short `exp` is the +//! mitigation, and the existing revocation deny-list applies to the raw +//! token string unchanged. +//! +//! Additive: the legacy JWT/session path is untouched; this path only +//! activates behind `FULA_EIP712_AUTH` AND the `fula-eip712.` prefix. +//! +//! Known v0 limitation (documented in #32): per-PUT registry pinning +//! forwards the bearer to the pinning service, which only knows ITS issued +//! sessions — an EIP-712 bearer 401s there, so registry persistence +//! requires the pinning service to accept this scheme too (follow-up). +//! Storage-only Stage-B operators (no pinning service) are unaffected. + +use crate::error::ApiError; +use crate::state::UserSession; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use base64::Engine; +use k256::ecdsa::{RecoveryId, Signature, VerifyingKey}; +use serde::Deserialize; +use sha3::{Digest, Keccak256}; + +pub const TOKEN_PREFIX: &str = "fula-eip712."; + +/// Maximum allowed token lifetime (exp - iat). +const MAX_LIFETIME_SECS: i64 = 24 * 60 * 60; +/// Allowed clock skew on `iat`. +const IAT_SKEW_SECS: i64 = 300; + +#[derive(Debug, Deserialize)] +struct Eip712Payload { + wallet: String, + iat: i64, + exp: i64, +} + +fn keccak(data: &[u8]) -> [u8; 32] { + let mut h = Keccak256::new(); + h.update(data); + h.finalize().into() +} + +/// keccak256("EIP712Domain(string name,string version)") with our values. +fn domain_separator() -> [u8; 32] { + let type_hash = keccak(b"EIP712Domain(string name,string version)"); + let name_hash = keccak(b"Fula Gateway"); + let version_hash = keccak(b"1"); + let mut enc = Vec::with_capacity(96); + enc.extend_from_slice(&type_hash); + enc.extend_from_slice(&name_hash); + enc.extend_from_slice(&version_hash); + keccak(&enc) +} + +/// keccak256(typeHash ‖ wallet-as-uint256 ‖ iat ‖ exp) +fn struct_hash(wallet: &[u8; 20], iat: i64, exp: i64) -> [u8; 32] { + let type_hash = keccak(b"FulaAuth(address wallet,uint256 iat,uint256 exp)"); + let mut enc = Vec::with_capacity(128); + enc.extend_from_slice(&type_hash); + let mut addr32 = [0u8; 32]; + addr32[12..].copy_from_slice(wallet); + enc.extend_from_slice(&addr32); + let mut int32 = [0u8; 32]; + int32[24..].copy_from_slice(&(iat as u64).to_be_bytes()); + enc.extend_from_slice(&int32); + let mut exp32 = [0u8; 32]; + exp32[24..].copy_from_slice(&(exp as u64).to_be_bytes()); + enc.extend_from_slice(&exp32); + keccak(&enc) +} + +/// The EIP-712 signing digest: keccak256(0x1901 ‖ domainSeparator ‖ structHash). +pub fn signing_digest(wallet: &[u8; 20], iat: i64, exp: i64) -> [u8; 32] { + let mut enc = Vec::with_capacity(66); + enc.extend_from_slice(&[0x19, 0x01]); + enc.extend_from_slice(&domain_separator()); + enc.extend_from_slice(&struct_hash(wallet, iat, exp)); + keccak(&enc) +} + +fn parse_wallet(s: &str) -> Result<[u8; 20], ApiError> { + let hexpart = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(hexpart) + .map_err(|_| bad("wallet is not valid hex"))?; + bytes + .try_into() + .map_err(|_| bad("wallet must be 20 bytes")) +} + +fn bad(msg: &str) -> ApiError { + ApiError::s3(crate::error::S3ErrorCode::InvalidToken, msg) +} + +/// Verify a `fula-eip712.` bearer and mint the portable session. +pub fn verify_eip712_token(token: &str) -> Result { + let rest = token + .strip_prefix(TOKEN_PREFIX) + .ok_or_else(|| bad("not an eip712 token"))?; + let (payload_b64, sig_b64) = rest + .split_once('.') + .ok_or_else(|| bad("malformed eip712 token (need payload.signature)"))?; + + let payload_bytes = URL_SAFE_NO_PAD + .decode(payload_b64) + .map_err(|_| bad("payload is not base64url"))?; + let payload: Eip712Payload = + serde_json::from_slice(&payload_bytes).map_err(|_| bad("payload is not valid JSON"))?; + + let now = chrono::Utc::now().timestamp(); + if payload.exp <= now { + return Err(bad("eip712 token expired")); + } + if payload.iat > now + IAT_SKEW_SECS { + return Err(bad("eip712 token iat is in the future")); + } + if payload.exp - payload.iat > MAX_LIFETIME_SECS { + return Err(bad("eip712 token lifetime exceeds 24h")); + } + + let wallet = parse_wallet(&payload.wallet)?; + let sig_bytes = URL_SAFE_NO_PAD + .decode(sig_b64) + .map_err(|_| bad("signature is not base64url"))?; + if sig_bytes.len() != 65 { + return Err(bad("signature must be 65 bytes (r‖s‖v)")); + } + let v = sig_bytes[64]; + let rec_id = RecoveryId::try_from(match v { + 0 | 1 => v, + 27 | 28 => v - 27, + _ => return Err(bad("invalid recovery id")), + }) + .map_err(|_| bad("invalid recovery id"))?; + let signature = + Signature::from_slice(&sig_bytes[..64]).map_err(|_| bad("invalid signature bytes"))?; + + let digest = signing_digest(&wallet, payload.iat, payload.exp); + let recovered = VerifyingKey::recover_from_prehash(&digest, &signature, rec_id) + .map_err(|_| bad("signature recovery failed"))?; + + // Ethereum address = last 20 bytes of keccak256(uncompressed pubkey[1..]). + let pub_uncompressed = recovered.to_encoded_point(false); + let addr_hash = keccak(&pub_uncompressed.as_bytes()[1..]); + let recovered_addr: [u8; 20] = addr_hash[12..].try_into().expect("20 bytes"); + + if recovered_addr != wallet { + return Err(bad("signature does not match the claimed wallet")); + } + + let wallet_lower = format!("0x{}", hex::encode(wallet)); + let expires_at = chrono::DateTime::from_timestamp(payload.exp, 0) + .unwrap_or_else(|| chrono::Utc::now() + chrono::Duration::hours(1)); + + Ok(UserSession::new( + wallet_lower, + None, + vec!["storage:*".to_string()], + expires_at, + token.to_string(), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use k256::ecdsa::SigningKey; + + fn test_key() -> SigningKey { + SigningKey::from_slice(&[0x42u8; 32]).expect("static test key") + } + + fn addr_of(key: &SigningKey) -> [u8; 20] { + let pubkey = key.verifying_key().to_encoded_point(false); + let h = keccak(&pubkey.as_bytes()[1..]); + h[12..].try_into().unwrap() + } + + fn mint(key: &SigningKey, wallet: [u8; 20], iat: i64, exp: i64) -> String { + let digest = signing_digest(&wallet, iat, exp); + let (sig, rec) = key.sign_prehash_recoverable(&digest).unwrap(); + let mut sig65 = sig.to_bytes().to_vec(); + sig65.push(rec.to_byte()); + let payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(wallet)), + "iat": iat, + "exp": exp, + }); + format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(payload.to_string()), + URL_SAFE_NO_PAD.encode(sig65) + ) + } + + #[test] + fn valid_signature_yields_portable_wallet_session() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 3600); + + let session = verify_eip712_token(&token).expect("valid token"); + assert_eq!(session.user_id, format!("0x{}", hex::encode(wallet))); + assert!(session.can_write(), "storage:* must grant write"); + } + + #[test] + fn wrong_wallet_claim_is_rejected() { + let key = test_key(); + let now = chrono::Utc::now().timestamp(); + // Sign for OUR address but claim a different wallet in the payload. + let claimed = [0x11u8; 20]; + let digest = signing_digest(&claimed, now - 10, now + 3600); + let (sig, rec) = key.sign_prehash_recoverable(&digest).unwrap(); + let mut sig65 = sig.to_bytes().to_vec(); + sig65.push(rec.to_byte()); + let payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(claimed)), + "iat": now - 10, + "exp": now + 3600, + }); + let token = format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(payload.to_string()), + URL_SAFE_NO_PAD.encode(sig65) + ); + assert!(verify_eip712_token(&token).is_err()); + } + + #[test] + fn tampered_payload_is_rejected() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 3600); + // Re-encode with a longer exp but the ORIGINAL signature. + let parts: Vec<&str> = token.trim_start_matches(TOKEN_PREFIX).split('.').collect(); + let forged_payload = serde_json::json!({ + "wallet": format!("0x{}", hex::encode(wallet)), + "iat": now - 10, + "exp": now + 7200, + }); + let forged = format!( + "{}{}.{}", + TOKEN_PREFIX, + URL_SAFE_NO_PAD.encode(forged_payload.to_string()), + parts[1] + ); + assert!(verify_eip712_token(&forged).is_err()); + } + + #[test] + fn expired_and_overlong_tokens_are_rejected() { + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + assert!(verify_eip712_token(&mint(&key, wallet, now - 7200, now - 3600)).is_err()); + assert!(verify_eip712_token(&mint(&key, wallet, now, now + 90_000)).is_err()); + } + + #[test] + fn same_token_verifies_identically_with_no_local_state() { + // The portability property: verification uses NOTHING but the token — + // two "masters" (two verify calls with no shared setup) agree. + let key = test_key(); + let wallet = addr_of(&key); + let now = chrono::Utc::now().timestamp(); + let token = mint(&key, wallet, now - 10, now + 600); + let a = verify_eip712_token(&token).unwrap(); + let b = verify_eip712_token(&token).unwrap(); + assert_eq!(a.user_id, b.user_id); + assert_eq!(a.hashed_user_id, b.hashed_user_id); + } +} diff --git a/crates/fula-cli/src/config.rs b/crates/fula-cli/src/config.rs index 0720620..8fd10ef 100644 --- a/crates/fula-cli/src/config.rs +++ b/crates/fula-cli/src/config.rs @@ -58,6 +58,25 @@ pub struct GatewayConfig { pub cors_origins: Vec, /// Path to store bucket registry CID for persistence pub registry_cid_path: Option, + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// `x-amz-meta-fula-remote-cid` — the chunk's verified bytes were already + /// streamed to a fula-ingest node; this master records only the + /// key→cid mapping (after confirming block presence via the blockstore). + /// Advertised to clients via GET /fula/capabilities so old/flag-off + /// masters are never sent the protocol. Default OFF. + #[serde(default)] + pub remote_cid_put_enabled: bool, + /// FM-1 (Phase 2.5): arbitrate bucket-root flushes through the shared + /// Postgres so CONCURRENT federated masters can serve writes without + /// lost updates. Needs the pins-DB (POSTGRES_* env). Default OFF. + #[serde(default)] + pub bucket_root_cas_enabled: bool, + /// FM-4 (Phase 2.5): accept self-certifying EIP-712 wallet-signature + /// bearers (`fula-eip712.` prefix) — one identity on every federated + /// master with no shared secret. Additive; legacy JWT auth untouched. + /// Default OFF. + #[serde(default)] + pub eip712_auth_enabled: bool, /// Storage API URL for balance/quota checking before uploads pub storage_api_url: Option, /// Admin JWT secret for admin API authentication (separate from user JWT) @@ -198,6 +217,9 @@ impl Default for GatewayConfig { cors_enabled: true, cors_origins: vec!["*".to_string()], registry_cid_path: Some("/var/lib/fula-gateway/registry.cid".to_string()), + remote_cid_put_enabled: false, + bucket_root_cas_enabled: false, + eip712_auth_enabled: false, storage_api_url: None, admin_jwt_secret: None, admin_api_enabled: false, diff --git a/crates/fula-cli/src/handlers/object.rs b/crates/fula-cli/src/handlers/object.rs index 6aa9c9b..262fd80 100644 --- a/crates/fula-cli/src/handlers/object.rs +++ b/crates/fula-cli/src/handlers/object.rs @@ -129,8 +129,104 @@ pub async fn put_object( } } - // Store the data - let cid = state.block_store.put_block(&body).await?; + // Phase 2 (decentralized ingress): empty-body mapping PUT. The chunk's + // verified bytes were already streamed to a fula-ingest node; record only + // the key→cid mapping here. Triple-gated: server flag (advertised via + // /fula/capabilities, so well-behaved clients never send this to a + // flag-off master), the header, and an empty body. The declared CID must + // be raw(0x55)+blake3(0x1e) — the only addressing the ingest verifies — + // and the block must be PRESENT in the blockstore (bitswap pulls it from + // the ingest peer); absent ⇒ retryable 409 so the client falls back to a + // full-bytes PUT. Storing nothing here is the point: the CID is + // self-certifying, so reads stay tamper-evident end-to-end. + let remote_cid_hdr = headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + let cid = if let Some(declared) = remote_cid_hdr { + if !state.config.remote_cid_put_enabled { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT is not enabled on this master (probe /fula/capabilities)", + )); + } + if !body.is_empty() { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote-cid PUT requires an empty body", + )); + } + let declared_cid = cid::Cid::try_from(declared.as_str()).map_err(|e| { + ApiError::s3(S3ErrorCode::InvalidRequest, format!("invalid remote cid: {e}")) + })?; + if declared_cid.codec() != 0x55 || declared_cid.hash().code() != 0x1e { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "remote cid must be raw (0x55) + blake3 (0x1e) — the addressing fula-ingest verifies", + )); + } + // BOUNDED presence check: kubo's block lookup for an ABSENT cid is an + // unbounded network search (bitswap/DHT) — without a deadline this + // handler would hang for minutes per missing block. 5s is ample for a + // bitswap pull from the ingest peer (same box: instant; LAN/WAN peer: + // one round-trip); timeout ⇒ treat as absent ⇒ retryable 409, the + // client falls back to a full-bytes PUT. + match tokio::time::timeout( + std::time::Duration::from_secs(5), + state.block_store.has_block(&declared_cid), + ) + .await + { + Ok(Ok(true)) => {} + Ok(Ok(false)) => { + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "declared block not present/retrievable yet — retry or fall back to a full-bytes PUT", + )); + } + Ok(Err(e)) => { + tracing::warn!(error = %e, cid = %declared_cid, "remote-cid presence check failed"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check failed — retry or fall back to a full-bytes PUT", + )); + } + Err(_elapsed) => { + tracing::debug!(cid = %declared_cid, "remote-cid presence check timed out (treating as absent)"); + return Err(ApiError::s3( + S3ErrorCode::OperationAborted, + "block presence check timed out — retry or fall back to a full-bytes PUT", + )); + } + } + tracing::debug!(bucket = %bucket_name, key = %key, cid = %declared_cid, "remote-cid PUT accepted (bytes via ingest)"); + declared_cid + } else { + // Store the data + state.block_store.put_block(&body).await? + }; + // Phase 2: a mapping PUT has an empty body — the chunk's TRUE size (as + // stored on the ingest node) arrives in x-amz-meta-fula-remote-size so + // billing/list/stat record the real byte count, never 0. Only honored on + // the remote-cid path; bounded by max_body_size like a real body. + let object_size: u64 = match ( + headers.get("x-amz-meta-fula-remote-size").and_then(|v| v.to_str().ok()), + headers.get("x-amz-meta-fula-remote-cid").is_some(), + ) { + (Some(sz), true) => { + let parsed = sz.parse::().map_err(|_| { + ApiError::s3(S3ErrorCode::InvalidRequest, "invalid x-amz-meta-fula-remote-size") + })?; + if parsed == 0 || parsed > state.config.max_body_size as u64 { + return Err(ApiError::s3( + S3ErrorCode::InvalidRequest, + "x-amz-meta-fula-remote-size out of range", + )); + } + parsed + } + _ => body.len() as u64, + }; // Use CID as ETag (content-addressed identifier) // This is S3-compliant: AWS docs state "The ETag may or may not be an MD5 digest" @@ -165,7 +261,7 @@ pub async fn put_object( // COPY handler at line 694). Access control is unaffected — bucket- // level access is gated by `BucketMetadata.owner_id` which is // already correctly the hashed form. - let mut metadata = ObjectMetadata::new(cid, body.len() as u64, etag.clone()) + let mut metadata = ObjectMetadata::new(cid, object_size, etag.clone()) .with_owner(&session.hashed_user_id); if let Some(ct) = content_type { @@ -1399,6 +1495,11 @@ pub(crate) const FULA_CONTROL_HEADERS: &[&str] = &[ // as user_metadata on the object itself (would pollute every // forest-manifest object with a meaningless `=1` tag). "fula-forest-manifest", + // Phase 2 (decentralized ingress): remote-cid mapping PUT controls — + // consumed by the handler (declared CID + true byte size of the chunk + // stored on the ingest node); never persisted as object metadata. + "fula-remote-cid", + "fula-remote-size", ]; /// Returns `true` if the given x-amz-meta key (already stripped of diff --git a/crates/fula-cli/src/handlers/service.rs b/crates/fula-cli/src/handlers/service.rs index d8e70b4..cc84521 100644 --- a/crates/fula-cli/src/handlers/service.rs +++ b/crates/fula-cli/src/handlers/service.rs @@ -46,3 +46,16 @@ pub async fn health_check() -> impl IntoResponse { pub async fn healthz() -> impl IntoResponse { (StatusCode::OK, "ok") } + +/// GET /fula/capabilities — unauthenticated protocol-capability advertisement +/// (Phase 2). Clients probe this ONCE per instance before using optional +/// protocols. An old master 404s here, so a client never sends e.g. the +/// empty-body remote-cid PUT to a build that would misstore it as a real +/// zero-byte object. +pub async fn capabilities(State(state): State>) -> impl IntoResponse { + axum::Json(serde_json::json!({ + "remoteCidPut": state.config.remote_cid_put_enabled, + // FM-4: clients/operators can discover wallet-auth support the same way. + "eip712Auth": state.config.eip712_auth_enabled, + })) +} diff --git a/crates/fula-cli/src/lib.rs b/crates/fula-cli/src/lib.rs index d8e9a72..de7a86e 100644 --- a/crates/fula-cli/src/lib.rs +++ b/crates/fula-cli/src/lib.rs @@ -33,6 +33,7 @@ //! ``` pub mod auth; +pub mod auth_eip712; pub mod config; pub mod entries_store; pub mod error; @@ -49,6 +50,7 @@ pub mod pin_queue; pub mod pinning; pub mod recovery_fallback; pub mod revocation; +pub mod root_store_pg; pub mod routes; pub mod server; pub mod state; diff --git a/crates/fula-cli/src/main.rs b/crates/fula-cli/src/main.rs index bd1a8c1..03a0a75 100644 --- a/crates/fula-cli/src/main.rs +++ b/crates/fula-cli/src/main.rs @@ -92,6 +92,24 @@ struct Args { /// the local-retain backlog (the ongoing per-upload protection still runs). #[arg(long, env = "FULA_NO_LOCAL_RETAIN_BACKFILL")] no_local_retain_backfill: bool, + + /// Phase 2 (decentralized ingress): accept empty-body chunk PUTs carrying + /// x-amz-meta-fula-remote-cid (bytes pre-stored on a fula-ingest node). + /// Advertised to clients via GET /fula/capabilities. Default OFF. + #[arg(long, env = "FULA_REMOTE_CID_PUT")] + remote_cid_put: bool, + + /// FM-1 (Phase 2.5): arbitrate bucket-root flushes through the shared + /// Postgres (POSTGRES_* env) so concurrent federated masters never lose + /// each other's writes. Default OFF. + #[arg(long, env = "FULA_BUCKET_ROOT_CAS")] + bucket_root_cas: bool, + + /// FM-4 (Phase 2.5): accept EIP-712 wallet-signature bearers + /// (fula-eip712. prefix) — portable identity across federated masters. + /// Default OFF. + #[arg(long, env = "FULA_EIP712_AUTH")] + eip712_auth: bool, } #[tokio::main] @@ -163,6 +181,9 @@ async fn main() -> anyhow::Result<()> { cluster_peering_enabled: if args.no_cluster_peering { Some(false) } else { None }, local_retain_enabled: if args.no_local_retain { Some(false) } else { None }, local_retain_backfill: if args.no_local_retain_backfill { Some(false) } else { None }, + remote_cid_put_enabled: args.remote_cid_put, + bucket_root_cas_enabled: args.bucket_root_cas, + eip712_auth_enabled: args.eip712_auth, ..Default::default() }; diff --git a/crates/fula-cli/src/middleware.rs b/crates/fula-cli/src/middleware.rs index 68e1534..642df3b 100644 --- a/crates/fula-cli/src/middleware.rs +++ b/crates/fula-cli/src/middleware.rs @@ -73,17 +73,30 @@ pub async fn auth_middleware( // Extract JWT from either Bearer token or AWS Sig V4 format let token = extract_token_from_header(header, request.headers())?; - let secret = state.config.jwt_secret.as_ref() - .ok_or_else(|| ApiError::s3(S3ErrorCode::InternalError, "JWT secret not configured"))?; - - let claims = validate_token(&token, secret)?; - // Audit F3: honor a manually-revoked key. No-op unless the revocation - // deny-list is enabled (env switch + pins DB). Deny-list + fail-open, - // so a currently-valid token is never rejected. `token` is the raw - // JWT — the issuer hashes the same string into `api_keys.key_hash`. - crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; - // Pass the raw JWT token to the session for forwarding to pinning service - claims_to_session(claims, token) + // FM-4 (Phase 2.5): portable wallet identity. Self-certifying — + // no master-local secret involved, so the SAME token works on + // every federated master. Additive: only the `fula-eip712.` + // prefix routes here, and only behind the flag; the legacy JWT + // path below is untouched. + if state.config.eip712_auth_enabled + && token.starts_with(crate::auth_eip712::TOKEN_PREFIX) + { + let session = crate::auth_eip712::verify_eip712_token(&token)?; + crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; + session + } else { + let secret = state.config.jwt_secret.as_ref() + .ok_or_else(|| ApiError::s3(S3ErrorCode::InternalError, "JWT secret not configured"))?; + + let claims = validate_token(&token, secret)?; + // Audit F3: honor a manually-revoked key. No-op unless the revocation + // deny-list is enabled (env switch + pins DB). Deny-list + fail-open, + // so a currently-valid token is never rejected. `token` is the raw + // JWT — the issuer hashes the same string into `api_keys.key_hash`. + crate::revocation::ensure_not_revoked(state.revocation.as_deref(), &token)?; + // Pass the raw JWT token to the session for forwarding to pinning service + claims_to_session(claims, token) + } } None => { return Err(ApiError::s3( diff --git a/crates/fula-cli/src/root_store_pg.rs b/crates/fula-cli/src/root_store_pg.rs new file mode 100644 index 0000000..7b30cd2 --- /dev/null +++ b/crates/fula-cli/src/root_store_pg.rs @@ -0,0 +1,93 @@ +//! FM-1 (Phase 2.5) — Postgres-backed shared bucket-root arbiter. +//! +//! The Stage-A federated masters already share one Postgres (billing, +//! sessions, pins); the same database is the lowest-infrastructure arbiter +//! for bucket-root CAS. Table (pinning-service migration 020): +//! +//! bucket_roots(owner_id TEXT, bucket TEXT, root_cid TEXT NOT NULL, +//! version BIGINT NOT NULL DEFAULT 1, updated_at TIMESTAMPTZ, +//! PRIMARY KEY (owner_id, bucket)) +//! +//! CAS semantics: one statement, race-safe under READ COMMITTED — +//! INSERT .. ON CONFLICT (owner_id,bucket) DO UPDATE +//! SET root_cid = $new, version = version+1, updated_at = now() +//! WHERE bucket_roots.root_cid = $expected +//! rows_affected == 1 ⇒ Won (fresh row claimed, or expected matched); +//! rows_affected == 0 ⇒ another master moved the pointer ⇒ Conflict. +//! +//! Reuses the AppState `pins_db` sqlx pool — zero new dependencies. + +use async_trait::async_trait; +use cid::Cid; +use fula_core::root_pointer::{CasOutcome, RootPointerStore}; +use fula_core::{CoreError, Result}; +use sqlx::PgPool; + +pub struct PgRootStore { + pool: PgPool, +} + +impl PgRootStore { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl RootPointerStore for PgRootStore { + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result { + let res = sqlx::query( + r#" + INSERT INTO bucket_roots (owner_id, bucket, root_cid, version, updated_at) + VALUES ($1, $2, $3, 1, NOW()) + ON CONFLICT (owner_id, bucket) DO UPDATE + SET root_cid = $3, version = bucket_roots.version + 1, updated_at = NOW() + WHERE bucket_roots.root_cid = $4 + "#, + ) + .bind(owner_id) + .bind(bucket) + .bind(new.to_string()) + .bind(expected.to_string()) + .execute(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots CAS: {e}")))?; + + if res.rows_affected() == 1 { + return Ok(CasOutcome::Won); + } + + // Lost the race — report the pointer another master holds now. + let current: Option = sqlx::query_scalar( + "SELECT root_cid FROM bucket_roots WHERE owner_id = $1 AND bucket = $2", + ) + .bind(owner_id) + .bind(bucket) + .fetch_optional(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots read: {e}")))?; + + Ok(CasOutcome::Conflict { + current: current.and_then(|s| s.parse::().ok()), + }) + } + + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result> { + let current: Option = sqlx::query_scalar( + "SELECT root_cid FROM bucket_roots WHERE owner_id = $1 AND bucket = $2", + ) + .bind(owner_id) + .bind(bucket) + .fetch_optional(&self.pool) + .await + .map_err(|e| CoreError::StorageError(format!("bucket_roots read: {e}")))?; + + Ok(current.and_then(|s| s.parse::().ok())) + } +} diff --git a/crates/fula-cli/src/routes.rs b/crates/fula-cli/src/routes.rs index 09b256d..cd7c054 100644 --- a/crates/fula-cli/src/routes.rs +++ b/crates/fula-cli/src/routes.rs @@ -24,7 +24,12 @@ pub fn create_router(state: Arc) -> Router { let cors = create_cors_layer(&state.config.cors_origins); // Public routes that must bypass auth (e.g., container health checks) - let public = Router::new().route("/healthz", get(handlers::healthz)); + let public = Router::new() + .route("/healthz", get(handlers::healthz)) + // Phase 2: capability probe (no auth — the answer is not sensitive and + // clients must be able to probe before any authenticated operation). + .route("/fula/capabilities", get(handlers::capabilities)) + .with_state(state.clone()); // Phase 3.2 internal endpoints. Bearer-token-protected at the // handler level (see handlers::internal::authenticate). They diff --git a/crates/fula-cli/src/state.rs b/crates/fula-cli/src/state.rs index 7eede1a..611dccd 100644 --- a/crates/fula-cli/src/state.rs +++ b/crates/fula-cli/src/state.rs @@ -320,6 +320,21 @@ impl AppState { } }; + // FM-1 (Phase 2.5) — shared bucket-root CAS across federated masters. + // Requires BOTH the flag AND the shared Postgres (the same pool the + // recovery endpoints use). Flag on without a DB ⇒ loud warning + + // single-master behavior (never silently multi-master-unsafe). + if config.bucket_root_cas_enabled { + if let Some(ref pool) = pins_db { + bucket_manager.set_root_pointer_store(std::sync::Arc::new( + crate::root_store_pg::PgRootStore::new(pool.clone()), + )); + info!("✓ Bucket-root CAS enabled (FM-1): shared Postgres arbitrates multi-master flushes"); + } else { + warn!("FULA_BUCKET_ROOT_CAS is on but no Postgres is configured (POSTGRES_* / FULA_PINS_DATABASE_URL) — running WITHOUT shared root arbitration"); + } + } + // Audit F3 — JWT revocation deny-list. Allocated EMPTY here (= allow // all) only when the env switch is set AND a pins DB exists; the // background refresher (`revocation::spawn_if_enabled`, started in diff --git a/crates/fula-cli/tests/root_store_pg_it.rs b/crates/fula-cli/tests/root_store_pg_it.rs new file mode 100644 index 0000000..0c1efed --- /dev/null +++ b/crates/fula-cli/tests/root_store_pg_it.rs @@ -0,0 +1,118 @@ +//! FM-1 (Phase 2.5) — PgRootStore integration test against a REAL Postgres. +//! +//! Proves the shared bucket-root arbiter that makes concurrent federated +//! masters safe behaves correctly against the actual database + migration 020 +//! `bucket_roots` table — the exact CAS two live gateways perform on flush. +//! +//! Skips cleanly when POSTGRES_* is unset (local dev); the Phase 2.5 drill +//! runs it on the test master's stack DB. +//! +//! Run: POSTGRES_HOST=127.0.0.1 POSTGRES_DB=pinning_service \ +//! POSTGRES_USER=pinning_user POSTGRES_PASSWORD=… \ +//! cargo test -p fula-cli --test root_store_pg_it -- --ignored --nocapture + +#![cfg(not(target_arch = "wasm32"))] + +use cid::Cid; +use fula_core::root_pointer::{CasOutcome, RootPointerStore}; +use fula_cli::root_store_pg::PgRootStore; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; + +fn cid_of(b: &[u8]) -> Cid { + let h = blake3::hash(b); + let mh = cid::multihash::Multihash::<64>::wrap(0x1e, h.as_bytes()).unwrap(); + Cid::new_v1(0x55, mh) +} + +async fn pool_or_skip() -> Option { + let env = |k: &str| std::env::var(k).ok().filter(|s| !s.trim().is_empty()); + let (host, db, user) = (env("POSTGRES_HOST")?, env("POSTGRES_DB")?, env("POSTGRES_USER")?); + let port = env("POSTGRES_PORT").and_then(|s| s.parse().ok()).unwrap_or(5432u16); + let pass = std::env::var("POSTGRES_PASSWORD").unwrap_or_default(); + let opts = PgConnectOptions::new().host(&host).port(port).database(&db).username(&user).password(&pass); + match PgPoolOptions::new().max_connections(4).connect_with(opts).await { + Ok(p) => Some(p), + Err(e) => { + eprintln!("SKIP: cannot reach Postgres: {e}"); + None + } + } +} + +#[tokio::test] +#[ignore] +async fn pg_cas_arbitrates_the_two_master_race() { + let Some(pool) = pool_or_skip().await else { return }; + let owner = "p25-it-owner"; + let bucket = format!("p25-it-bkt-{}", std::process::id()); + // Clean any prior row for a deterministic run. + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id = $1 AND bucket = $2") + .bind(owner).bind(&bucket).execute(&pool).await; + + let store = PgRootStore::new(pool.clone()); + let (a, b, c) = (cid_of(b"r-a"), cid_of(b"r-b"), cid_of(b"r-c")); + + // First flush claims the slot (no row yet — expected==new is the bootstrap). + assert_eq!(store.cas_root(owner, &bucket, &a, &b).await.unwrap(), CasOutcome::Won); + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(b), "shared root is now B"); + + // Master #2 built on the STALE root A → must conflict, reporting current=B. + match store.cas_root(owner, &bucket, &a, &c).await.unwrap() { + CasOutcome::Conflict { current } => assert_eq!(current, Some(b)), + other => panic!("stale flush must conflict, got {other:?}"), + } + // The pointer is unchanged by the losing CAS. + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(b)); + + // Loser reopens at the shared root B and retries → wins. + assert_eq!(store.cas_root(owner, &bucket, &b, &c).await.unwrap(), CasOutcome::Won); + assert_eq!(store.get_root(owner, &bucket).await.unwrap(), Some(c)); + + // version incremented across the two winning CASes (claim=1, then +1). + let version: i64 = sqlx::query_scalar("SELECT version FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).fetch_one(&pool).await.unwrap(); + assert_eq!(version, 2, "two winning CASes ⇒ version 2"); + + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id = $1 AND bucket = $2") + .bind(owner).bind(&bucket).execute(&pool).await; +} + +/// Concurrency: fire N tasks that all build on the SAME root; EXACTLY ONE may +/// win — the rest must conflict. This is the real two-master race, collapsed +/// into one process hitting the real Postgres. +#[tokio::test] +#[ignore] +async fn pg_cas_admits_exactly_one_concurrent_winner() { + let Some(pool) = pool_or_skip().await else { return }; + let owner = "p25-it-owner"; + let bucket = format!("p25-it-conc-{}", std::process::id()); + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).execute(&pool).await; + + let base = cid_of(b"base"); + // Establish the base root. + PgRootStore::new(pool.clone()).cas_root(owner, &bucket, &base, &base).await.unwrap(); + + // 8 racers, each proposing a distinct new root from `base`. + let mut handles = Vec::new(); + for i in 0..8u8 { + let pool = pool.clone(); + let bucket = bucket.clone(); + let base = base; + handles.push(tokio::spawn(async move { + let store = PgRootStore::new(pool); + let new = cid_of(&[i; 8]); + store.cas_root(owner, &bucket, &base, &new).await.unwrap() + })); + } + let mut wins = 0; + for h in handles { + if matches!(h.await.unwrap(), CasOutcome::Won) { + wins += 1; + } + } + assert_eq!(wins, 1, "exactly one concurrent CAS from the same base may win"); + + let _ = sqlx::query("DELETE FROM bucket_roots WHERE owner_id=$1 AND bucket=$2") + .bind(owner).bind(&bucket).execute(&pool).await; +} diff --git a/crates/fula-client/src/client.rs b/crates/fula-client/src/client.rs index 1b2086b..f67791a 100644 --- a/crates/fula-client/src/client.rs +++ b/crates/fula-client/src/client.rs @@ -522,6 +522,24 @@ impl FulaClient { }) } + /// Phase 2 (decentralized ingress): one-shot master capability probe. + /// Returns whether the master accepts empty-body remote-cid mapping PUTs + /// (GET /fula/capabilities → {"remoteCidPut": bool}). ANY failure — 404 on + /// an old build, network error, unparseable body — returns `false`, so + /// callers degrade to the legacy full-bytes path, never to a protocol the + /// master would misinterpret. + pub async fn supports_remote_cid_put(&self) -> bool { + match self.request("GET", "/fula/capabilities", None, None, None).await { + Ok(resp) => resp + .json::() + .await + .ok() + .and_then(|v| v.get("remoteCidPut").and_then(|b| b.as_bool())) + .unwrap_or(false), + Err(_) => false, + } + } + /// Put an object with metadata and optional If-Match / If-None-Match guards. /// /// Used by conditional-write paths (e.g. forest flush) to detect concurrent diff --git a/crates/fula-client/src/config.rs b/crates/fula-client/src/config.rs index dfa4980..1660340 100644 --- a/crates/fula-client/src/config.rs +++ b/crates/fula-client/src/config.rs @@ -195,6 +195,18 @@ pub struct Config { /// targeted regressions or backward-compat tests). pub walkable_v8_writer_enabled: bool, + /// Phase 2 (decentralized ingress) — optional fula-ingest endpoints. + /// When non-empty AND `walkable_v8_writer_enabled` AND the master + /// advertises `remoteCidPut` via GET /fula/capabilities (probed once per + /// chunked upload), chunk ciphertext BYTES are streamed to the first + /// ingest endpoint (`PUT /v0/block?cid=` — the node verifies + /// the CID before storing) and the master receives only an empty-body + /// mapping PUT (`x-amz-meta-fula-remote-cid` + true size). ANY failure on + /// that route falls back transparently to the legacy full-bytes PUT. + /// Empty (default) = behavior byte-identical to before. Native-only + /// (ignored on wasm32). + pub ingest_endpoints: Vec, + /// Phase 19 — optional health-status callback. When set, the SDK /// invokes this closure on every Up↔Down transition of the /// master health gate (`MasterHealthEvent::Online` / @@ -330,6 +342,8 @@ impl Default for Config { // flipping master-side gates until SDK adoption reaches the // % they're comfortable with for the pre-v0.6 reader cost. walkable_v8_writer_enabled: true, + // Phase 2 — no ingest nodes by default (legacy byte path). + ingest_endpoints: Vec::new(), // Phase 19 — no callback by default (silent gate). health_callback: None, // E2E plan Phase 4 — Mode B/C signed-entry writer disabled @@ -358,6 +372,13 @@ impl Config { self } + /// Phase 2: route chunk bytes through fula-ingest node(s) when the master + /// supports remote-cid mapping PUTs. Empty = legacy behavior. + pub fn with_ingest_endpoints(mut self, endpoints: Vec) -> Self { + self.ingest_endpoints = endpoints; + self + } + /// Enable encryption pub fn with_encryption(mut self) -> Self { self.encryption_enabled = true; diff --git a/crates/fula-client/src/encryption.rs b/crates/fula-client/src/encryption.rs index 522aa0b..45cef3d 100644 --- a/crates/fula-client/src/encryption.rs +++ b/crates/fula-client/src/encryption.rs @@ -7083,6 +7083,31 @@ impl EncryptedClient { // (skip_serializing_if keeps it off the wire). let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): decide the byte route ONCE per + // upload. The ingest route activates only when (a) ingest endpoints + // are configured, (b) the walkable-v8 writer is on (we need the + // pre-computed chunk CID to declare), and (c) the master ADVERTISES + // remote-cid support via /fula/capabilities — an old or flag-off + // master is never sent the empty-body protocol it would misstore. + // (cfg-gated: native only; wasm32 keeps the legacy path.) + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -7111,8 +7136,61 @@ impl EncryptedClient { let bucket = bucket.to_string(); let pinning = pinning.clone(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route. Bytes go to the fula-ingest node + // (which verifies BLAKE3(body) == declared CID before storing) + // and the master gets an empty-body mapping PUT carrying the + // declared CID + true size. ANY failure on this route falls + // through to the legacy full-bytes PUT below — degradation, + // never corruption (the master only accepts the mapping when + // its flag is on AND the block is present in its blockstore). + #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + let map_res = if let Some(ref pin) = pinning { + client.put_object_with_metadata_and_pinning( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + &pin.endpoint, &pin.token, + ).await + } else { + client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await + }; + match map_res { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = if let Some(ref pin) = pinning { client.put_object_with_metadata_and_pinning( &bucket, @@ -10519,6 +10597,28 @@ impl EncryptedClient { // master's etag-attested CID matches. let walkable_v8 = self.inner.config().walkable_v8_writer_enabled; + // Phase 2 (decentralized ingress): same probe-gated ingest route as + // `put_object_chunked_internal` — this public API duplicates that + // loop (E51 mirroring pattern), so it must mirror the route too or + // FxFiles-path uploads silently stay on the legacy byte path. + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx: Option<(reqwest::Client, String, Option)> = { + let cfg = self.inner.config(); + if cfg.ingest_endpoints.is_empty() || !walkable_v8 { + None + } else if self.inner.supports_remote_cid_put().await { + tracing::debug!(ingest = %cfg.ingest_endpoints[0], "ingest byte route enabled (master advertises remoteCidPut)"); + Some(( + reqwest::Client::new(), + cfg.ingest_endpoints[0].trim_end_matches('/').to_string(), + cfg.access_token.clone(), + )) + } else { + tracing::debug!("master does not advertise remoteCidPut — ingest route disabled, using legacy byte path"); + None + } + }; + // Upload chunks in parallel with bounded concurrency. Using // futures::stream::buffer_unordered rather than tokio::spawn so the // same code runs on wasm32 (where tokio has no multi-thread runtime). @@ -10548,8 +10648,49 @@ impl EncryptedClient { let client = self.inner.clone(); let bucket = bucket.to_string(); let chunk_key_ret = chunk_key.clone(); + #[cfg(not(target_arch = "wasm32"))] + let ingest_ctx = ingest_ctx.clone(); async move { + // Phase 2: ingest byte route (see put_object_chunked_internal + // for the full rationale). ANY failure falls through to the + // legacy full-bytes PUT below. + #[cfg(not(target_arch = "wasm32"))] + if let (Some((ingest_http, ingest_base, token)), Some(expected)) = + (ingest_ctx.as_ref(), expected_chunk_cid) + { + let url = format!("{}/v0/block?cid={}", ingest_base, expected); + let mut rb = ingest_http.put(&url).body(chunk.ciphertext.clone()); + if let Some(t) = token { + rb = rb.bearer_auth(t); + } + match rb.send().await { + Ok(r) if r.status().is_success() => { + let map_meta = ObjectMetadata::new() + .with_content_type("application/octet-stream") + .with_metadata("x-fula-chunk", "true") + .with_metadata("x-fula-chunk-index", &chunk.index.to_string()) + .with_metadata("fula-remote-cid", &expected.to_string()) + .with_metadata("fula-remote-size", &chunk.ciphertext.len().to_string()); + match client.put_object_with_metadata( + &bucket, &chunk_key, bytes::Bytes::new(), Some(map_meta), + ).await { + Ok(put_result) => { + let chunk_cid = crate::walkable_v8::verify_etag_against_expected_cid( + &put_result.etag, expected, &bucket, &chunk_key, + ); + return Ok::<(String, u32, Option), ClientError>(( + chunk_key_ret, chunk_index_for_collect, chunk_cid, + )); + } + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "remote-cid mapping PUT failed — falling back to full-bytes PUT"), + } + } + Ok(r) => tracing::debug!(status = %r.status(), chunk = %chunk_key, "ingest rejected block — falling back to full-bytes PUT"), + Err(e) => tracing::debug!(error = %e, chunk = %chunk_key, "ingest unreachable — falling back to full-bytes PUT"), + } + } + let put_result = client.put_object_with_metadata( &bucket, &chunk_key, diff --git a/crates/fula-client/tests/ingest_route.rs b/crates/fula-client/tests/ingest_route.rs new file mode 100644 index 0000000..3a27386 --- /dev/null +++ b/crates/fula-client/tests/ingest_route.rs @@ -0,0 +1,273 @@ +//! Phase 2 (decentralized ingress) — client routing matrix. +//! +//! Verifies the three load-bearing behaviors of `Config.ingest_endpoints`: +//! +//! 1. **Old master (no /fula/capabilities)** → the ingest route never +//! activates; every chunk goes to the master as a full-bytes PUT +//! (the misstore-on-old-master hazard the capability probe exists for). +//! 2. **Capable master + healthy ingest** → chunk BYTES go to the ingest +//! node; the master receives ONLY empty-body mapping PUTs carrying +//! `x-amz-meta-fula-remote-cid` (the declared blake3 CID) and +//! `x-amz-meta-fula-remote-size` (the true ciphertext size). +//! 3. **Capable master + dead ingest** → transparent fallback: the upload +//! still succeeds and the master sees full-bytes chunk PUTs. +//! +//! Chunk PUTs are identified by the `x-amz-meta-x-fula-chunk: true` header +//! the SDK stamps on every chunk (index objects / forest blobs lack it). + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +/// Master-side recording responder. Mimics BOTH master generations: +/// for a normal bytes PUT it returns `ETag = blake3(body)` (what every +/// master does); for an empty-body mapping PUT it returns `ETag = +/// declared cid` (what a flag-on master does after its presence check). +/// Records per-class counters the assertions read back. +struct MasterResponder { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, +} + +impl Respond for MasterResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let is_chunk = req + .headers + .get("x-amz-meta-x-fula-chunk") + .map(|v| v.to_str().unwrap_or("") == "true") + .unwrap_or(false); + let remote_cid = req + .headers + .get("x-amz-meta-fula-remote-cid") + .and_then(|v| v.to_str().ok()) + .map(str::to_owned); + + if let Some(declared) = remote_cid { + // Mapping PUT: must be empty-bodied; echo the declared cid as ETag. + assert!( + req.body.is_empty(), + "mapping PUT must carry an EMPTY body, got {} bytes", + req.body.len() + ); + if is_chunk { + self.chunk_mapping_puts.fetch_add(1, Ordering::SeqCst); + } + if let Some(sz) = req + .headers + .get("x-amz-meta-fula-remote-size") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + { + self.mapping_size_headers.lock().unwrap().push(sz); + } + return ResponseTemplate::new(200).insert_header("ETag", declared.as_str()); + } + + if is_chunk && !req.body.is_empty() { + self.chunk_full_bodies.fetch_add(1, Ordering::SeqCst); + } + let cid = blake3_raw_cid(&req.body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +/// Ingest-side responder: verifies the declared cid matches the body +/// (what the real fula-ingest does) and counts accepted blocks + bytes. +struct IngestResponder { + accepted: Arc, + bytes_seen: Arc, +} + +impl Respond for IngestResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let declared = req + .url + .query_pairs() + .find(|(k, _)| k == "cid") + .map(|(_, v)| v.to_string()) + .unwrap_or_default(); + let actual = blake3_raw_cid(&req.body).to_string(); + assert_eq!(declared, actual, "client must declare the true blake3 CID"); + self.accepted.fetch_add(1, Ordering::SeqCst); + self.bytes_seen.fetch_add(req.body.len(), Ordering::SeqCst); + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"cid": declared, "size": req.body.len()})) + } +} + +struct Counters { + chunk_full_bodies: Arc, + chunk_mapping_puts: Arc, + mapping_size_headers: Arc>>, + ingest_accepted: Arc, + ingest_bytes: Arc, +} + +/// Build (master, ingest, counters); `advertise_capability` controls whether +/// the master exposes /fula/capabilities (old vs new build). +async fn setup(advertise_capability: bool) -> (MockServer, MockServer, Counters) { + let master = MockServer::start().await; + let ingest = MockServer::start().await; + + let c = Counters { + chunk_full_bodies: Arc::new(AtomicUsize::new(0)), + chunk_mapping_puts: Arc::new(AtomicUsize::new(0)), + mapping_size_headers: Arc::new(Mutex::new(Vec::new())), + ingest_accepted: Arc::new(AtomicUsize::new(0)), + ingest_bytes: Arc::new(AtomicUsize::new(0)), + }; + + if advertise_capability { + Mock::given(method("GET")) + .and(path("/fula/capabilities")) + .respond_with( + ResponseTemplate::new(200).set_body_json(serde_json::json!({"remoteCidPut": true})), + ) + .mount(&master) + .await; + } + Mock::given(method("PUT")) + .respond_with(MasterResponder { + chunk_full_bodies: c.chunk_full_bodies.clone(), + chunk_mapping_puts: c.chunk_mapping_puts.clone(), + mapping_size_headers: c.mapping_size_headers.clone(), + }) + .mount(&master) + .await; + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&master) + .await; + Mock::given(method("HEAD")) + .respond_with(ResponseTemplate::new(200)) + .mount(&master) + .await; + + Mock::given(method("PUT")) + .respond_with(IngestResponder { + accepted: c.ingest_accepted.clone(), + bytes_seen: c.ingest_bytes.clone(), + }) + .mount(&ingest) + .await; + + (master, ingest, c) +} + +fn client_for(master_uri: &str, ingest_uri: Option<&str>) -> EncryptedClient { + let mut config = Config::new(master_uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + if let Some(uri) = ingest_uri { + config.ingest_endpoints = vec![uri.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +/// 5 chunks of 64 KiB — well past the threshold with an explicit chunk size. +fn payload() -> Vec { + (0..(5 * 64 * 1024)).map(|i| (i % 251) as u8).collect() +} + +#[tokio::test] +async fn old_master_never_receives_the_new_protocol() { + let (master, ingest, c) = setup(false).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + client + .put_object_chunked("bucket-a", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must succeed via legacy path"); + + assert_eq!( + c.ingest_accepted.load(Ordering::SeqCst), + 0, + "ingest must never be used when the master lacks the capability" + ); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "all chunks must arrive at the master as full bodies" + ); +} + +#[tokio::test] +async fn capable_master_routes_bytes_through_ingest() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), Some(&ingest.uri())); + + let data = payload(); + client + .put_object_chunked("bucket-b", "/file.bin", &data, Some(64 * 1024)) + .await + .expect("upload must succeed via ingest route"); + + let accepted = c.ingest_accepted.load(Ordering::SeqCst); + let mappings = c.chunk_mapping_puts.load(Ordering::SeqCst); + assert!(accepted >= 5, "chunk bytes must hit the ingest node (got {accepted})"); + assert_eq!( + c.chunk_full_bodies.load(Ordering::SeqCst), + 0, + "no chunk should arrive at the master as a full body" + ); + assert_eq!(accepted, mappings, "every ingest-accepted chunk needs its mapping PUT"); + // The true ciphertext sizes (plaintext + AEAD overhead) must be declared. + let sizes = c.mapping_size_headers.lock().unwrap(); + assert_eq!(sizes.len(), mappings); + assert!(sizes.iter().all(|&s| s > 0), "remote-size must never be 0"); + let total_ingest_bytes = c.ingest_bytes.load(Ordering::SeqCst) as u64; + assert_eq!( + sizes.iter().sum::(), + total_ingest_bytes, + "declared sizes must equal the bytes the ingest node stored" + ); +} + +#[tokio::test] +async fn dead_ingest_falls_back_to_full_bytes() { + let (master, _ingest, c) = setup(true).await; + // Point at a port nothing listens on — connection refused, fast. + let client = client_for(&master.uri(), Some("http://127.0.0.1:9")); + + client + .put_object_chunked("bucket-c", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("upload must SUCCEED despite the dead ingest (transparent fallback)"); + + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!( + c.chunk_full_bodies.load(Ordering::SeqCst) >= 5, + "fallback must deliver every chunk as a full-bytes PUT" + ); +} + +#[tokio::test] +async fn no_ingest_configured_is_byte_identical_legacy() { + let (master, ingest, c) = setup(true).await; + let client = client_for(&master.uri(), None); + + client + .put_object_chunked("bucket-d", "/file.bin", &payload(), Some(64 * 1024)) + .await + .expect("legacy upload"); + + assert_eq!(c.ingest_accepted.load(Ordering::SeqCst), 0); + assert_eq!(c.chunk_mapping_puts.load(Ordering::SeqCst), 0); + assert!(c.chunk_full_bodies.load(Ordering::SeqCst) >= 5); + drop(ingest); +} diff --git a/crates/fula-client/tests/live_ingest_e2e.rs b/crates/fula-client/tests/live_ingest_e2e.rs new file mode 100644 index 0000000..47d2a21 --- /dev/null +++ b/crates/fula-client/tests/live_ingest_e2e.rs @@ -0,0 +1,137 @@ +//! Phase 2 — LIVE ingest-route e2e (requires a running master + fula-ingest). +//! +//! Env (skip-without, offline_e2e convention): +//! FULA_S3 master base URL (e.g. http://127.0.0.1:9000) +//! FULA_JWT bearer for the master (HS256 with the master's secret) +//! FULA_INGEST fula-ingest base URL (e.g. http://127.0.0.1:3601) +//! FULA_BIG=1 also run the ≥1 GiB case (writes a temp file) +//! +//! Run: cargo test -p fula-client --test live_ingest_e2e --release -- --ignored --nocapture +//! +//! The runner script (fula-ota tests/e2e/phase-2/60-fidelity.sh) asserts the +//! ingest container's accepted-block log count INCREASES across this test — +//! the server-side proof that bytes flowed via the ingest node, not the +//! gateway. + +#![cfg(not(target_arch = "wasm32"))] + +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; + +fn env_or_skip(key: &str) -> Option { + match std::env::var(key) { + Ok(v) if !v.is_empty() => Some(v), + _ => { + eprintln!("SKIP: {key} not set"); + None + } + } +} + +fn client(s3: &str, jwt: &str, ingest: Option<&str>, v8: bool) -> EncryptedClient { + let mut config = Config::new(s3).with_token(jwt); + config.walkable_v8_writer_enabled = v8; + if let Some(i) = ingest { + config.ingest_endpoints = vec![i.to_string()]; + } + let secret = SecretKey::generate(); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)).expect("client") +} + +fn payload(len: usize) -> Vec { + (0..len).map(|i| ((i * 31 + 7) % 251) as u8).collect() +} + +/// Chunked upload routed via the ingest node, read back through the master — +/// full live round-trip of the Phase 2 byte path (client CID on). +#[tokio::test] +#[ignore] +async fn live_chunked_via_ingest_round_trip() { + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-ingest"; + let key = "/ingest/round-trip.bin"; + // >768 KiB → put_object_flat dispatches into put_object_chunked_internal + // (the FxFiles photo/video path) where the ingest route lives. + let data = payload(1_500_000); + + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + .await + .expect("chunked flat upload via ingest route"); + + let got = c + .get_object_flat(bucket, key) + .await + .expect("download after ingest-routed upload"); + assert_eq!(got.len(), data.len(), "length mismatch"); + assert_eq!( + blake3::hash(&got), + blake3::hash(&data), + "content mismatch after ingest-routed round trip" + ); +} + +/// Same upload with the v8 writer OFF — the ingest route must self-disable +/// (no pre-computed CIDs to declare) and the legacy path must round-trip. +#[tokio::test] +#[ignore] +async fn live_chunked_v8_off_legacy_round_trip() { + let (Some(s3), Some(jwt)) = (env_or_skip("FULA_S3"), env_or_skip("FULA_JWT")) else { + return; + }; + let ingest = std::env::var("FULA_INGEST").ok(); + let c = client(&s3, &jwt, ingest.as_deref(), false); + let bucket = "p2-live-legacy"; + let key = "/legacy/round-trip.bin"; + let data = payload(1_200_000); + + c.put_object_flat(bucket, key, data.clone(), Some("application/octet-stream")) + .await + .expect("chunked flat upload, v8 off"); + let got = c.get_object_flat(bucket, key).await.expect("download"); + assert_eq!(blake3::hash(&got), blake3::hash(&data)); +} + +/// ≥1 GiB chunked upload via the ingest route (scale invariant: thousands of +/// chunks, bounded memory on the service side). Gated on FULA_BIG=1. +#[tokio::test] +#[ignore] +async fn live_1gib_chunked_via_ingest() { + if std::env::var("FULA_BIG").ok().as_deref() != Some("1") { + eprintln!("SKIP: FULA_BIG != 1"); + return; + } + let (Some(s3), Some(jwt), Some(ingest)) = ( + env_or_skip("FULA_S3"), + env_or_skip("FULA_JWT"), + env_or_skip("FULA_INGEST"), + ) else { + return; + }; + let c = client(&s3, &jwt, Some(&ingest), true); + let bucket = "p2-live-big"; + let key = "/big/one-gib.bin"; + + const GIB: usize = 1 << 30; + let data = payload(GIB); // ~4096 chunks at 256 KiB + let want = blake3::hash(&data); + + let started = std::time::Instant::now(); + // MOVE the payload — holding payload + a clone is >2 GiB in this one + // process and OOM-kills the run on the 7.8 GB e2e box (run #4, SIGKILL). + // `want` already carries everything the verification needs. + c.put_object_flat(bucket, key, data, Some("application/octet-stream")) + .await + .expect("1 GiB chunked flat upload via ingest"); + eprintln!("1 GiB upload took {:?}", started.elapsed()); + + let got = c.get_object_flat(bucket, key).await.expect("1 GiB download"); + assert_eq!(got.len(), GIB); + assert_eq!(blake3::hash(&got), want, "1 GiB content mismatch"); +} diff --git a/crates/fula-core/src/bucket.rs b/crates/fula-core/src/bucket.rs index 06e95f0..61606df 100644 --- a/crates/fula-core/src/bucket.rs +++ b/crates/fula-core/src/bucket.rs @@ -47,6 +47,12 @@ pub struct Bucket { metadata_cache: Option>>, /// Key used in the metadata cache (may differ from metadata.name for user-scoped buckets) cache_key: Option, + /// FM-1 (Phase 2.5): shared multi-master root arbiter. `None` (default) + /// = single-master behavior, byte-identical to before. Set via + /// [`set_root_pointer_store`](Self::set_root_pointer_store) when the + /// operator enables the flag; `flush()` then CASes the shared pointer + /// before publishing the new root. + root_store: Option>, } impl Bucket { @@ -74,6 +80,7 @@ impl Bucket { config, metadata_cache: None, cache_key: None, + root_store: None, }) } @@ -91,6 +98,7 @@ impl Bucket { config, metadata_cache, cache_key: None, + root_store: None, }) } @@ -110,9 +118,20 @@ impl Bucket { config, metadata_cache, cache_key: Some(cache_key), + root_store: None, }) } + /// FM-1: attach the shared multi-master root arbiter (see + /// [`crate::root_pointer`]). Called by `BucketManager` right after + /// opening when the operator enabled bucket-root CAS. + pub fn set_root_pointer_store( + &mut self, + store: Arc, + ) { + self.root_store = Some(store); + } + /// Get bucket name pub fn name(&self) -> &str { &self.metadata.name @@ -262,7 +281,37 @@ impl Bucket { /// Flush changes and return the new root CID pub async fn flush(&mut self) -> Result { + // FM-1: capture the root this bucket was OPENED at — the value the + // shared CAS must still hold for this flush to win the multi-master + // race. + let old_root = self.metadata.root_cid; + let root_cid = self.index.flush().await?; + + // FM-1 (Phase 2.5): with a shared arbiter attached, the new root is + // published to ALL masters atomically BEFORE this process's caches + // learn it. Losing the race means another master flushed this bucket + // after we opened it — surfacing as PreconditionFailed keeps the + // contract identical to conditional writes (retryable; SDKs already + // re-open + retry on 412). No-op when old == new (nothing changed). + if let Some(ref arbiter) = self.root_store { + if old_root != root_cid { + use crate::root_pointer::CasOutcome; + match arbiter + .cas_root(&self.metadata.owner_id, &self.metadata.name, &old_root, &root_cid) + .await? + { + CasOutcome::Won => {} + CasOutcome::Conflict { current } => { + return Err(CoreError::PreconditionFailed(format!( + "bucket '{}' was modified by another master (shared root is {:?}, this flush built on {}) — reopen and retry", + self.metadata.name, current, old_root + ))); + } + } + } + } + self.metadata.root_cid = root_cid; // Update the metadata cache if we have a reference to it @@ -430,6 +479,12 @@ pub struct BucketManager { /// registry.cid file from concurrent overwrites and coalesces the /// IPLD write when many mutations fire at once. registry_persist_lock: Arc>, + /// FM-1 (Phase 2.5): shared multi-master root arbiter, attached to every + /// bucket opened for writing. Unset (default) = single-master behavior. + /// `OnceLock` so it can be wired AFTER the manager is shared in an `Arc` + /// (the Postgres pool is constructed later in AppState init); reads are + /// lock-free. + root_pointer_store: std::sync::OnceLock>, } impl BucketManager { @@ -444,6 +499,7 @@ impl BucketManager { dirty: std::sync::atomic::AtomicBool::new(false), bucket_write_locks: Arc::new(DashMap::new()), registry_persist_lock: Arc::new(tokio::sync::Mutex::new(())), + root_pointer_store: std::sync::OnceLock::new(), } } @@ -458,9 +514,23 @@ impl BucketManager { dirty: std::sync::atomic::AtomicBool::new(false), bucket_write_locks: Arc::new(DashMap::new()), registry_persist_lock: Arc::new(tokio::sync::Mutex::new(())), + root_pointer_store: std::sync::OnceLock::new(), } } + /// FM-1 (Phase 2.5): enable shared multi-master root arbitration. Every + /// bucket opened via [`open_bucket_for_user`](Self::open_bucket_for_user) + /// afterwards (a) opens at the SHARED root when another master has moved + /// it past this process's cache, and (b) CASes the shared pointer on + /// flush. Call once at startup when the operator flag is on (set-once; + /// later calls are ignored). + pub fn set_root_pointer_store( + &self, + store: Arc, + ) { + let _ = self.root_pointer_store.set(store); + } + /// Rebuild the name_index from the current buckets DashMap fn rebuild_name_index(&self) { self.name_index.clear(); @@ -1135,21 +1205,51 @@ impl BucketManager { let internal_key = Self::scoped_bucket_key(user_id, name); tracing::debug!(bucket_name = %name, internal_key = %internal_key, "Opening user-scoped bucket"); - let metadata = self.buckets.get(&internal_key) + let mut metadata = self.buckets.get(&internal_key) .map(|r| r.clone()) .ok_or_else(|| { tracing::error!(bucket_name = %name, internal_key = %internal_key, "User-scoped bucket not found"); CoreError::BucketNotFound(name.to_string()) })?; + // FM-1 (Phase 2.5): another master may have flushed this bucket past + // our in-process cache. Open at the SHARED root so this write builds + // on the latest state instead of forking from a stale one (a stale + // open would only lose the CAS at flush — correct but wasteful). + if let Some(arbiter) = self.root_pointer_store.get() { + match arbiter.get_root(user_id, name).await { + Ok(Some(shared_root)) if shared_root != metadata.root_cid => { + tracing::debug!( + bucket = %name, + cached = %metadata.root_cid, + shared = %shared_root, + "opening at the shared root (another master moved it)" + ); + metadata.root_cid = shared_root; + // Keep the cache truthful for read paths too. + self.buckets.insert(internal_key.clone(), metadata.clone()); + } + Ok(_) => {} + Err(e) => { + // Arbiter unreachable: opening at the cached root is safe — + // the flush-side CAS still arbitrates. Log and continue. + tracing::warn!(error = %e, bucket = %name, "shared-root lookup failed; opening at cached root"); + } + } + } + // Use load_with_cache_key so flush() updates the correct entry - Bucket::load_with_cache_key( + let mut bucket = Bucket::load_with_cache_key( metadata, Arc::clone(&self.store), self.default_config.clone(), Some(Arc::clone(&self.buckets)), internal_key, - ).await + ).await?; + if let Some(arbiter) = self.root_pointer_store.get() { + bucket.set_root_pointer_store(Arc::clone(arbiter)); + } + Ok(bucket) } /// Delete a bucket for a specific user diff --git a/crates/fula-core/src/lib.rs b/crates/fula-core/src/lib.rs index f348e5b..eba7fa8 100644 --- a/crates/fula-core/src/lib.rs +++ b/crates/fula-core/src/lib.rs @@ -27,9 +27,11 @@ pub mod crdt; pub mod error; pub mod metadata; pub mod prolly; +pub mod root_pointer; pub use bucket::{Bucket, BucketConfig, BucketManager, BucketRegistry}; pub use error::{CoreError, Result}; +pub use root_pointer::{CasOutcome, RootPointerStore}; pub use metadata::{ObjectMetadata, EncryptionMetadata, StorageClass}; pub use prolly::{ProllyTree, ProllyNode, ProllyConfig}; diff --git a/crates/fula-core/src/root_pointer.rs b/crates/fula-core/src/root_pointer.rs new file mode 100644 index 0000000..016d268 --- /dev/null +++ b/crates/fula-core/src/root_pointer.rs @@ -0,0 +1,145 @@ +//! FM-1 (Phase 2.5, federated masters) — shared bucket-root arbitration. +//! +//! With more than one gateway serving writes, the per-bucket root pointer +//! must live somewhere ALL masters can compare-and-swap, or concurrent +//! flushes silently drop each other's objects (the in-process +//! `bucket_write_locks` only serialize within one process). This trait is +//! that arbiter, deliberately storage-agnostic: fula-core never learns what +//! backs it (fula-cli injects a Postgres implementation — the Stage-A +//! masters already share one database — and later phases may swap in a +//! chain-backed store without touching this crate). +//! +//! Wiring (when the operator enables the flag): +//! * `Bucket::flush()` CASes `(owner, bucket): old_root -> new_root` +//! BEFORE publishing the new root to the in-process metadata cache. +//! A lost race surfaces as `CoreError::PreconditionFailed` — the same +//! retryable contract as conditional writes, which client SDKs +//! already handle. +//! * `BucketManager::open_bucket_for_user()` consults `get_root` so a +//! bucket modified by ANOTHER master is opened at the shared root, +//! not this process's stale cache. +//! +//! Disabled (no store injected — the default): behavior is byte-identical +//! to today's single-master code. + +use crate::error::Result; +use async_trait::async_trait; +use cid::Cid; + +/// Outcome of a compare-and-swap on the shared root pointer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CasOutcome { + /// This master won: the shared pointer now holds the new root. + Won, + /// Another master moved the pointer first. `current` is the root the + /// shared store holds now (None if the row vanished — treat as retry). + Conflict { current: Option }, +} + +/// Shared, multi-master-visible bucket-root pointer store. +#[async_trait] +pub trait RootPointerStore: Send + Sync { + /// Atomically set `(owner_id, bucket)` to `new` iff the stored root is + /// `expected` (or no row exists yet — first flush claims the slot). + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result; + + /// The shared root for `(owner_id, bucket)`, if any master has flushed. + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result>; +} + +#[cfg(test)] +mod tests { + use super::test_support::InMemoryRootStore; + use super::*; + + /// Distinct (raw, blake3-coded) CIDs from a seed — no real hashing needed, + /// the tests only require uniqueness. Avoids a blake3 dep on fula-core. + fn cid_of(seed: u8) -> Cid { + let digest = [seed; 32]; + let mh = cid::multihash::Multihash::<64>::wrap(0x1e, &digest).unwrap(); + Cid::new_v1(0x55, mh) + } + + /// The exact two-master race: both open at root A; master 1 flushes A→B + /// and wins; master 2 flushes A→C and MUST conflict (its write would + /// silently drop master 1's objects otherwise). + #[tokio::test] + async fn second_master_building_on_a_stale_root_loses_the_cas() { + let store = InMemoryRootStore::default(); + let (a, b, c) = (cid_of(1), cid_of(2), cid_of(3)); + + assert_eq!( + store.cas_root("owner", "bkt", &a, &b).await.unwrap(), + CasOutcome::Won, + "first flush claims the slot" + ); + match store.cas_root("owner", "bkt", &a, &c).await.unwrap() { + CasOutcome::Conflict { current } => assert_eq!(current, Some(b)), + other => panic!("stale flush must conflict, got {other:?}"), + } + // The loser reopens at the shared root and retries — now it wins. + assert_eq!( + store.cas_root("owner", "bkt", &b, &c).await.unwrap(), + CasOutcome::Won + ); + assert_eq!(store.get_root("owner", "bkt").await.unwrap(), Some(c)); + } + + #[tokio::test] + async fn buckets_and_owners_are_isolated() { + let store = InMemoryRootStore::default(); + let (a, b) = (cid_of(10), cid_of(11)); + store.cas_root("o1", "bkt", &a, &b).await.unwrap(); + assert_eq!(store.get_root("o2", "bkt").await.unwrap(), None); + assert_eq!(store.get_root("o1", "other").await.unwrap(), None); + } +} + +#[cfg(test)] +pub mod test_support { + //! In-memory store for unit tests: two Buckets sharing one of these + //! reproduce the cross-master race deterministically. + use super::*; + use dashmap::DashMap; + use std::sync::Arc; + + #[derive(Default, Clone)] + pub struct InMemoryRootStore { + inner: Arc>, + } + + #[async_trait] + impl RootPointerStore for InMemoryRootStore { + async fn cas_root( + &self, + owner_id: &str, + bucket: &str, + expected: &Cid, + new: &Cid, + ) -> Result { + let key = (owner_id.to_string(), bucket.to_string()); + let mut entry = self.inner.entry(key).or_insert(*expected); + if *entry.value() == *expected { + *entry.value_mut() = *new; + Ok(CasOutcome::Won) + } else { + Ok(CasOutcome::Conflict { + current: Some(*entry.value()), + }) + } + } + + async fn get_root(&self, owner_id: &str, bucket: &str) -> Result> { + Ok(self + .inner + .get(&(owner_id.to_string(), bucket.to_string())) + .map(|e| *e.value())) + } + } +} diff --git a/docker/Dockerfile.gateway b/docker/Dockerfile.gateway new file mode 100644 index 0000000..fabcb30 --- /dev/null +++ b/docker/Dockerfile.gateway @@ -0,0 +1,34 @@ +# fula-gateway — S3-compatible gateway (federated-master image). +# +# Build (repo root): docker build -f docker/Dockerfile.gateway -t fula-gateway:latest . +# Used by the federated-master stack: functionland/pinning-service +# docker/master/docker-compose.master.yml ("gateway" profile, auto-enabled by +# update-scripts/join-as-master.sh when this image exists on the box). +# +# All configuration is env-driven (clap): FULA_HOST (0.0.0.0), FULA_PORT +# (9000), IPFS_API_URL, CLUSTER_API_URL, PINNING_SERVICE_ENDPOINT, +# STORAGE_API_URL, JWT_SECRET, ADMIN_JWT_SECRET, FULA_BLOCK_CACHE_MB, +# FULA_PIN_QUEUE_PATH (durable pin queue — production MUST set it to a +# persistent mount), rate limits, etc. See crates/fula-cli/src/main.rs. +FROM rust:1-bookworm AS build +WORKDIR /src +# Workspace build; fula-js (wasm) and fula-flutter are workspace members but +# building only -p fula-cli avoids their toolchains. +COPY . . +RUN cargo build --release -p fula-cli --bin fula-gateway + +FROM debian:bookworm-slim AS runtime +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates netcat-openbsd \ + && rm -rf /var/lib/apt/lists/* +COPY --from=build /src/target/release/fula-gateway /usr/local/bin/fula-gateway +# Persistent state lives at HARDCODED /var/lib/fula-gateway (pin_queue.redb, +# registry.cid, local_retain.redb — see crates/fula-cli/src/config.rs defaults). +# Mount a volume here or pin retries/crash recovery silently degrade to +# fire-and-forget and the bucket registry resets on restart. +RUN mkdir -p /var/lib/fula-gateway +VOLUME ["/var/lib/fula-gateway"] +ENV FULA_HOST=0.0.0.0 \ + FULA_PORT=9000 +EXPOSE 9000 +ENTRYPOINT ["fula-gateway"]