Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 86 additions & 4 deletions crates/buzz-relay/src/audio/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ use crate::state::AppState;
/// Maximum binary frame size: 4 KB is generous for a single Opus packet.
const MAX_AUDIO_FRAME_BYTES: usize = 4096;

/// Maximum screen-share JPEG frame size.
const MAX_SCREEN_FRAME_BYTES: usize = 512 * 1024;

/// Maximum screen-share control JSON size.
const MAX_SCREEN_CONTROL_BYTES: usize = 2 * 1024;

/// Maximum text frame size: 8 KB bounds auth/control JSON parsing.
const MAX_TEXT_FRAME_BYTES: usize = 8192;

Expand Down Expand Up @@ -87,7 +93,7 @@ pub async fn ws_audio_handler(
/// Highest huddle audio protocol version this relay understands. Clients are
/// allowed to negotiate any version in `1..=CURRENT_PROTOCOL_VERSION`; older
/// versions stay supported indefinitely for staged rollouts.
const CURRENT_PROTOCOL_VERSION: u8 = 2;
const CURRENT_PROTOCOL_VERSION: u8 = 3;

#[derive(Deserialize)]
struct AuthMsg {
Expand Down Expand Up @@ -200,7 +206,6 @@ async fn handle_audio_connection(
return;
}

// ── Step 3: membership check / auto-add ───────────────────────────────────
let parent_id_for_event = match ensure_membership(
&state,
&tenant,
Expand Down Expand Up @@ -384,7 +389,6 @@ async fn handle_audio_connection(

room.broadcast_control(joined_msg);

// ── Step 6: emit kind:48101 (PARTICIPANT_JOINED) ──────────────────────────
emit_participant_event(
&state,
&tenant,
Expand Down Expand Up @@ -508,7 +512,10 @@ async fn recv_loop(
missed_pongs: Arc<AtomicU8>,
cancel: CancellationToken,
) {
use crate::audio::wire::{FrameHeader, V2_HEADER_LEN};
use crate::audio::wire::{
FrameHeader, MEDIA_KIND_AUDIO, MEDIA_KIND_SCREEN_CONTROL, MEDIA_KIND_SCREEN_FRAME,
V2_HEADER_LEN,
};

loop {
tokio::select! {
Expand All @@ -517,6 +524,81 @@ async fn recv_loop(
msg = ws_recv.next() => {
match msg {
Some(Ok(WsMessage::Binary(data))) => {
if protocol_version >= 3 {
let Some((&media_kind, payload)) = data.split_first() else {
warn!(peer_id = %peer_id, "v3 media frame missing kind — dropping");
continue;
};

match media_kind {
MEDIA_KIND_AUDIO => {
if payload.len() > MAX_AUDIO_FRAME_BYTES {
warn!(peer_id = %peer_id, bytes = payload.len(), "audio frame too large — dropping");
continue;
}
if payload.len() <= V2_HEADER_LEN {
warn!(
peer_id = %peer_id,
bytes = payload.len(),
"v3 audio frame missing header or payload — dropping"
);
continue;
}
match FrameHeader::parse(payload) {
Some((header, opus_payload)) if !opus_payload.is_empty() => {
tracing::trace!(
peer_id = %peer_id,
seq = header.seq,
ts_48k = header.ts_48k,
level_dbov = header.level_dbov,
is_dtx = header.is_dtx(),
"v3 audio frame"
);
}
_ => {
warn!(
peer_id = %peer_id,
bytes = payload.len(),
"v3 audio frame failed header parse — dropping"
);
continue;
}
}
}
MEDIA_KIND_SCREEN_FRAME => {
if payload.is_empty() || payload.len() > MAX_SCREEN_FRAME_BYTES {
warn!(
peer_id = %peer_id,
bytes = payload.len(),
"screen-share frame empty or too large — dropping"
);
continue;
}
}
MEDIA_KIND_SCREEN_CONTROL => {
if payload.is_empty() || payload.len() > MAX_SCREEN_CONTROL_BYTES {
warn!(
peer_id = %peer_id,
bytes = payload.len(),
"screen-share control empty or too large — dropping"
);
continue;
}
}
_ => {
warn!(
peer_id = %peer_id,
media_kind,
"unknown v3 media kind — dropping"
);
continue;
}
}

room.broadcast_frame(peer_id, data);
continue;
}

if data.len() > MAX_AUDIO_FRAME_BYTES {
warn!(peer_id = %peer_id, bytes = data.len(), "audio frame too large — dropping");
continue;
Expand Down
6 changes: 3 additions & 3 deletions crates/buzz-relay/src/audio/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use uuid::Uuid;
pub struct AudioPeer {
/// Nostr pubkey hex.
pub pubkey: String,
/// Audio frames (binary Opus with peer_index prefix). Drops on full — real-time.
/// Realtime media frames (binary payload with peer_index prefix). Drops on full.
pub audio_tx: mpsc::Sender<Bytes>,
/// Control messages (joined/left/close JSON). Separate queue so control
/// is never starved by audio backpressure.
Expand All @@ -27,7 +27,7 @@ pub struct AudioPeer {
pub peer_index: u8,
}

/// Control message for a single peer (separate from audio frames).
/// Control message for a single peer (separate from realtime media frames).
pub enum PeerCtrl {
/// JSON control message (joined/left/speakers).
Json(String),
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Room {
Some((peer_index, should_end))
}

/// Fan-out a binary frame to all peers except the sender.
/// Fan-out a binary realtime media frame to all peers except the sender.
/// Prepends the sender's `peer_index` as a 1-byte prefix.
/// Drops on full buffer — real-time audio never queues.
pub fn broadcast_frame(&self, sender_id: Uuid, frame: Bytes) {
Expand Down
7 changes: 7 additions & 0 deletions crates/buzz-relay/src/audio/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ pub const V2_HEADER_LEN: usize = 8;
/// `flags & FLAG_DTX` indicates a DTX/comfort-noise frame.
pub const FLAG_DTX: u8 = 0x01;

/// v3 media kind: payload is a v2 audio frame (`FrameHeader` + Opus bytes).
pub const MEDIA_KIND_AUDIO: u8 = 0x01;
/// v3 media kind: payload is a compressed screen-share frame.
pub const MEDIA_KIND_SCREEN_FRAME: u8 = 0x02;
/// v3 media kind: payload is UTF-8 JSON screen-share control data.
pub const MEDIA_KIND_SCREEN_CONTROL: u8 = 0x03;

/// Parsed v2 header view. Cheap (Copy).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameHeader {
Expand Down
3 changes: 3 additions & 0 deletions desktop/src-tauri/src/huddle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod playout;
pub mod pocket;
pub mod preprocessing;
pub mod relay_api;
pub mod screen_share;
pub mod state;
pub mod stt;
pub mod transcription;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub(super) fn drain_until_shutdown<T>(

// ── Re-exports ────────────────────────────────────────────────────────────────

pub use screen_share::{push_huddle_screen_control, push_huddle_screen_frame};
pub use state::{HuddleJoinInfo, HuddlePhase, HuddleState, VoiceInputMode};
pub use transcription::{set_huddle_transcription_enabled, start_stt_pipeline};

Expand Down Expand Up @@ -417,6 +419,7 @@ fn teardown_huddle(state: &AppState) -> Result<(), String> {
c.cancel();
}
hs.audio_relay_pcm_tx.take(); // Drop sender — signals the relay task.
hs.screen_relay_tx.take();
hs.reset_preserving_generation();
(stt, tts, cancel)
};
Expand Down
5 changes: 4 additions & 1 deletion desktop/src-tauri/src/huddle/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ pub(crate) async fn post_connect_setup(
let hs = state.huddle()?;
hs.parent_channel_id.clone()
};
let (cancel, pcm_tx) =
let (cancel, pcm_tx, screen_tx) =
relay_api::connect_audio_relay(ephemeral_channel_id, parent_id.as_deref(), state).await?;
let screen_share_available = screen_tx.is_some();
{
let mut hs = state.huddle()?;
hs.audio_ws_cancel = Some(cancel);
hs.audio_relay_pcm_tx = Some(pcm_tx);
hs.screen_relay_tx = screen_tx;
hs.screen_share_available = screen_share_available;
}

// Start TTS immediately. STT/transcript posting is opt-in and starts only
Expand Down
Loading
Loading