Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/buzz-media/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hex = { workspace = true }
chrono = { workspace = true }
axum = { workspace = true }
s3 = { version = "0.37", package = "rust-s3", default-features = false, features = ["tokio-rustls-tls", "fail-on-err", "tags"] }
http = "1"
infer = "0.19"
image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] }
blurhash = "0.2"
Expand Down
4 changes: 3 additions & 1 deletion crates/buzz-media/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ pub use config::MediaConfig;
pub use error::MediaError;
pub use storage::{BlobHeadMeta, BlobMeta, ByteStream, MediaStorage};
pub use types::BlobDescriptor;
pub use upload::{process_file_upload, process_upload, process_video_upload};
pub use upload::{
process_file_upload, process_upload, process_video_upload, UploadAttributionLabels,
};
pub use validation::{serve_inline, validate_video_file, VideoMeta};
240 changes: 234 additions & 6 deletions crates/buzz-media/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! S3/MinIO storage client.

use std::collections::HashMap;
use std::path::Path;
use std::pin::Pin;

Expand All @@ -15,6 +16,15 @@ use serde::{Deserialize, Serialize};
/// A stream of byte chunks from S3, usable with `axum::body::Body::from_stream()`.
pub type ByteStream = Pin<Box<dyn futures_core::Stream<Item = Result<Bytes, MediaError>> + Send>>;

/// Bare S3 user-metadata key for the authenticated uploader pubkey.
pub const BUZZ_UPLOADER_ID_META_KEY: &str = "buzz-uploader-id";
/// Bare S3 user-metadata key for the uploader's configured display name.
pub const BUZZ_UPLOADER_NAME_META_KEY: &str = "buzz-uploader-name";
/// Bare S3 user-metadata key for the server-resolved community id.
pub const BUZZ_COMMUNITY_ID_META_KEY: &str = "buzz-community-id";
/// Bare S3 user-metadata key for the server-resolved community host.
pub const BUZZ_COMMUNITY_HOST_META_KEY: &str = "buzz-community-host";

