From dd536dfc247e43cdfdb26ad56bf7f74e49a0060c Mon Sep 17 00:00:00 2001 From: klopez4212 Date: Tue, 30 Jun 2026 14:09:52 +0100 Subject: [PATCH] Add huddle screen sharing controls --- crates/buzz-relay/src/audio/handler.rs | 90 ++- crates/buzz-relay/src/audio/room.rs | 6 +- crates/buzz-relay/src/audio/wire.rs | 7 + desktop/src-tauri/src/huddle/mod.rs | 3 + desktop/src-tauri/src/huddle/pipeline.rs | 5 +- desktop/src-tauri/src/huddle/playout.rs | 240 +++++--- desktop/src-tauri/src/huddle/relay_api.rs | 250 +++++--- desktop/src-tauri/src/huddle/screen_share.rs | 67 +++ desktop/src-tauri/src/huddle/state.rs | 10 + desktop/src-tauri/src/huddle/wire.rs | 20 +- desktop/src-tauri/src/lib.rs | 5 +- .../features/huddle/components/HuddleBar.tsx | 180 +++--- .../huddle/components/ParticipantList.tsx | 165 +++--- .../huddle/components/ScreenShareControls.tsx | 557 ++++++++++++++++++ .../src/features/huddle/lib/audioWorklet.ts | 16 +- .../src/features/huddle/lib/tauriRawBinary.ts | 18 + desktop/src/testing/e2eBridge.ts | 104 ++++ desktop/tests/helpers/bridge.ts | 20 + desktop/tests/helpers/screenshot.mjs | 50 +- 19 files changed, 1494 insertions(+), 319 deletions(-) create mode 100644 desktop/src-tauri/src/huddle/screen_share.rs create mode 100644 desktop/src/features/huddle/components/ScreenShareControls.tsx create mode 100644 desktop/src/features/huddle/lib/tauriRawBinary.ts diff --git a/crates/buzz-relay/src/audio/handler.rs b/crates/buzz-relay/src/audio/handler.rs index 9e930ccc4..33326cbc2 100644 --- a/crates/buzz-relay/src/audio/handler.rs +++ b/crates/buzz-relay/src/audio/handler.rs @@ -43,6 +43,12 @@ use crate::state::AppState; /// Maximum binary frame size: 4 KB is generous for a single Opus packet. const MAX_AUDIO_FRAME_BYTES: usize = 4096; +/// Maximum screen-share JPEG frame size. +const MAX_SCREEN_FRAME_BYTES: usize = 512 * 1024; + +/// Maximum screen-share control JSON size. +const MAX_SCREEN_CONTROL_BYTES: usize = 2 * 1024; + /// Maximum text frame size: 8 KB bounds auth/control JSON parsing. const MAX_TEXT_FRAME_BYTES: usize = 8192; @@ -87,7 +93,7 @@ pub async fn ws_audio_handler( /// Highest huddle audio protocol version this relay understands. Clients are /// allowed to negotiate any version in `1..=CURRENT_PROTOCOL_VERSION`; older /// versions stay supported indefinitely for staged rollouts. -const CURRENT_PROTOCOL_VERSION: u8 = 2; +const CURRENT_PROTOCOL_VERSION: u8 = 3; #[derive(Deserialize)] struct AuthMsg { @@ -200,7 +206,6 @@ async fn handle_audio_connection( return; } - // ── Step 3: membership check / auto-add ─────────────────────────────────── let parent_id_for_event = match ensure_membership( &state, &tenant, @@ -384,7 +389,6 @@ async fn handle_audio_connection( room.broadcast_control(joined_msg); - // ── Step 6: emit kind:48101 (PARTICIPANT_JOINED) ────────────────────────── emit_participant_event( &state, &tenant, @@ -508,7 +512,10 @@ async fn recv_loop( missed_pongs: Arc, cancel: CancellationToken, ) { - use crate::audio::wire::{FrameHeader, V2_HEADER_LEN}; + use crate::audio::wire::{ + FrameHeader, MEDIA_KIND_AUDIO, MEDIA_KIND_SCREEN_CONTROL, MEDIA_KIND_SCREEN_FRAME, + V2_HEADER_LEN, + }; loop { tokio::select! { @@ -517,6 +524,81 @@ async fn recv_loop( msg = ws_recv.next() => { match msg { Some(Ok(WsMessage::Binary(data))) => { + if protocol_version >= 3 { + let Some((&media_kind, payload)) = data.split_first() else { + warn!(peer_id = %peer_id, "v3 media frame missing kind — dropping"); + continue; + }; + + match media_kind { + MEDIA_KIND_AUDIO => { + if payload.len() > MAX_AUDIO_FRAME_BYTES { + warn!(peer_id = %peer_id, bytes = payload.len(), "audio frame too large — dropping"); + continue; + } + if payload.len() <= V2_HEADER_LEN { + warn!( + peer_id = %peer_id, + bytes = payload.len(), + "v3 audio frame missing header or payload — dropping" + ); + continue; + } + match FrameHeader::parse(payload) { + Some((header, opus_payload)) if !opus_payload.is_empty() => { + tracing::trace!( + peer_id = %peer_id, + seq = header.seq, + ts_48k = header.ts_48k, + level_dbov = header.level_dbov, + is_dtx = header.is_dtx(), + "v3 audio frame" + ); + } + _ => { + warn!( + peer_id = %peer_id, + bytes = payload.len(), + "v3 audio frame failed header parse — dropping" + ); + continue; + } + } + } + MEDIA_KIND_SCREEN_FRAME => { + if payload.is_empty() || payload.len() > MAX_SCREEN_FRAME_BYTES { + warn!( + peer_id = %peer_id, + bytes = payload.len(), + "screen-share frame empty or too large — dropping" + ); + continue; + } + } + MEDIA_KIND_SCREEN_CONTROL => { + if payload.is_empty() || payload.len() > MAX_SCREEN_CONTROL_BYTES { + warn!( + peer_id = %peer_id, + bytes = payload.len(), + "screen-share control empty or too large — dropping" + ); + continue; + } + } + _ => { + warn!( + peer_id = %peer_id, + media_kind, + "unknown v3 media kind — dropping" + ); + continue; + } + } + + room.broadcast_frame(peer_id, data); + continue; + } + if data.len() > MAX_AUDIO_FRAME_BYTES { warn!(peer_id = %peer_id, bytes = data.len(), "audio frame too large — dropping"); continue; diff --git a/crates/buzz-relay/src/audio/room.rs b/crates/buzz-relay/src/audio/room.rs index d0a70fbe7..a73341676 100644 --- a/crates/buzz-relay/src/audio/room.rs +++ b/crates/buzz-relay/src/audio/room.rs @@ -18,7 +18,7 @@ use uuid::Uuid; pub struct AudioPeer { /// Nostr pubkey hex. pub pubkey: String, - /// Audio frames (binary Opus with peer_index prefix). Drops on full — real-time. + /// Realtime media frames (binary payload with peer_index prefix). Drops on full. pub audio_tx: mpsc::Sender, /// Control messages (joined/left/close JSON). Separate queue so control /// is never starved by audio backpressure. @@ -27,7 +27,7 @@ pub struct AudioPeer { pub peer_index: u8, } -/// Control message for a single peer (separate from audio frames). +/// Control message for a single peer (separate from realtime media frames). pub enum PeerCtrl { /// JSON control message (joined/left/speakers). Json(String), @@ -261,7 +261,7 @@ impl Room { Some((peer_index, should_end)) } - /// Fan-out a binary frame to all peers except the sender. + /// Fan-out a binary realtime media frame to all peers except the sender. /// Prepends the sender's `peer_index` as a 1-byte prefix. /// Drops on full buffer — real-time audio never queues. pub fn broadcast_frame(&self, sender_id: Uuid, frame: Bytes) { diff --git a/crates/buzz-relay/src/audio/wire.rs b/crates/buzz-relay/src/audio/wire.rs index f2d9d5b36..721c9851d 100644 --- a/crates/buzz-relay/src/audio/wire.rs +++ b/crates/buzz-relay/src/audio/wire.rs @@ -32,6 +32,13 @@ pub const V2_HEADER_LEN: usize = 8; /// `flags & FLAG_DTX` indicates a DTX/comfort-noise frame. pub const FLAG_DTX: u8 = 0x01; +/// v3 media kind: payload is a v2 audio frame (`FrameHeader` + Opus bytes). +pub const MEDIA_KIND_AUDIO: u8 = 0x01; +/// v3 media kind: payload is a compressed screen-share frame. +pub const MEDIA_KIND_SCREEN_FRAME: u8 = 0x02; +/// v3 media kind: payload is UTF-8 JSON screen-share control data. +pub const MEDIA_KIND_SCREEN_CONTROL: u8 = 0x03; + /// Parsed v2 header view. Cheap (Copy). #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct FrameHeader { diff --git a/desktop/src-tauri/src/huddle/mod.rs b/desktop/src-tauri/src/huddle/mod.rs index 5499ccab4..145cde723 100644 --- a/desktop/src-tauri/src/huddle/mod.rs +++ b/desktop/src-tauri/src/huddle/mod.rs @@ -32,6 +32,7 @@ pub mod playout; pub mod pocket; pub mod preprocessing; pub mod relay_api; +pub mod screen_share; pub mod state; pub mod stt; pub mod transcription; @@ -60,6 +61,7 @@ pub(super) fn drain_until_shutdown( // ── Re-exports ──────────────────────────────────────────────────────────────── +pub use screen_share::{push_huddle_screen_control, push_huddle_screen_frame}; pub use state::{HuddleJoinInfo, HuddlePhase, HuddleState, VoiceInputMode}; pub use transcription::{set_huddle_transcription_enabled, start_stt_pipeline}; @@ -417,6 +419,7 @@ fn teardown_huddle(state: &AppState) -> Result<(), String> { c.cancel(); } hs.audio_relay_pcm_tx.take(); // Drop sender — signals the relay task. + hs.screen_relay_tx.take(); hs.reset_preserving_generation(); (stt, tts, cancel) }; diff --git a/desktop/src-tauri/src/huddle/pipeline.rs b/desktop/src-tauri/src/huddle/pipeline.rs index ebae8e339..0d8a9133b 100644 --- a/desktop/src-tauri/src/huddle/pipeline.rs +++ b/desktop/src-tauri/src/huddle/pipeline.rs @@ -50,12 +50,15 @@ pub(crate) async fn post_connect_setup( let hs = state.huddle()?; hs.parent_channel_id.clone() }; - let (cancel, pcm_tx) = + let (cancel, pcm_tx, screen_tx) = relay_api::connect_audio_relay(ephemeral_channel_id, parent_id.as_deref(), state).await?; + let screen_share_available = screen_tx.is_some(); { let mut hs = state.huddle()?; hs.audio_ws_cancel = Some(cancel); hs.audio_relay_pcm_tx = Some(pcm_tx); + hs.screen_relay_tx = screen_tx; + hs.screen_share_available = screen_share_available; } // Start TTS immediately. STT/transcript posting is opt-in and starts only diff --git a/desktop/src-tauri/src/huddle/playout.rs b/desktop/src-tauri/src/huddle/playout.rs index 69548bcfc..62057b5a2 100644 --- a/desktop/src-tauri/src/huddle/playout.rs +++ b/desktop/src-tauri/src/huddle/playout.rs @@ -26,13 +26,18 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use base64::Engine; use futures_util::{SinkExt, StreamExt}; +use serde::Serialize; use tokio_tungstenite::tungstenite::Message as WsMsg; use tokio_util::sync::CancellationToken; use super::jitter::{PeerJitterBuffer, SAMPLE_RATE_HZ}; use super::relay_api::{WsStream, REMOTE_SPEECH_THRESHOLD}; -use super::wire::{FrameHeader, FLAG_DTX, V2_HEADER_LEN}; +use super::wire::{ + FrameHeader, FLAG_DTX, MEDIA_KIND_AUDIO, MEDIA_KIND_SCREEN_CONTROL, MEDIA_KIND_SCREEN_FRAME, + V2_HEADER_LEN, +}; /// Speaker-tick window for emitting `huddle-active-speakers`. Active set is /// cleared each tick — peers that didn't send a frame in the last window are @@ -89,6 +94,19 @@ struct PeerSlot { last_packet_at: tokio::time::Instant, } +#[derive(Clone, Serialize)] +struct HuddleScreenFrameEvent { + pubkey: String, + mime_type: &'static str, + data_base64: String, +} + +#[derive(Clone, Serialize)] +struct HuddleScreenControlEvent { + pubkey: String, + control: serde_json::Value, +} + impl PeerSlot { fn new(peer_idx: u8, sink_mixer: &rodio::mixer::Mixer) -> Option { match PeerJitterBuffer::new(peer_idx) { @@ -123,6 +141,77 @@ impl PeerSlot { } } +#[allow(clippy::too_many_arguments)] +fn handle_audio_frame( + peer_idx: u8, + payload: &[u8], + peers: &mut std::collections::HashMap, + active_indices: &mut std::collections::HashSet, + frame_counts: &mut std::collections::HashMap, + last_frame_reset: &mut tokio::time::Instant, + tts_was_active: &mut bool, + tts_active: &AtomicBool, + tts_cancel: &AtomicBool, + sink_mixer: &rodio::mixer::Mixer, +) { + if payload.len() <= V2_HEADER_LEN { + return; + } + let Some((header, opus_bytes)) = FrameHeader::parse(payload) else { + eprintln!( + "buzz-desktop: dropping malformed audio frame from peer {peer_idx} ({} bytes)", + payload.len(), + ); + return; + }; + if opus_bytes.is_empty() { + return; + } + + let is_dtx = (header.flags & FLAG_DTX) != 0; + if !is_dtx { + active_indices.insert(peer_idx); + } + + let tts_now = tts_active.load(Ordering::Acquire); + if tts_now && !*tts_was_active { + frame_counts.clear(); + *last_frame_reset = tokio::time::Instant::now(); + } + *tts_was_active = tts_now; + + let slot = match peers.entry(peer_idx) { + std::collections::hash_map::Entry::Occupied(e) => e.into_mut(), + std::collections::hash_map::Entry::Vacant(e) => { + let Some(slot) = PeerSlot::new(peer_idx, sink_mixer) else { + return; + }; + e.insert(slot) + } + }; + + if let Err(err) = slot + .jitter + .insert_packet(header.seq, header.ts_48k, opus_bytes) + { + eprintln!("buzz-desktop: jitter insert peer {peer_idx}: {err}"); + } else { + slot.last_packet_at = tokio::time::Instant::now(); + } + + if tts_now && !is_dtx { + if last_frame_reset.elapsed() >= FRAME_WINDOW { + frame_counts.clear(); + *last_frame_reset = tokio::time::Instant::now(); + } + let count = frame_counts.entry(peer_idx).or_insert(0); + *count = count.saturating_add(1); + if *count >= REMOTE_SPEECH_THRESHOLD { + tts_cancel.store(true, Ordering::Release); + } + } +} + /// Drive the receive loop until cancelled or the WS closes. /// /// `ws_tx_for_pongs` is shared with the encode-side task and only used here to @@ -138,6 +227,7 @@ pub(crate) async fn run_playout_recv_loop( initial_peers: Vec<(u8, String)>, tts_active: Arc, tts_cancel: Arc, + protocol_version: u8, ) { use rodio::buffer::SamplesBuffer; use std::num::NonZero; @@ -224,87 +314,93 @@ pub(crate) async fn run_playout_recv_loop( msg = ws_rx.next() => { match msg { Some(Ok(WsMsg::Binary(data))) => { - // Wire shape (v2): [peer_index: u8][header: 8 bytes][opus payload...] - // The minimum size is 1 (peer_index) + 8 (header) + ≥1 Opus byte. - if data.len() <= 1 + V2_HEADER_LEN { - continue; - } - let peer_idx = data[0]; - let after_idx = &data[1..]; - let Some((header, opus_bytes)) = FrameHeader::parse(after_idx) - else { - // Malformed v2 frame: header parse only fails when - // the slice is too short, which `if data.len() <= ...` - // already guards. Defensive log + drop. - eprintln!( - "buzz-desktop: dropping malformed audio frame from peer {peer_idx} ({} bytes)", - data.len(), + if protocol_version < 3 { + // Wire shape (v2): [peer_index: u8][header: 8 bytes][opus...] + if data.len() <= 1 + V2_HEADER_LEN { + continue; + } + handle_audio_frame( + data[0], + &data[1..], + &mut peers, + &mut active_indices, + &mut frame_counts, + &mut last_frame_reset, + &mut tts_was_active, + &tts_active, + &tts_cancel, + sink_handle.mixer(), ); continue; - }; - if opus_bytes.is_empty() { - continue; - } - let is_dtx = (header.flags & FLAG_DTX) != 0; - // Only count non-DTX arrivals toward the UI's - // active-speaker set. DTX/comfort packets are emitted - // by an idle peer to keep the codec alive — they - // don't mean the peer is speaking, and shouldn't - // make their tile flash for the 500 ms speaker tick. - if !is_dtx { - active_indices.insert(peer_idx); } - // TTS interrupt frame counter — reset on TTS rising edge. - let tts_now = tts_active.load(Ordering::Acquire); - if tts_now && !tts_was_active { - frame_counts.clear(); - last_frame_reset = tokio::time::Instant::now(); + // Wire shape (v3): [peer_index: u8][media_kind: u8][payload...] + if data.len() < 2 { + continue; } - tts_was_active = tts_now; - - let slot = match peers.entry(peer_idx) { - std::collections::hash_map::Entry::Occupied(e) => e.into_mut(), - std::collections::hash_map::Entry::Vacant(e) => { - let Some(slot) = PeerSlot::new(peer_idx, sink_handle.mixer()) - else { - continue; - }; - e.insert(slot) - } + let peer_idx = data[0]; + let media_kind = data[1]; + let payload = &data[2..]; + let peer_pubkey = || { + index_to_pubkey + .get(&peer_idx) + .cloned() + .unwrap_or_else(|| format!("peer:{peer_idx}")) }; - // Sender-authored seq/ts: NetEq can detect real - // packet reordering & loss, not just arrival jitter. - if let Err(err) = - slot.jitter - .insert_packet(header.seq, header.ts_48k, opus_bytes) - { - eprintln!( - "buzz-desktop: jitter insert peer {peer_idx}: {err}" - ); - } else { - // Heartbeat for the playout tick's idle-peer - // guard — only on successful insert so a stream - // of bad packets can't keep a dead peer "active". - slot.last_packet_at = tokio::time::Instant::now(); - } - - // Count remote-speech frame arrivals for the TTS - // interrupt. DTX/comfort frames don't count — they - // mean the peer is silent, just keeping the codec - // state alive. - if tts_now && !is_dtx { - if last_frame_reset.elapsed() >= FRAME_WINDOW { - frame_counts.clear(); - last_frame_reset = tokio::time::Instant::now(); + match media_kind { + MEDIA_KIND_SCREEN_FRAME => { + if payload.is_empty() { + continue; + } + if let Some(ref app) = app_handle { + use tauri::Emitter; + let data_base64 = base64::engine::general_purpose::STANDARD + .encode(payload); + let _ = app.emit( + "huddle-screen-frame", + HuddleScreenFrameEvent { + pubkey: peer_pubkey(), + mime_type: "image/jpeg", + data_base64, + }, + ); + } + continue; } - let count = frame_counts.entry(peer_idx).or_insert(0); - *count = count.saturating_add(1); - if *count >= REMOTE_SPEECH_THRESHOLD { - tts_cancel.store(true, Ordering::Release); + MEDIA_KIND_SCREEN_CONTROL => { + if let Ok(control) = + serde_json::from_slice::(payload) + { + if let Some(ref app) = app_handle { + use tauri::Emitter; + let _ = app.emit( + "huddle-screen-control", + HuddleScreenControlEvent { + pubkey: peer_pubkey(), + control, + }, + ); + } + } + continue; } + MEDIA_KIND_AUDIO => {} + _ => continue, } + + handle_audio_frame( + peer_idx, + payload, + &mut peers, + &mut active_indices, + &mut frame_counts, + &mut last_frame_reset, + &mut tts_was_active, + &tts_active, + &tts_cancel, + sink_handle.mixer(), + ); } Some(Ok(WsMsg::Text(text))) => { if let Ok(v) = serde_json::from_str::(&text) { diff --git a/desktop/src-tauri/src/huddle/relay_api.rs b/desktop/src-tauri/src/huddle/relay_api.rs index eb3fea92d..f2407736e 100644 --- a/desktop/src-tauri/src/huddle/relay_api.rs +++ b/desktop/src-tauri/src/huddle/relay_api.rs @@ -40,16 +40,73 @@ pub(crate) fn parse_channel_uuid(channel_id: &str) -> Result { /// Handshake timeout — matches the server's AUTH_TIMEOUT (5 s). const HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); +const FALLBACK_AUDIO_PROTOCOL_VERSION: u8 = 2; + +fn should_fallback_to_audio_v2(error: &str) -> bool { + error.contains("unsupported_version") + || error.contains("upgrade_required") + || error.contains("not supported") + || error.contains("relay max is v2") +} /// Connect to the relay's audio WebSocket and run the Opus encode/decode pipeline. /// -/// Returns `(cancel_token, pcm_sender)` — caller stores both in `HuddleState`. +/// Returns `(cancel_token, pcm_sender, screen_sender)` — caller stores them in +/// `HuddleState`. /// Dropping the sender or calling `cancel.cancel()` shuts down the relay task. pub(crate) async fn connect_audio_relay( channel_id: &str, parent_channel_id: Option<&str>, state: &AppState, -) -> Result<(CancellationToken, tokio::sync::mpsc::Sender>), String> { +) -> Result< + ( + CancellationToken, + tokio::sync::mpsc::Sender>, + Option>, + ), + String, +> { + match connect_audio_relay_with_version( + channel_id, + parent_channel_id, + state, + super::wire::PROTOCOL_VERSION, + ) + .await + { + Ok(v) => Ok(v), + Err(e) + if super::wire::PROTOCOL_VERSION > FALLBACK_AUDIO_PROTOCOL_VERSION + && should_fallback_to_audio_v2(&e) => + { + eprintln!( + "buzz-desktop: huddle relay does not support screen-share protocol yet; falling back to audio v{FALLBACK_AUDIO_PROTOCOL_VERSION}: {e}" + ); + connect_audio_relay_with_version( + channel_id, + parent_channel_id, + state, + FALLBACK_AUDIO_PROTOCOL_VERSION, + ) + .await + } + Err(e) => Err(e), + } +} + +async fn connect_audio_relay_with_version( + channel_id: &str, + parent_channel_id: Option<&str>, + state: &AppState, + protocol_version: u8, +) -> Result< + ( + CancellationToken, + tokio::sync::mpsc::Sender>, + Option>, + ), + String, +> { use nostr::JsonUtil; let relay_url = crate::relay::relay_ws_url_with_override(state); @@ -109,12 +166,12 @@ pub(crate) async fn connect_audio_relay( "type": "auth", "event": event_json, "parent_channel_id": parent_channel_id, - // Negotiate huddle audio protocol v2 (8-byte sender-authored header - // per Opus frame: seq | ts_48k | level_dbov | flags). See - // huddle::wire for the layout. The relay pins the first joiner's - // version per-room and rejects mismatched joiners with + // Negotiate huddle media protocol v3. Audio keeps the v2 per-Opus + // header (seq | ts_48k | level_dbov | flags) and v3 adds a one-byte + // media-kind envelope for screen sharing. The relay pins the first + // joiner's version per-room and rejects mismatched joiners with // `upgrade_required`. - "protocol_version": super::wire::PROTOCOL_VERSION, + "protocol_version": protocol_version, }); ws_tx .send(WsMsg::Text(auth_msg.to_string().into())) @@ -144,7 +201,11 @@ pub(crate) async fn connect_audio_relay( break Ok(peers); } Some("error") => { - break Err(format!("audio relay auth error: {}", v["message"])); + let code = v["code"].as_str().unwrap_or("unknown"); + break Err(format!( + "audio relay auth error ({code}): {}", + v["message"] + )); } _ => continue, } @@ -163,6 +224,7 @@ pub(crate) async fn connect_audio_relay( let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); let (pcm_tx, pcm_rx) = tokio::sync::mpsc::channel::>(50); + let (screen_tx, screen_rx) = tokio::sync::mpsc::channel::(8); let output_device_name = state .audio_output_device .lock() @@ -174,6 +236,12 @@ pub(crate) async fn connect_audio_relay( ws_tx, ws_rx, pcm_rx, + screen_rx: if protocol_version >= 3 { + Some(screen_rx) + } else { + None + }, + protocol_version, cancel: cancel_clone.clone(), app_handle: app_handle.clone(), initial_peers, @@ -197,17 +265,31 @@ pub(crate) async fn connect_audio_relay( } }); - Ok((cancel, pcm_tx)) + let screen_tx = if protocol_version >= 3 { + Some(screen_tx) + } else { + None + }; + + Ok((cancel, pcm_tx, screen_tx)) } /// Background Opus encode/decode pipeline spawned by `connect_audio_relay`. pub(crate) type WsStream = tokio_tungstenite::WebSocketStream>; +#[derive(Debug)] +pub(crate) enum ScreenRelayFrame { + Video(Vec), + Control(Vec), +} + struct AudioRelayPipelineArgs { ws_tx: futures_util::stream::SplitSink, ws_rx: futures_util::stream::SplitStream, pcm_rx: tokio::sync::mpsc::Receiver>, + screen_rx: Option>, + protocol_version: u8, cancel: CancellationToken, app_handle: Option, initial_peers: Vec<(u8, String)>, @@ -221,6 +303,8 @@ async fn audio_relay_pipeline(args: AudioRelayPipelineArgs) -> Result<(), String ws_tx, ws_rx, mut pcm_rx, + mut screen_rx, + protocol_version, cancel, app_handle, initial_peers, @@ -246,79 +330,100 @@ async fn audio_relay_pipeline(args: AudioRelayPipelineArgs) -> Result<(), String let cancel_send = cancel.clone(); let send_task = tokio::spawn(async move { - use super::wire::{audio_level_dbov, FrameHeader, V2_HEADER_LEN}; + use super::wire::{ + audio_level_dbov, FrameHeader, MEDIA_KIND_AUDIO, MEDIA_KIND_SCREEN_CONTROL, + MEDIA_KIND_SCREEN_FRAME, V2_HEADER_LEN, + }; let mut encoder = encoder; // Move encoder into task. const FRAME_SAMPLES: usize = 960; let mut out_buf = vec![0u8; 4000]; - // Per-frame wire-protocol state. We send v2 frames now: each Opus - // payload is preceded by an 8-byte header carrying our own seq + - // 48 kHz timestamp + audio level + flags. + // Per-frame wire-protocol state. v3 audio payloads retain the v2 + // header shape, wrapped in a one-byte media-kind envelope. let mut seq: u16 = 0; let mut ts_48k: u32 = 0; loop { - let pcm_bytes = { - use futures_util::future::Either; - let cancelled = std::pin::pin!(cancel_send.cancelled()); - let recv = std::pin::pin!(pcm_rx.recv()); - match futures_util::future::select(cancelled, recv).await { - Either::Left(_) => break, // Cancelled. - Either::Right((Some(b), _)) => b, - Either::Right((None, _)) => break, // Sender dropped. - } - }; - - if pcm_bytes.len() % 4 != 0 { - continue; // Malformed batch. - } - let samples: Vec = pcm_bytes - .chunks_exact(4) - .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]])) - .collect(); - - let mut tx = ws_tx_send.lock().await; - for chunk in samples.chunks(FRAME_SAMPLES) { - // dBov is computed from the pre-encode PCM. Opus DTX may - // produce a 1-2 byte comfort packet; computing level from - // the encoded payload would be meaningless. - let level = audio_level_dbov(chunk); - let encode_result = if chunk.len() == FRAME_SAMPLES { - encoder.encode_float(chunk, &mut out_buf) - } else { - let mut padded = chunk.to_vec(); - padded.resize(FRAME_SAMPLES, 0.0); - encoder.encode_float(&padded, &mut out_buf) - }; - let n = match encode_result { - Ok(n) => n, - Err(e) => { - eprintln!("buzz-desktop: opus encode error: {e}"); - continue; - } - }; - if n > 0 { - // Opus DTX packets are very small (≤2 bytes). Flag them - // explicitly so the receiver can elide DTX from speaker - // detection without re-parsing the Opus payload. - let flags = if n <= 2 { super::wire::FLAG_DTX } else { 0 }; - let header = FrameHeader { - seq, - ts_48k, - level_dbov: level, - flags, + tokio::select! { + _ = cancel_send.cancelled() => break, + screen = async { + match screen_rx.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, } - .encode(); - - // Build the v2 wire frame: 8-byte header + Opus payload. - let mut frame = Vec::with_capacity(V2_HEADER_LEN + n); - frame.extend_from_slice(&header); - frame.extend_from_slice(&out_buf[..n]); + } => { + let Some(screen) = screen else { break }; + let (kind, payload) = match screen { + ScreenRelayFrame::Video(bytes) => (MEDIA_KIND_SCREEN_FRAME, bytes), + ScreenRelayFrame::Control(bytes) => (MEDIA_KIND_SCREEN_CONTROL, bytes), + }; + let mut frame = Vec::with_capacity(1 + payload.len()); + frame.push(kind); + frame.extend_from_slice(&payload); + let mut tx = ws_tx_send.lock().await; if tx.send(WsMsg::Binary(frame.into())).await.is_err() { - return; // WS closed. + return; + } + } + pcm = pcm_rx.recv() => { + let Some(pcm_bytes) = pcm else { break }; + if pcm_bytes.len() % 4 != 0 { + continue; // Malformed batch. + } + let samples: Vec = pcm_bytes + .chunks_exact(4) + .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]])) + .collect(); + + let mut tx = ws_tx_send.lock().await; + for chunk in samples.chunks(FRAME_SAMPLES) { + // dBov is computed from the pre-encode PCM. Opus DTX may + // produce a 1-2 byte comfort packet; computing level from + // the encoded payload would be meaningless. + let level = audio_level_dbov(chunk); + let encode_result = if chunk.len() == FRAME_SAMPLES { + encoder.encode_float(chunk, &mut out_buf) + } else { + let mut padded = chunk.to_vec(); + padded.resize(FRAME_SAMPLES, 0.0); + encoder.encode_float(&padded, &mut out_buf) + }; + let n = match encode_result { + Ok(n) => n, + Err(e) => { + eprintln!("buzz-desktop: opus encode error: {e}"); + continue; + } + }; + if n > 0 { + // Opus DTX packets are very small (≤2 bytes). Flag them + // explicitly so the receiver can elide DTX from speaker + // detection without re-parsing the Opus payload. + let flags = if n <= 2 { super::wire::FLAG_DTX } else { 0 }; + let header = FrameHeader { + seq, + ts_48k, + level_dbov: level, + flags, + } + .encode(); + + let mut frame = if protocol_version >= 3 { + let mut frame = Vec::with_capacity(1 + V2_HEADER_LEN + n); + frame.push(MEDIA_KIND_AUDIO); + frame + } else { + Vec::with_capacity(V2_HEADER_LEN + n) + }; + frame.extend_from_slice(&header); + frame.extend_from_slice(&out_buf[..n]); + if tx.send(WsMsg::Binary(frame.into())).await.is_err() { + return; // WS closed. + } + + seq = seq.wrapping_add(1); + ts_48k = ts_48k.wrapping_add(super::jitter::FRAME_TIMESTAMP_DELTA); + } } - - seq = seq.wrapping_add(1); - ts_48k = ts_48k.wrapping_add(super::jitter::FRAME_TIMESTAMP_DELTA); } } } @@ -335,6 +440,7 @@ async fn audio_relay_pipeline(args: AudioRelayPipelineArgs) -> Result<(), String initial_peers, tts_active, tts_cancel, + protocol_version, )); // Wait for either task to finish, then abort the survivor. diff --git a/desktop/src-tauri/src/huddle/screen_share.rs b/desktop/src-tauri/src/huddle/screen_share.rs new file mode 100644 index 000000000..f47d9453b --- /dev/null +++ b/desktop/src-tauri/src/huddle/screen_share.rs @@ -0,0 +1,67 @@ +use tauri::State; + +use crate::app_state::AppState; + +use super::relay_api::ScreenRelayFrame; + +/// Maximum IPC screen-share frame size. The frontend sends low-rate JPEG +/// frames and drops anything above this cap before invoking, but keep the +/// Rust boundary defensive as well. +const MAX_SCREEN_FRAME_BYTES: usize = 512 * 1024; + +/// Maximum JSON control payload (cursor/share-state) for screen sharing. +const MAX_SCREEN_CONTROL_BYTES: usize = 2 * 1024; + +/// Receive a compressed screen-share frame from the frontend and fan it out +/// over the huddle relay. The frame is a JPEG payload; metadata is fixed by +/// the receiver event (`image/jpeg`) to keep the realtime frame small. +#[tauri::command] +pub fn push_huddle_screen_frame( + request: tauri::ipc::Request<'_>, + state: State<'_, AppState>, +) -> Result<(), String> { + match request.body() { + tauri::ipc::InvokeBody::Raw(bytes) => { + if bytes.len() > MAX_SCREEN_FRAME_BYTES { + return Err(format!( + "screen frame too large: {} bytes (max {})", + bytes.len(), + MAX_SCREEN_FRAME_BYTES + )); + } + if bytes.is_empty() { + return Err("screen frame is empty".to_string()); + } + let hs = state.huddle()?; + let Some(ref tx) = hs.screen_relay_tx else { + return Err("huddle screen share is not connected".to_string()); + }; + let _ = tx.try_send(ScreenRelayFrame::Video(bytes.to_vec())); + Ok(()) + } + _ => Err("expected raw binary body".to_string()), + } +} + +/// Send a small JSON control payload for screen sharing (cursor position, +/// share start/stop state) over the huddle relay. +#[tauri::command] +pub fn push_huddle_screen_control( + control: serde_json::Value, + state: State<'_, AppState>, +) -> Result<(), String> { + let bytes = serde_json::to_vec(&control).map_err(|e| format!("screen control JSON: {e}"))?; + if bytes.len() > MAX_SCREEN_CONTROL_BYTES { + return Err(format!( + "screen control too large: {} bytes (max {})", + bytes.len(), + MAX_SCREEN_CONTROL_BYTES + )); + } + let hs = state.huddle()?; + let Some(ref tx) = hs.screen_relay_tx else { + return Err("huddle screen share is not connected".to_string()); + }; + let _ = tx.try_send(ScreenRelayFrame::Control(bytes)); + Ok(()) +} diff --git a/desktop/src-tauri/src/huddle/state.rs b/desktop/src-tauri/src/huddle/state.rs index 876c2d688..8cc360b90 100644 --- a/desktop/src-tauri/src/huddle/state.rs +++ b/desktop/src-tauri/src/huddle/state.rs @@ -50,6 +50,12 @@ pub struct HuddleState { /// Sends PCM batches from push_audio_pcm to the audio relay encode thread. #[serde(skip)] pub audio_relay_pcm_tx: Option>>, + /// Sends screen-share frames/control messages to the huddle relay task. + #[serde(skip)] + pub screen_relay_tx: Option>, + /// Whether the negotiated huddle relay protocol supports screen sharing. + #[serde(default)] + pub screen_share_available: bool, /// Participant pubkey hex strings (all members, including humans). pub participants: Vec, /// Agent pubkeys only — used as p-tags on transcribed messages. @@ -150,6 +156,8 @@ impl Clone for HuddleState { ephemeral_channel_id: self.ephemeral_channel_id.clone(), audio_ws_cancel: None, // Never clone handles. audio_relay_pcm_tx: None, // Never clone handles. + screen_relay_tx: None, // Never clone handles. + screen_share_available: self.screen_share_available, participants: self.participants.clone(), agent_pubkeys: Arc::new(Mutex::new(agent_pubkeys_snapshot)), stt_pipeline: None, // Never clone the pipeline handle. @@ -177,6 +185,8 @@ impl Default for HuddleState { ephemeral_channel_id: None, audio_ws_cancel: None, audio_relay_pcm_tx: None, + screen_relay_tx: None, + screen_share_available: false, participants: Vec::new(), agent_pubkeys: Arc::new(Mutex::new(Vec::new())), stt_pipeline: None, diff --git a/desktop/src-tauri/src/huddle/wire.rs b/desktop/src-tauri/src/huddle/wire.rs index d315dd7f2..71b43848e 100644 --- a/desktop/src-tauri/src/huddle/wire.rs +++ b/desktop/src-tauri/src/huddle/wire.rs @@ -9,7 +9,7 @@ //! Kept for backward compatibility — relay still admits v1 clients into //! v1-pinned rooms — but new clients always speak v2. //! -//! ## v2 (this commit) +//! ## v2 //! //! Client → relay: `` //! Relay → client: `` @@ -34,10 +34,26 @@ //! * Negotiation lives in the WS auth message (`protocol_version: 2`), not //! in any bit of `flags`. Mixed-version rooms are rejected at the relay //! with `upgrade_required`. +//! +//! ## v3 +//! +//! Client → relay: `` +//! Relay → client: `` +//! +//! Audio payloads keep the v2 shape (`
`). The one-byte +//! kind lets the same huddle websocket carry screen-share JPEG frames and +//! tiny JSON control messages without feeding those bytes to the Opus decoder. /// Wire protocol version this client speaks. Bumped only when the frame /// layout itself changes; the relay tracks pinned per-room. -pub const PROTOCOL_VERSION: u8 = 2; +pub const PROTOCOL_VERSION: u8 = 3; + +/// v3 media kind: payload is a v2 audio frame (`FrameHeader` + Opus bytes). +pub const MEDIA_KIND_AUDIO: u8 = 0x01; +/// v3 media kind: payload is a JPEG screen-share frame. +pub const MEDIA_KIND_SCREEN_FRAME: u8 = 0x02; +/// v3 media kind: payload is UTF-8 JSON screen-share control data. +pub const MEDIA_KIND_SCREEN_CONTROL: u8 = 0x03; /// Length of the v2 per-frame header in bytes. pub const V2_HEADER_LEN: usize = 8; diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 369046a9d..ff8950a9e 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -34,7 +34,8 @@ use huddle::audio_output::{ use huddle::{ add_agent_to_huddle, check_pipeline_hotstart, confirm_huddle_active, download_voice_models, end_huddle, get_huddle_agent_pubkeys, get_huddle_state, get_model_status, get_voice_input_mode, - join_huddle, leave_huddle, push_audio_pcm, set_huddle_transcription_enabled, set_tts_enabled, + join_huddle, leave_huddle, push_audio_pcm, push_huddle_screen_control, + push_huddle_screen_frame, set_huddle_transcription_enabled, set_tts_enabled, set_voice_input_mode, speak_agent_message, start_huddle, start_stt_pipeline, }; use managed_agents::{ @@ -571,6 +572,8 @@ pub fn run() { end_huddle, get_huddle_state, push_audio_pcm, + push_huddle_screen_frame, + push_huddle_screen_control, start_stt_pipeline, set_huddle_transcription_enabled, download_voice_models, diff --git a/desktop/src/features/huddle/components/HuddleBar.tsx b/desktop/src/features/huddle/components/HuddleBar.tsx index f5fd84907..e16ebf44b 100644 --- a/desktop/src/features/huddle/components/HuddleBar.tsx +++ b/desktop/src/features/huddle/components/HuddleBar.tsx @@ -3,9 +3,11 @@ import { listen } from "@tauri-apps/api/event"; import { Bot, Captions, + EllipsisVertical, MessageSquareText, PhoneOff, SmilePlus, + UsersRound, } from "lucide-react"; import * as React from "react"; @@ -24,13 +26,23 @@ import { import { cn } from "@/shared/lib/cn"; import { rewriteRelayUrl } from "@/shared/lib/mediaUrl"; import { Button } from "@/shared/ui/button"; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuSub, + DropdownMenuSubContent, + DropdownMenuSubTrigger, + DropdownMenuTrigger, +} from "@/shared/ui/dropdown-menu"; import { useEmojiBurst } from "@/shared/ui/EmojiBurstProvider"; import { Popover, PopoverContent, PopoverTrigger } from "@/shared/ui/popover"; import { Tooltip, TooltipContent, TooltipTrigger } from "@/shared/ui/tooltip"; import { useHuddle } from "../HuddleContext"; import { AddAgentDialog, type AgentAddResult } from "./AddAgentDialog"; import { MicControls, SpeakerControls } from "./MicControls"; -import { HuddleParticipantsControl } from "./ParticipantList"; +import { HuddleParticipantsPanel } from "./ParticipantList"; +import { ScreenShareControls } from "./ScreenShareControls"; // Mirrors HuddleState in src-tauri/src/huddle/mod.rs. type HuddleState = { @@ -47,6 +59,7 @@ type HuddleState = { agent_pubkeys: string[]; tts_enabled: boolean; transcription_enabled: boolean; + screen_share_available: boolean; is_creator: boolean; voice_input_mode: "push_to_talk" | "voice_activity"; }; @@ -524,6 +537,31 @@ export function HuddleBar({ onOpenThread?.(parentChannelId, huddleThreadEventId); } + async function handleRemoveAgentFromHuddle(pubkey: string) { + const channelId = barState?.ephemeral_channel_id; + if (!channelId) return; + const confirmed = window.confirm("Remove this agent from the huddle?"); + if (!confirmed) return; + try { + await invoke("remove_channel_member", { + channelId, + pubkey, + }); + // Optimistically remove from local state — the backend's 15s membership + // poll will eventually converge. + setState((prev) => { + if (!prev) return prev; + return { + ...prev, + participants: prev.participants.filter((p) => p !== pubkey), + agent_pubkeys: prev.agent_pubkeys.filter((p) => p !== pubkey), + }; + }); + } catch (e) { + console.error("Failed to remove agent from huddle:", e); + } + } + return (
+ +
+ +
- -
- { - if (!barState.ephemeral_channel_id) return; - const confirmed = window.confirm( - "Remove this agent from the huddle?", - ); - if (!confirmed) return; - try { - await invoke("remove_channel_member", { - channelId: barState.ephemeral_channel_id, - pubkey, - }); - // Optimistically remove from local state — the backend's - // 15s membership poll will eventually converge. - setState((prev) => { - if (!prev) return prev; - return { - ...prev, - participants: prev.participants.filter((p) => p !== pubkey), - agent_pubkeys: prev.agent_pubkeys.filter( - (p) => p !== pubkey, - ), - }; - }); - } catch (e) { - console.error("Failed to remove agent from huddle:", e); - } - }} - /> - - - - - - {transcriptionEnabled ? "Stop transcript" : "Start transcript"} - - - - - - + + + + More + + + + setShowAddAgent(true)}> - - - - Add agent - - + Add agent + + void handleToggleTranscript()}> + + {transcriptionEnabled ? "Stop transcript" : "Start transcript"} + + + + + Participants + {barState.participants.length > 0 ? ( + + ({barState.participants.length}) + + ) : null} + + + + void handleRemoveAgentFromHuddle(pubkey) + } + className="p-3" + /> + + + +
diff --git a/desktop/src/features/huddle/components/ParticipantList.tsx b/desktop/src/features/huddle/components/ParticipantList.tsx index a99e5aa6d..fa0461805 100644 --- a/desktop/src/features/huddle/components/ParticipantList.tsx +++ b/desktop/src/features/huddle/components/ParticipantList.tsx @@ -7,7 +7,7 @@ import { cn } from "@/shared/lib/cn"; import { Button } from "@/shared/ui/button"; import { Popover, PopoverContent, PopoverTrigger } from "@/shared/ui/popover"; -type ParticipantListProps = { +type ParticipantListBaseProps = { /** Pubkey hex strings from the Rust huddle state */ participants: string[]; activeSpeakers?: string[]; @@ -15,6 +15,9 @@ type ParticipantListProps = { agentPubkeys?: string[]; /** Called when the user clicks the remove button on an agent avatar */ onRemoveAgent?: (pubkey: string) => void; +}; + +type ParticipantListProps = ParticipantListBaseProps & { className?: string; }; @@ -25,20 +28,8 @@ export function HuddleParticipantsControl({ onRemoveAgent, className, }: ParticipantListProps) { - const { data } = useUsersBatchQuery(participants); - const profiles = data?.profiles ?? {}; - const agentSet = React.useMemo( - () => new Set(agentPubkeys ?? []), - [agentPubkeys], - ); - if (participants.length === 0) return null; - const participantLabel = - participants.length === 1 - ? "1 participant" - : `${participants.length} participants`; - return ( @@ -62,70 +53,104 @@ export function HuddleParticipantsControl({ -
-

Participants

- - {participantLabel} - -
-
    - {participants.map((pubkey) => { - const profile = profiles[pubkey.toLowerCase()]; - const displayName = - profile?.displayName || `Participant ${pubkey.slice(0, 8)}`; - const isActive = activeSpeakers?.includes(pubkey); - const isAgent = agentSet.has(pubkey); + + + + ); +} + +export function HuddleParticipantsPanel({ + participants, + activeSpeakers, + agentPubkeys, + onRemoveAgent, + className, +}: ParticipantListProps) { + const { data } = useUsersBatchQuery(participants); + const profiles = data?.profiles ?? {}; + const agentSet = React.useMemo( + () => new Set(agentPubkeys ?? []), + [agentPubkeys], + ); + + if (participants.length === 0) return null; - return ( -
  • - {profile?.displayName || profile?.avatarUrl ? ( - - ) : ( - - )} + const participantLabel = + participants.length === 1 + ? "1 participant" + : `${participants.length} participants`; + + return ( +
    +
    +

    Participants

    + + {participantLabel} + +
    +
      + {participants.map((pubkey) => { + const profile = profiles[pubkey.toLowerCase()]; + const displayName = + profile?.displayName || `Participant ${pubkey.slice(0, 8)}`; + const isActive = activeSpeakers?.includes(pubkey); + const isAgent = agentSet.has(pubkey); -
      -
      - {displayName} -
      -
      - {isActive ? "Speaking" : isAgent ? "Agent" : "In huddle"} -
      + return ( +
    • + {profile?.displayName || profile?.avatarUrl ? ( + + ) : ( + + )} + +
      +
      + {displayName} +
      +
      + {isActive ? "Speaking" : isAgent ? "Agent" : "In huddle"}
      +
      - {isAgent && onRemoveAgent && ( - - )} -
    • - ); - })} -
    - - + {isAgent && onRemoveAgent && ( + + )} +
  • + ); + })} +
+ ); } diff --git a/desktop/src/features/huddle/components/ScreenShareControls.tsx b/desktop/src/features/huddle/components/ScreenShareControls.tsx new file mode 100644 index 000000000..b09d16f19 --- /dev/null +++ b/desktop/src/features/huddle/components/ScreenShareControls.tsx @@ -0,0 +1,557 @@ +import { invoke } from "@tauri-apps/api/core"; +import { listen } from "@tauri-apps/api/event"; +import { MousePointer2, ScreenShare, ScreenShareOff, X } from "lucide-react"; +import * as React from "react"; + +import { cn } from "@/shared/lib/cn"; +import { Button } from "@/shared/ui/button"; +import { Tooltip, TooltipContent, TooltipTrigger } from "@/shared/ui/tooltip"; +import { invokeRawBinary } from "../lib/tauriRawBinary"; + +type HuddleScreenFrameEvent = { + pubkey: string; + mime_type: string; + data_base64: string; +}; + +type ScreenCursorControl = { + type: "cursor"; + x: number; + y: number; + visible: boolean; +}; + +type ScreenShareStateControl = { + type: "share_state"; + active: boolean; +}; + +type ScreenShareControl = ScreenCursorControl | ScreenShareStateControl; + +type HuddleScreenControlEvent = { + pubkey: string; + control: ScreenShareControl | Record; +}; + +type RemoteScreenShare = { + pubkey: string; + src: string | null; + lastFrameAt: number; + stale: boolean; + cursor: { + x: number; + y: number; + visible: boolean; + } | null; +}; + +type ScreenShareControlsProps = { + available: boolean; + localPubkey: string | null; +}; + +type DisplayMediaVideoConstraints = MediaTrackConstraints & { + cursor?: "always" | "motion" | "never"; + displaySurface?: "browser" | "monitor" | "window"; +}; + +const SCREEN_FRAME_INTERVAL_MS = 250; +const SCREEN_FRAME_MAX_WIDTH = 1280; +const SCREEN_FRAME_MAX_HEIGHT = 800; +const SCREEN_FRAME_JPEG_QUALITY = 0.56; +const SCREEN_FRAME_MAX_BYTES = 500 * 1024; +const CURSOR_SEND_INTERVAL_MS = 80; +const REMOTE_STALE_AFTER_MS = 6_000; + +function clamp01(value: number): number { + return Math.min(1, Math.max(0, value)); +} + +function isCursorControl(control: unknown): control is ScreenCursorControl { + if (!control || typeof control !== "object") return false; + const c = control as ScreenCursorControl; + return ( + c.type === "cursor" && + typeof c.x === "number" && + typeof c.y === "number" && + typeof c.visible === "boolean" + ); +} + +function isShareStateControl( + control: unknown, +): control is ScreenShareStateControl { + if (!control || typeof control !== "object") return false; + const c = control as ScreenShareStateControl; + return c.type === "share_state" && typeof c.active === "boolean"; +} + +function screenDisplayName(pubkey: string | null): string { + return pubkey ? `Participant ${pubkey.slice(0, 8)}` : "Participant"; +} + +function canvasToJpegBlob(canvas: HTMLCanvasElement): Promise { + return new Promise((resolve) => { + canvas.toBlob(resolve, "image/jpeg", SCREEN_FRAME_JPEG_QUALITY); + }); +} + +function scaledFrameSize(width: number, height: number) { + if (width <= 0 || height <= 0) return { width: 0, height: 0 }; + const scale = Math.min( + 1, + SCREEN_FRAME_MAX_WIDTH / width, + SCREEN_FRAME_MAX_HEIGHT / height, + ); + return { + width: Math.max(1, Math.round(width * scale)), + height: Math.max(1, Math.round(height * scale)), + }; +} + +async function requestDisplayStream(): Promise { + const video: DisplayMediaVideoConstraints = { + cursor: "always", + frameRate: { ideal: 4, max: 5 }, + height: { ideal: SCREEN_FRAME_MAX_HEIGHT }, + width: { ideal: SCREEN_FRAME_MAX_WIDTH }, + }; + return navigator.mediaDevices.getDisplayMedia({ + audio: false, + video, + } as DisplayMediaStreamOptions); +} + +export function ScreenShareControls({ + available, + localPubkey, +}: ScreenShareControlsProps) { + const [localStream, setLocalStream] = React.useState( + null, + ); + const [isStarting, setIsStarting] = React.useState(false); + const [error, setError] = React.useState(null); + const [remoteShare, setRemoteShare] = + React.useState(null); + const [localCursor, setLocalCursor] = React.useState<{ + x: number; + y: number; + visible: boolean; + } | null>(null); + + const localVideoRef = React.useRef(null); + const captureVideoRef = React.useRef(null); + const captureCanvasRef = React.useRef(null); + const frameTimerRef = React.useRef(null); + const isSharingRef = React.useRef(false); + const lastCursorSentAtRef = React.useRef(0); + + const pushControl = React.useCallback( + (control: ScreenShareControl): Promise => + invoke("push_huddle_screen_control", { control }), + [], + ); + + const clearFrameTimer = React.useCallback(() => { + if (frameTimerRef.current !== null) { + window.clearTimeout(frameTimerRef.current); + frameTimerRef.current = null; + } + }, []); + + const stopLocalShare = React.useCallback( + (notify = true) => { + clearFrameTimer(); + isSharingRef.current = false; + captureVideoRef.current?.pause(); + captureVideoRef.current = null; + captureCanvasRef.current = null; + setLocalCursor(null); + setLocalStream((stream) => { + stream?.getTracks().forEach((track) => { + track.stop(); + }); + return null; + }); + if (notify) { + void pushControl({ type: "share_state", active: false }).catch(() => { + /* best-effort */ + }); + } + }, + [clearFrameTimer, pushControl], + ); + + React.useEffect(() => { + return () => stopLocalShare(); + }, [stopLocalShare]); + + React.useEffect(() => { + const video = localVideoRef.current; + if (!video) return; + video.srcObject = localStream; + if (localStream) { + void video.play().catch(() => { + /* preview can fail if the element is mid-unmount */ + }); + } + return () => { + video.srcObject = null; + }; + }, [localStream]); + + React.useEffect(() => { + let disposed = false; + let unlistenFrame: (() => void) | null = null; + let unlistenControl: (() => void) | null = null; + + void listen("huddle-screen-frame", (event) => { + if (disposed || event.payload.pubkey === localPubkey) return; + const { + pubkey, + mime_type: mimeType, + data_base64: dataBase64, + } = event.payload; + setRemoteShare((prev) => ({ + pubkey, + src: `data:${mimeType};base64,${dataBase64}`, + lastFrameAt: Date.now(), + stale: false, + cursor: prev?.pubkey === pubkey ? prev.cursor : null, + })); + }).then((fn) => { + if (disposed) fn(); + else unlistenFrame = fn; + }); + + void listen("huddle-screen-control", (event) => { + if (disposed || event.payload.pubkey === localPubkey) return; + const { pubkey, control } = event.payload; + if (isShareStateControl(control)) { + setRemoteShare((prev) => { + if (!control.active && prev?.pubkey === pubkey) return null; + if (control.active && !prev) { + return { + pubkey, + src: null, + lastFrameAt: Date.now(), + stale: false, + cursor: null, + }; + } + return prev; + }); + return; + } + + if (!isCursorControl(control)) return; + setRemoteShare((prev) => { + if (!prev || prev.pubkey !== pubkey) return prev; + return { + ...prev, + cursor: { + x: clamp01(control.x), + y: clamp01(control.y), + visible: control.visible, + }, + }; + }); + }).then((fn) => { + if (disposed) fn(); + else unlistenControl = fn; + }); + + return () => { + disposed = true; + unlistenFrame?.(); + unlistenControl?.(); + }; + }, [localPubkey]); + + React.useEffect(() => { + const id = window.setInterval(() => { + const now = Date.now(); + setRemoteShare((prev) => { + if (!prev) return prev; + if (now - prev.lastFrameAt < REMOTE_STALE_AFTER_MS) return prev; + return { ...prev, stale: true }; + }); + }, 1_000); + return () => window.clearInterval(id); + }, []); + + const scheduleFrame = React.useCallback(() => { + clearFrameTimer(); + frameTimerRef.current = window.setTimeout(async () => { + if (!isSharingRef.current) return; + + const video = captureVideoRef.current; + if (!video || video.videoWidth === 0 || video.videoHeight === 0) { + scheduleFrame(); + return; + } + + const { width, height } = scaledFrameSize( + video.videoWidth, + video.videoHeight, + ); + if (!width || !height) { + scheduleFrame(); + return; + } + + const canvas = + captureCanvasRef.current ?? document.createElement("canvas"); + captureCanvasRef.current = canvas; + if (canvas.width !== width) canvas.width = width; + if (canvas.height !== height) canvas.height = height; + const ctx = canvas.getContext("2d", { alpha: false }); + if (!ctx) { + scheduleFrame(); + return; + } + + try { + ctx.drawImage(video, 0, 0, width, height); + } catch (drawError) { + console.warn("[huddle] Screen frame draw skipped:", drawError); + scheduleFrame(); + return; + } + const blob = await canvasToJpegBlob(canvas); + if (blob && blob.size <= SCREEN_FRAME_MAX_BYTES) { + const bytes = new Uint8Array(await blob.arrayBuffer()); + void invokeRawBinary("push_huddle_screen_frame", bytes).catch((err) => { + console.error("[huddle] Failed to send screen frame:", err); + }); + } + + scheduleFrame(); + }, SCREEN_FRAME_INTERVAL_MS); + }, [clearFrameTimer]); + + const startLocalShare = React.useCallback(async () => { + if (isStarting || localStream) return; + if (!available) { + setError("Screen sharing needs an updated huddle relay."); + return; + } + if (!navigator.mediaDevices?.getDisplayMedia) { + setError("Screen sharing is not available in this app window."); + return; + } + + setError(null); + setIsStarting(true); + try { + const stream = await requestDisplayStream(); + const [track] = stream.getVideoTracks(); + if (!track) { + stream.getTracks().forEach((t) => { + t.stop(); + }); + throw new Error("No screen video track was selected."); + } + + const captureVideo = document.createElement("video"); + captureVideo.autoplay = true; + captureVideo.muted = true; + captureVideo.playsInline = true; + captureVideo.srcObject = stream; + captureVideoRef.current = captureVideo; + + void captureVideo.play().catch((playError) => { + console.warn( + "[huddle] Screen capture preview playback was delayed:", + playError, + ); + }); + + track.addEventListener("ended", () => { + setError("Screen sharing stopped."); + stopLocalShare(); + }); + isSharingRef.current = true; + setLocalStream(stream); + void pushControl({ type: "share_state", active: true }).catch(() => { + /* best-effort */ + }); + scheduleFrame(); + } catch (err) { + stopLocalShare(false); + const message = err instanceof Error ? err.message : String(err); + setError(message || "Screen sharing failed."); + } finally { + setIsStarting(false); + } + }, [ + available, + isStarting, + localStream, + pushControl, + scheduleFrame, + stopLocalShare, + ]); + + const handleCursorMove = React.useCallback( + (event: React.PointerEvent) => { + if (!localStream) return; + const rect = event.currentTarget.getBoundingClientRect(); + const cursor = { + x: clamp01((event.clientX - rect.left) / rect.width), + y: clamp01((event.clientY - rect.top) / rect.height), + visible: true, + }; + setLocalCursor(cursor); + + const now = Date.now(); + if (now - lastCursorSentAtRef.current < CURSOR_SEND_INTERVAL_MS) return; + lastCursorSentAtRef.current = now; + void pushControl({ type: "cursor", ...cursor }).catch(() => { + /* best-effort */ + }); + }, + [localStream, pushControl], + ); + + const handleCursorLeave = React.useCallback(() => { + if (!localStream) return; + const cursor = { x: localCursor?.x ?? 0.5, y: localCursor?.y ?? 0.5 }; + setLocalCursor((prev) => (prev ? { ...prev, visible: false } : null)); + void pushControl({ type: "cursor", ...cursor, visible: false }).catch( + () => { + /* best-effort */ + }, + ); + }, [localCursor, localStream, pushControl]); + + const showingLocal = Boolean(localStream); + const showPanel = showingLocal || Boolean(remoteShare); + const cursor = showingLocal ? localCursor : remoteShare?.cursor; + const cursorVisible = Boolean(cursor?.visible); + const title = showingLocal + ? "You are sharing" + : remoteShare + ? `${screenDisplayName(remoteShare.pubkey)} is sharing` + : "Screen share"; + + return ( + <> + + + + + + {error ?? + (!available + ? "Screen sharing needs an updated huddle relay" + : localStream + ? "Stop sharing screen" + : "Share screen")} + + + + {showPanel && ( +
+
+
+
+

{title}

+ {remoteShare?.stale && !showingLocal && ( +

+ Reconnecting... +

+ )} +
+ {showingLocal && ( + + )} +
+ +
+
+ {showingLocal ? ( +
+
+
+
+ )} + + + {error ? `Screen share error: ${error}` : ""} + + + ); +} diff --git a/desktop/src/features/huddle/lib/audioWorklet.ts b/desktop/src/features/huddle/lib/audioWorklet.ts index 677f4ac0a..5720aed8e 100644 --- a/desktop/src/features/huddle/lib/audioWorklet.ts +++ b/desktop/src/features/huddle/lib/audioWorklet.ts @@ -1,20 +1,6 @@ import { listen, type UnlistenFn } from "@tauri-apps/api/event"; -/** - * Raw binary invoke — uses Tauri's internal IPC for zero-copy ArrayBuffer transfer. - * - * The typed @tauri-apps/api doesn't support raw binary payloads (InvokeBody::Raw). - * This wrapper isolates the internal API dependency to a single call site. - * Tested against Tauri v2. If this breaks on upgrade, only this function needs updating. - */ -function invokeRawBinary(cmd: string, payload: Uint8Array): Promise { - // biome-ignore lint/suspicious/noExplicitAny: Tauri internals have no public type definition - const internals = (window as any).__TAURI_INTERNALS__; - if (!internals?.invoke) { - return Promise.reject(new Error("Tauri internals not available")); - } - return internals.invoke(cmd, payload); -} +import { invokeRawBinary } from "./tauriRawBinary"; /** Return type for setupAudioWorklet — stop + mode control. */ export type AudioWorkletHandle = { diff --git a/desktop/src/features/huddle/lib/tauriRawBinary.ts b/desktop/src/features/huddle/lib/tauriRawBinary.ts new file mode 100644 index 000000000..edb5c7796 --- /dev/null +++ b/desktop/src/features/huddle/lib/tauriRawBinary.ts @@ -0,0 +1,18 @@ +/** + * Raw binary invoke — uses Tauri's internal IPC for zero-copy ArrayBuffer transfer. + * + * The typed @tauri-apps/api doesn't expose InvokeBody::Raw. Keep the internal + * dependency isolated here so both audio and screen-share fast paths share the + * same escape hatch. + */ +export function invokeRawBinary( + cmd: string, + payload: Uint8Array, +): Promise { + // biome-ignore lint/suspicious/noExplicitAny: Tauri internals have no public type definition + const internals = (window as any).__TAURI_INTERNALS__; + if (!internals?.invoke) { + return Promise.reject(new Error("Tauri internals not available")); + } + return internals.invoke(cmd, payload); +} diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index eaaf6eaa4..ed01e7455 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -69,6 +69,25 @@ type MockSearchProfileSeed = { isAgent?: boolean; }; +type MockHuddleState = { + phase: + | "idle" + | "creating" + | "connecting" + | "connected" + | "active" + | "leaving"; + parent_channel_id: string | null; + ephemeral_channel_id: string | null; + participants: string[]; + agent_pubkeys: string[]; + tts_enabled: boolean; + transcription_enabled: boolean; + screen_share_available: boolean; + is_creator: boolean; + voice_input_mode: "push_to_talk" | "voice_activity"; +}; + type E2eConfig = { mode?: "mock" | "relay"; mock?: { @@ -103,6 +122,7 @@ type E2eConfig = { updateDownloadDelayMs?: number; restartDelayMs?: number; updateVersion?: string; + huddleState?: MockHuddleState | null; stallWebsocketSends?: boolean; userSearchDelayMs?: number; // NIP-IA gate inputs — see tests/helpers/bridge.ts:MockBridgeOptions for @@ -2055,6 +2075,7 @@ const mockSockets = new Map(); let mockWebsocketSendMutexWedged = false; const realSockets = new Map(); let mockManagedAgents: MockManagedAgent[] = []; +let mockHuddleState: MockHuddleState | null = null; // Mesh-compute mock state — TEST-ONLY. // @@ -2090,6 +2111,42 @@ function resetMockMesh() { mockMeshState.nodeState = "off"; mockMeshState.nodeMode = null; } + +function cloneMockHuddleState(state: MockHuddleState): MockHuddleState { + return { + ...state, + agent_pubkeys: [...state.agent_pubkeys], + participants: [...state.participants], + }; +} + +function idleMockHuddleState(): MockHuddleState { + return { + agent_pubkeys: [], + ephemeral_channel_id: null, + is_creator: false, + parent_channel_id: null, + participants: [], + phase: "idle", + screen_share_available: false, + transcription_enabled: false, + tts_enabled: false, + voice_input_mode: "voice_activity", + }; +} + +function resetMockHuddle(config: E2eConfig | undefined) { + mockHuddleState = config?.mock?.huddleState + ? cloneMockHuddleState(config.mock.huddleState) + : null; +} + +function currentMockHuddleState(): MockHuddleState { + return mockHuddleState + ? cloneMockHuddleState(mockHuddleState) + : idleMockHuddleState(); +} + let mockPersonas: RawPersona[] = []; let mockTeams: RawTeam[] = []; // Listeners registered via the mock __TAURI_INTERNALS__.listen — keyed by event name. @@ -6773,6 +6830,7 @@ export function maybeInstallE2eTauriMocks() { seedMockSearchProfiles(config); resetMockWorkflows(); resetMockMesh(); + resetMockHuddle(config); resetMockUserStatuses(); mockWebsocketSendMutexWedged = false; mockWindows("main"); @@ -7490,6 +7548,52 @@ export function maybeInstallE2eTauriMocks() { ); case "get_media_proxy_port": return MOCK_MEDIA_PROXY_PORT; + case "get_huddle_state": + return currentMockHuddleState(); + case "get_huddle_agent_pubkeys": + return currentMockHuddleState().agent_pubkeys; + case "get_model_status": + return { stt: "ready", tts: "ready" }; + case "set_tts_enabled": + if (mockHuddleState) { + mockHuddleState.tts_enabled = Boolean( + (payload as { enabled?: boolean }).enabled, + ); + } + return undefined; + case "set_huddle_transcription_enabled": + if (mockHuddleState) { + mockHuddleState.transcription_enabled = Boolean( + (payload as { enabled?: boolean }).enabled, + ); + } + return undefined; + case "add_agent_to_huddle": { + if (!mockHuddleState) return undefined; + const agentPubkey = + (payload as { agentPubkey?: string; pubkey?: string }).agentPubkey ?? + (payload as { agentPubkey?: string; pubkey?: string }).pubkey; + if ( + agentPubkey && + !mockHuddleState.agent_pubkeys.includes(agentPubkey) + ) { + mockHuddleState.agent_pubkeys.push(agentPubkey); + mockHuddleState.participants.push(agentPubkey); + } + return { + ephemeral_added: true, + parent_added: true, + parent_error: null, + }; + } + case "leave_huddle": + case "end_huddle": + mockHuddleState = null; + return undefined; + case "confirm_huddle_active": + case "push_huddle_screen_control": + case "push_huddle_screen_frame": + return undefined; case "pick_and_upload_media": return await resolveMockUploadDescriptors(activeConfig); case "upload_media_bytes": diff --git a/desktop/tests/helpers/bridge.ts b/desktop/tests/helpers/bridge.ts index 2d1cc2561..9f953ffe9 100644 --- a/desktop/tests/helpers/bridge.ts +++ b/desktop/tests/helpers/bridge.ts @@ -67,6 +67,25 @@ type MockSearchProfileSeed = { isAgent?: boolean; }; +type MockHuddleState = { + phase: + | "idle" + | "creating" + | "connecting" + | "connected" + | "active" + | "leaving"; + parent_channel_id: string | null; + ephemeral_channel_id: string | null; + participants: string[]; + agent_pubkeys: string[]; + tts_enabled: boolean; + transcription_enabled: boolean; + screen_share_available: boolean; + is_creator: boolean; + voice_input_mode: "push_to_talk" | "voice_activity"; +}; + type MockRelayAgentSeed = { pubkey: string; name: string; @@ -125,6 +144,7 @@ type MockBridgeOptions = { updateChannelDelayMs?: number; updateDownloadDelayMs?: number; updateVersion?: string; + huddleState?: MockHuddleState | null; stallWebsocketSends?: boolean; userSearchDelayMs?: number; // NIP-IA gate inputs — drive the archive-button gate matrix in diff --git a/desktop/tests/helpers/screenshot.mjs b/desktop/tests/helpers/screenshot.mjs index 2d1d0b392..7b6411462 100644 --- a/desktop/tests/helpers/screenshot.mjs +++ b/desktop/tests/helpers/screenshot.mjs @@ -22,6 +22,7 @@ // --outdir Output directory (default: test-results/screenshots) // --messages JSON file with messages to inject before capture // --update-ready Mock an available update so the sidebar update card renders +// --huddle-active Mock an active huddle with screen sharing enabled import { parseArgs } from "node:util"; import { existsSync, mkdirSync, readFileSync } from "node:fs"; @@ -42,6 +43,7 @@ const { values: args } = parseArgs({ outdir: { type: "string", default: "test-results/screenshots" }, messages: { type: "string" }, "update-ready": { type: "boolean", default: false }, + "huddle-active": { type: "boolean", default: false }, }, strict: true, }); @@ -69,6 +71,12 @@ function bail(msg) { const BASE_URL = "http://127.0.0.1:4173"; const DEFAULT_MOCK_PUBKEY = "deadbeef".repeat(8); +const ALICE_PUBKEY = + "953d3363262e86b770419834c53d2446409db6d918a57f8f339d495d54ab001f"; +const BOB_PUBKEY = + "bb22a5299220cad76ffd46190ccbeede8ab5dc260faa28b6e5a2cb31b9aff260"; +const ENGINEERING_CHANNEL_ID = "1c7e1c02-87bb-5e88-b2da-5a7a9432d0c9"; +const HUDDLE_EPHEMERAL_CHANNEL_ID = "5aa9f4fd-437b-5834-9128-00e2e2e0041d"; const ONBOARDING_PREFIX = "buzz-onboarding-complete.v1:"; const TEST_PUBKEYS = [ @@ -110,7 +118,15 @@ await page.addInitScript( // Install E2E mock bridge config + MockNotification (mirrors installBridge in bridge.ts) await page.addInitScript( - ({ updateReady }) => { + ({ + alicePubkey, + bobPubkey, + defaultMockPubkey, + engineeringChannelId, + huddleActive, + huddleEphemeralChannelId, + updateReady, + }) => { class MockNotification extends EventTarget { static permission = "granted"; static async requestPermission() { @@ -132,13 +148,41 @@ await page.addInitScript( writable: true, }); + const mock = { + ...(updateReady ? { updateAvailable: true } : {}), + ...(huddleActive + ? { + huddleState: { + agent_pubkeys: [], + ephemeral_channel_id: huddleEphemeralChannelId, + is_creator: true, + parent_channel_id: engineeringChannelId, + participants: [defaultMockPubkey, alicePubkey, bobPubkey], + phase: "active", + screen_share_available: true, + transcription_enabled: false, + tts_enabled: true, + voice_input_mode: "voice_activity", + }, + } + : {}), + }; + window.__BUZZ_E2E__ = { mode: "mock", - ...(updateReady ? { mock: { updateAvailable: true } } : {}), + ...(Object.keys(mock).length > 0 ? { mock } : {}), }; window.__BUZZ_E2E_APP_BADGE_COUNT__ = 0; }, - { updateReady: args["update-ready"] }, + { + alicePubkey: ALICE_PUBKEY, + bobPubkey: BOB_PUBKEY, + defaultMockPubkey: DEFAULT_MOCK_PUBKEY, + engineeringChannelId: ENGINEERING_CHANNEL_ID, + huddleActive: args["huddle-active"], + huddleEphemeralChannelId: HUDDLE_EPHEMERAL_CHANNEL_ID, + updateReady: args["update-ready"], + }, ); try {