diff --git a/crates/buzz-db/src/event.rs b/crates/buzz-db/src/event.rs index 5ab082dda..82ab1b235 100644 --- a/crates/buzz-db/src/event.rs +++ b/crates/buzz-db/src/event.rs @@ -841,6 +841,61 @@ pub async fn get_last_message_at_bulk( Ok(map) } +/// Bulk-fetch the latest non-deleted event per channel, one indexed top-1 +/// lookup per channel in a single round trip. +/// +/// `unnest` + `LATERAL` guarantees an index-backed +/// (`idx_events_community_channel_created`) `LIMIT 1` per channel — unlike +/// `DISTINCT ON`, it cannot degrade to a full sort as channels grow. Per-channel +/// ordering (`created_at DESC, id ASC LIMIT 1`) is byte-identical to +/// [`query_events`] with `limit: 1`, so a caller migrating from N single-channel +/// queries observes the same winning event per channel. +/// +/// Channels with no matching events are omitted. `kinds` follows NIP-01 +/// semantics: `None` = any kind, `Some(&[])` = match nothing. +pub async fn get_latest_event_per_channel( + pool: &PgPool, + community_id: CommunityId, + channel_ids: &[uuid::Uuid], + kinds: Option<&[i32]>, +) -> Result> { + if channel_ids.is_empty() || kinds.is_some_and(|k| k.is_empty()) { + return Ok(vec![]); + } + + let mut qb: QueryBuilder = QueryBuilder::new( + "SELECT e.id, e.pubkey, e.created_at, e.kind, e.tags, e.content, \ + e.sig, e.received_at, e.channel_id \ + FROM unnest(", + ); + qb.push_bind(channel_ids); + qb.push( + "::uuid[]) AS c(channel_id) \ + CROSS JOIN LATERAL ( \ + SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \ + FROM events \ + WHERE community_id = ", + ); + qb.push_bind(community_id.as_uuid()); + qb.push(" AND channel_id = c.channel_id AND deleted_at IS NULL"); + if let Some(ks) = kinds { + qb.push(" AND kind = ANY("); + qb.push_bind(ks); + qb.push(")"); + } + qb.push(" ORDER BY created_at DESC, id ASC LIMIT 1 ) e"); + + let rows = qb.build().fetch_all(pool).await?; + + let mut out = Vec::with_capacity(rows.len()); + for row in rows { + if let Some(ev) = row_to_stored_event(row)? { + out.push(ev); + } + } + Ok(out) +} + /// Fetches a single non-deleted event by its raw 32-byte ID. /// /// Returns `None` if the event does not exist or has been soft-deleted. @@ -1402,6 +1457,110 @@ mod tests { assert_eq!(b.event.content, "community-b-copy"); } + #[tokio::test] + #[ignore = "requires Postgres"] + async fn latest_event_per_channel_matches_serial_per_channel_queries() { + let pool = setup_pool().await; + let community = CommunityId::from_uuid(make_test_community(&pool).await); + let keys = Keys::generate(); + let chan_a = Uuid::new_v4(); + let chan_b = Uuid::new_v4(); + let chan_c = Uuid::new_v4(); + let chan_empty = Uuid::new_v4(); + let base = nostr::Timestamp::now().as_secs() - 100; + + let sign = |kind: u16, content: &str, ts: u64| { + EventBuilder::new(Kind::Custom(kind), content) + .custom_created_at(nostr::Timestamp::from(ts)) + .sign_with_keys(&keys) + .expect("sign") + }; + + // chan_a: two kind:9 messages; the newer must win. + let a_old = sign(9, "a-old", base); + let a_new = sign(9, "a-new", base + 10); + // chan_b: newest matching event is kind:40002; a newer kind:1 must be + // excluded by the kinds constraint. Also a soft-deleted even-newer 9. + let b_match = sign(40002, "b-match", base + 5); + let b_wrong_kind = sign(1, "b-wrong-kind", base + 20); + let b_deleted = sign(9, "b-deleted", base + 30); + // chan_c: same-second tie — `id ASC` must break it identically to + // query_events. + let c_tie_1 = sign(9, "c-tie-1", base + 7); + let c_tie_2 = sign(9, "c-tie-2", base + 7); + + for (ev, ch) in [ + (&a_old, chan_a), + (&a_new, chan_a), + (&b_match, chan_b), + (&b_wrong_kind, chan_b), + (&b_deleted, chan_b), + (&c_tie_1, chan_c), + (&c_tie_2, chan_c), + ] { + insert_event(&pool, community, ev, Some(ch)) + .await + .expect("insert"); + } + sqlx::query("UPDATE events SET deleted_at = now() WHERE community_id = $1 AND id = $2") + .bind(community.as_uuid()) + .bind(b_deleted.id.as_bytes()) + .execute(&pool) + .await + .expect("soft-delete"); + + let channels = [chan_a, chan_b, chan_c, chan_empty]; + let kinds = [9i32, 40002]; + + // Reference: the serial shape being replaced — one limit:1 query per channel. + let mut expected: Vec<(Uuid, nostr::EventId)> = Vec::new(); + for ch in channels { + let mut q = EventQuery::for_community(community); + q.channel_id = Some(ch); + q.kinds = Some(kinds.to_vec()); + q.limit = Some(1); + if let Some(se) = query_events(&pool, &q).await.expect("serial query").pop() { + expected.push((ch, se.event.id)); + } + } + + let bulk = get_latest_event_per_channel(&pool, community, &channels, Some(&kinds)) + .await + .expect("bulk query"); + let mut got: Vec<(Uuid, nostr::EventId)> = bulk + .iter() + .map(|se| (se.channel_id.expect("channel scoped"), se.event.id)) + .collect(); + got.sort(); + expected.sort(); + assert_eq!(got, expected, "bulk winners must match serial winners"); + + // Spot-check the interesting winners directly. + let winner = |ch: Uuid| { + got.iter() + .find(|(c, _)| *c == ch) + .map(|(_, id)| *id) + .expect("winner present") + }; + assert_eq!(winner(chan_a), a_new.id); + assert_eq!(winner(chan_b), b_match.id, "deleted + wrong-kind excluded"); + let tie_winner = std::cmp::min(c_tie_1.id, c_tie_2.id); + assert_eq!(winner(chan_c), tie_winner, "same-second tie: id ASC"); + assert!(!got.iter().any(|(c, _)| *c == chan_empty)); + + // kinds=None matches any kind (chan_b's non-deleted newest is kind:1); + // kinds=Some(&[]) matches nothing. + let any_kind = get_latest_event_per_channel(&pool, community, &[chan_b], None) + .await + .expect("any-kind query"); + assert_eq!(any_kind.len(), 1); + assert_eq!(any_kind[0].event.id, b_wrong_kind.id); + let no_kind = get_latest_event_per_channel(&pool, community, &[chan_b], Some(&[])) + .await + .expect("empty-kinds query"); + assert!(no_kind.is_empty()); + } + fn make_event_with_kind_and_tags(kind: u16, tags: Vec) -> nostr::Event { let keys = Keys::generate(); EventBuilder::new(Kind::Custom(kind), "test") diff --git a/crates/buzz-db/src/lib.rs b/crates/buzz-db/src/lib.rs index 69150aaad..a685a7acc 100644 --- a/crates/buzz-db/src/lib.rs +++ b/crates/buzz-db/src/lib.rs @@ -557,6 +557,18 @@ impl Db { event::get_last_message_at_bulk(&self.pool, community_id, channel_ids).await } + /// Bulk-fetch the latest non-deleted event per channel (single round trip). + /// + /// See [`event::get_latest_event_per_channel`] for ordering guarantees. + pub async fn get_latest_event_per_channel( + &self, + community_id: CommunityId, + channel_ids: &[Uuid], + kinds: Option<&[i32]>, + ) -> Result> { + event::get_latest_event_per_channel(&self.pool, community_id, channel_ids, kinds).await + } + /// Batch-fetch non-deleted events by their raw IDs. pub async fn get_events_by_ids( &self, diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 59093a060..99c783196 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -253,6 +253,15 @@ fn extract_feed_types(raw: &Value) -> Option> { } } +/// `latest_per_channel: true` marks a filter as a bulk latest-message lookup: +/// the top-1 event per `#h` channel in one DB round trip, instead of one +/// `limit:1` filter per channel. +fn extract_latest_per_channel(raw: &Value) -> bool { + raw.get("latest_per_channel") + .and_then(Value::as_bool) + .unwrap_or(false) +} + fn extract_search_mode(raw: &Value) -> buzz_search::SearchMode { match raw .get("search_mode") @@ -636,6 +645,64 @@ pub async fn query_events( handled.insert(idx); } + // Bulk latest-message lookup: `latest_per_channel: true` + multi-`#h` + // resolves the top-1 event per channel in one LATERAL round trip, replacing + // one `limit:1` filter per channel. Inaccessible channels are dropped from + // the lookup set (same outcome as the catch-all's access-scope skip), and + // every returned event still passes the identical per-event gates below — + // top-1-per-channel is guaranteed by construction, so the multi-`#h` + // limit-budget hazard that rules out SQL pushdown elsewhere doesn't apply. + let h_tag_key = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H); + for (idx, (raw, filter)) in raw_filters.iter().zip(filters.iter()).enumerate() { + if handled.contains(&idx) || !extract_latest_per_channel(raw) { + continue; + } + handled.insert(idx); + + let requested: Vec = filter + .generic_tags + .get(&h_tag_key) + .map(|vs| { + vs.iter() + .filter_map(|v| v.parse::().ok()) + .filter(|ch| accessible_channels.contains(ch)) + .collect() + }) + .unwrap_or_default(); + if requested.is_empty() { + continue; + } + + let kinds: Option> = filter + .kinds + .as_ref() + .map(|ks| ks.iter().map(|k| k.as_u16() as i32).collect()); + + let latest = state + .db + .get_latest_event_per_channel(tenant.community(), &requested, kinds.as_deref()) + .await + .map_err(|e| internal_error(&format!("latest-per-channel query error: {e}")))?; + + for se in latest { + if !event_in_accessible_channel(&se, &accessible_channels) { + continue; + } + if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se) { + continue; + } + if !buzz_core::filter::reader_authorized_for_event(&se.event, &authed_pubkey_hex) { + continue; + } + if crate::handlers::req::is_author_only_event(&se.event, &pubkey_bytes) { + continue; + } + if let Ok(v) = serde_json::to_value(&se.event) { + events.push(v); + } + } + } + // Phase 1 — pure construction + validation, in filter order. Access-scope // skips and the `before_id` BAD_REQUEST are decided here, before any DB // work is issued (validation errors are deterministic client mistakes, so @@ -2107,6 +2174,23 @@ mod tests { assert!(extract_feed_types(&raw).is_none()); } + #[test] + fn extract_latest_per_channel_true() { + let raw = serde_json::json!({ "latest_per_channel": true }); + assert!(extract_latest_per_channel(&raw)); + } + + #[test] + fn extract_latest_per_channel_false_absent_or_non_bool() { + assert!(!extract_latest_per_channel(&serde_json::json!({ + "latest_per_channel": false + }))); + assert!(!extract_latest_per_channel(&serde_json::json!({}))); + assert!(!extract_latest_per_channel(&serde_json::json!({ + "latest_per_channel": "true" + }))); + } + #[test] fn event_accessible_no_channel() { let keys = Keys::generate(); diff --git a/desktop/src-tauri/src/commands/channels.rs b/desktop/src-tauri/src/commands/channels.rs index 59ab3b25a..73ffb6bfd 100644 --- a/desktop/src-tauri/src/commands/channels.rs +++ b/desktop/src-tauri/src/commands/channels.rs @@ -170,21 +170,19 @@ pub async fn get_channels(state: State<'_, AppState>) -> Result let t_member_counts = _profile_start.elapsed(); // Populate last_message_at by fetching the most recent human message per - // channel. Uses per-channel filters (single #h value each) so the relay can - // push the query to its indexed channel_id column. Multi-value #h is NOT - // SQL-pushed and would silently drop quieter channels under the global limit. + // channel. `latest_per_channel` is the bridge's bulk top-1-per-channel + // extension: one indexed relay round trip for all channels, with the same + // per-channel winner as the previous one-`limit:1`-filter-per-channel + // shape (multi-value #h alone is NOT SQL-pushed and would silently drop + // quieter channels under the global limit; the extension exists so the + // relay can push it safely). let channel_ids: Vec = channels.iter().map(|c| c.id.clone()).collect(); if !channel_ids.is_empty() { - let filters: Vec = channel_ids - .iter() - .map(|id| { - serde_json::json!({ - "kinds": [9, 40002], - "#h": [id], - "limit": 1 - }) - }) - .collect(); + let filters = [serde_json::json!({ + "kinds": [9, 40002], + "#h": channel_ids, + "latest_per_channel": true, + })]; let message_events = query_relay(&state, &filters).await.unwrap_or_default();