/// S3-compatible object storage client.
pub struct MediaStorage {
bucket: Box<Bucket>,
Expand Down Expand Up @@ -77,6 +87,32 @@ impl MediaStorage {
Ok(())
}

/// Store an object from a byte slice with `x-amz-meta-*` object metadata.
///
/// `metadata` keys are bare names (e.g. `buzz-uploader-id`); the S3 client
/// adds the `x-amz-meta-` prefix. Used for media blobs that carry upload
/// attribution so out-of-band consumers can read it from a HEAD without
/// touching relay internals.
pub async fn put_with_metadata(
&self,
key: &str,
bytes: &[u8],
content_type: &str,
metadata: &[(&str, &str)],
) -> Result<(), MediaError> {
let mut builder = self
.bucket
.put_object_builder(key, bytes)
.with_content_type(content_type);
for (k, v) in metadata {
builder = builder
.with_metadata(k, v)
.map_err(|e| MediaError::StorageError(e.to_string()))?;
}
builder.execute().await?;
Ok(())
}

/// Stream a file from disk into S3 without loading it into RAM.
///
/// Uses rust-s3's `put_object_stream_with_content_type` which reads from
Expand All @@ -87,6 +123,24 @@ impl MediaStorage {
key: &str,
path: &Path,
content_type: &str,
) -> Result<(), MediaError> {
self.put_file_with_metadata(key, path, content_type, &[])
.await
}

/// Stream a file from disk into S3 with `x-amz-meta-*` object metadata.
///
/// Metadata is attached via bucket-level `extra_headers` (not the stream
/// builder's `with_metadata`) because rust-s3's streaming multipart path
/// only forwards builder headers on the small-file (single PUT) branch;
/// bucket `extra_headers` are applied to `InitiateMultipartUpload` too, so
/// metadata survives files larger than the 8 MiB chunk threshold.
pub async fn put_file_with_metadata(
&self,
key: &str,
path: &Path,
content_type: &str,
metadata: &[(&str, &str)],
) -> Result<(), MediaError> {
const BUF: usize = 8 * 1024 * 1024; // 8 MiB read buffer

Expand All @@ -95,7 +149,19 @@ impl MediaStorage {
.map_err(|e| MediaError::Io(e.to_string()))?;
let mut reader = tokio::io::BufReader::with_capacity(BUF, file);

self.bucket
if metadata.is_empty() {
self.bucket
.put_object_stream_with_content_type(&mut reader, key, content_type)
.await?;
return Ok(());
}

let headers = build_amz_meta_headers(metadata)?;
let bucket = self
.bucket
.with_extra_headers(headers)
.map_err(|e| MediaError::StorageError(e.to_string()))?;
bucket
.put_object_stream_with_content_type(&mut reader, key, content_type)
.await?;
Ok(())
Expand Down Expand Up @@ -163,12 +229,10 @@ impl MediaStorage {
Ok(())
}

/// HEAD with metadata — returns Content-Length (size).
/// HEAD with metadata — returns Content-Length (size) and user metadata.
pub async fn head_with_metadata(&self, key: &str) -> Result<Option<BlobHeadMeta>, MediaError> {
match self.bucket.head_object(key).await {
Ok((result, _)) => Ok(Some(BlobHeadMeta {
size: result.content_length.unwrap_or(0) as u64,
})),
Ok((result, _)) => Ok(Some(BlobHeadMeta::from_head_object_result(result))),
Err(s3::error::S3Error::HttpFailWithBody(404, _)) => Ok(None),
Err(e) => Err(MediaError::StorageError(e.to_string())),
}
Expand Down Expand Up @@ -336,11 +400,156 @@ mod tests {
"video/mp4"
);
}

#[test]
fn amz_meta_headers_are_prefixed_and_validated() {
let headers = build_amz_meta_headers(&[
(BUZZ_UPLOADER_ID_META_KEY, "aabbcc"),
(BUZZ_UPLOADER_NAME_META_KEY, "Ada"),
(BUZZ_COMMUNITY_ID_META_KEY, "0000-1111"),
(BUZZ_COMMUNITY_HOST_META_KEY, "moderation.buzz.example"),
])
.unwrap();
assert_eq!(
headers
.get(format!("x-amz-meta-{BUZZ_UPLOADER_ID_META_KEY}"))
.unwrap(),
"aabbcc"
);
assert_eq!(
headers
.get(format!("x-amz-meta-{BUZZ_UPLOADER_NAME_META_KEY}"))
.unwrap(),
"Ada"
);
assert_eq!(
headers
.get(format!("x-amz-meta-{BUZZ_COMMUNITY_ID_META_KEY}"))
.unwrap(),
"0000-1111"
);
assert_eq!(
headers
.get(format!("x-amz-meta-{BUZZ_COMMUNITY_HOST_META_KEY}"))
.unwrap(),
"moderation.buzz.example"
);

// Control characters in values are rejected, not silently mangled.
assert!(build_amz_meta_headers(&[(BUZZ_UPLOADER_ID_META_KEY, "bu\nzz")]).is_err());
// Invalid header-name characters in the key are rejected.
assert!(build_amz_meta_headers(&[("bad key", "v")]).is_err());
}

#[test]
fn blob_head_meta_surfaces_s3_user_metadata() {
let mut metadata = HashMap::new();
metadata.insert(BUZZ_UPLOADER_ID_META_KEY.to_string(), "aabbcc".to_string());
metadata.insert(BUZZ_UPLOADER_NAME_META_KEY.to_string(), "Ada".to_string());
metadata.insert(
BUZZ_COMMUNITY_ID_META_KEY.to_string(),
"0000-1111".to_string(),
);
metadata.insert(
BUZZ_COMMUNITY_HOST_META_KEY.to_string(),
"moderation.buzz.example".to_string(),
);

let result = s3::serde_types::HeadObjectResult {
content_length: Some(42),
metadata: Some(metadata),
..Default::default()
};

let head = BlobHeadMeta::from_head_object_result(result);
assert_eq!(head.size, 42);
assert_eq!(
head.metadata.get(BUZZ_UPLOADER_ID_META_KEY),
Some(&"aabbcc".to_string())
);
assert_eq!(
head.metadata.get(BUZZ_UPLOADER_NAME_META_KEY),
Some(&"Ada".to_string())
);
assert_eq!(
head.metadata.get(BUZZ_COMMUNITY_ID_META_KEY),
Some(&"0000-1111".to_string())
);
assert_eq!(
head.metadata.get(BUZZ_COMMUNITY_HOST_META_KEY),
Some(&"moderation.buzz.example".to_string())
);
}

/// Old sidecars (written before upload attribution) must still parse, and
/// new fields must round-trip.
#[test]
fn sidecar_attribution_fields_are_backward_compatible() {
// Pre-attribution sidecar JSON — no uploader_id/community_id keys.
let old = r#"{"dim":"800x600","blurhash":"","thumb_url":"","ext":"jpg","mime_type":"image/jpeg","size":123,"uploaded_at":1700000000}"#;
let meta: BlobMeta = serde_json::from_str(old).unwrap();
assert_eq!(meta.uploader_id, None);
assert_eq!(meta.uploader_name, None);
assert_eq!(meta.community_id, None);
assert_eq!(meta.community_host, None);

// Absent attribution is omitted from serialized output (not null).
let json = serde_json::to_value(&meta).unwrap();
assert!(json.get("uploader_id").is_none());
assert!(json.get("uploader_name").is_none());
assert!(json.get("community_id").is_none());
assert!(json.get("community_host").is_none());

// Populated attribution round-trips.
let meta = BlobMeta {
uploader_id: Some("aa".repeat(32)),
uploader_name: Some("Ada".to_string()),
community_id: Some("6b8e1c2a-0000-0000-0000-000000000000".to_string()),
community_host: Some("moderation.buzz.example".to_string()),
..meta
};
let round: BlobMeta = serde_json::from_str(&serde_json::to_string(&meta).unwrap()).unwrap();
assert_eq!(round.uploader_id, meta.uploader_id);
assert_eq!(round.uploader_name, meta.uploader_name);
assert_eq!(round.community_id, meta.community_id);
assert_eq!(round.community_host, meta.community_host);
}
}

/// Build an `x-amz-meta-*` [`http::HeaderMap`] from bare metadata key/value
/// pairs. Keys must be valid header-name characters; values must be valid
/// header values (S3 object metadata is US-ASCII).
fn build_amz_meta_headers(metadata: &[(&str, &str)]) -> Result<http::HeaderMap, MediaError> {
let mut headers = http::HeaderMap::new();
for (k, v) in metadata {
let name: http::HeaderName = format!("x-amz-meta-{k}")
.parse()
.map_err(|_| MediaError::StorageError(format!("invalid metadata key: {k}")))?;
let value: http::HeaderValue = v
.parse()
.map_err(|_| MediaError::StorageError(format!("invalid metadata value for {k}")))?;
headers.insert(name, value);
}
Ok(headers)
}

/// Metadata returned by HEAD — just enough for BUD-01 response headers.
/// Metadata returned by HEAD for BUD-01 responses and moderation tooling.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BlobHeadMeta {
/// Object size in bytes.
pub size: u64,
/// S3 user metadata returned by HEAD, keyed by bare metadata name (without
/// the `x-amz-meta-` prefix), e.g. `buzz-uploader-id`.
pub metadata: HashMap<String, String>,
}

impl BlobHeadMeta {
fn from_head_object_result(result: s3::serde_types::HeadObjectResult) -> Self {
Self {
size: result.content_length.unwrap_or(0).max(0) as u64,
metadata: result.metadata.unwrap_or_default(),
}
}
}

/// Full blob metadata — stored as sidecar JSON in `_meta/{community}/{sha256}.json`.
Expand All @@ -364,4 +573,23 @@ pub struct BlobMeta {
/// Video duration in seconds. `None` for non-video blobs.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub duration_secs: Option<f64>,
/// Authenticated uploader pubkey (hex). Upload attribution for
/// out-of-band consumers; `None` on sidecars written before attribution
/// existed.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub uploader_id: Option<String>,
/// Best-effort configured display name for the authenticated uploader. This
/// is a readability hint, not an authority boundary; `uploader_id` and the
/// audit log remain authoritative.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub uploader_name: Option<String>,
/// Host-resolved community id (UUID string). Mirrors the community segment
/// of the sidecar key so attribution survives even if the object is copied
/// out of its keyed location; `None` on pre-attribution sidecars.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub community_id: Option<String>,
/// Server-resolved community host (for example `team.example.com`).
/// Readability hint only; `community_id` remains authoritative.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub community_host: Option<String>,
}
Loading
Loading