Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions crates/buzz-db/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,61 @@ pub async fn get_last_message_at_bulk(
Ok(map)
}

/// Bulk-fetch the latest non-deleted event per channel, one indexed top-1
/// lookup per channel in a single round trip.
///
/// `unnest` + `LATERAL` guarantees an index-backed
/// (`idx_events_community_channel_created`) `LIMIT 1` per channel — unlike
/// `DISTINCT ON`, it cannot degrade to a full sort as channels grow. Per-channel
/// ordering (`created_at DESC, id ASC LIMIT 1`) is byte-identical to
/// [`query_events`] with `limit: 1`, so a caller migrating from N single-channel
/// queries observes the same winning event per channel.
///
/// Channels with no matching events are omitted. `kinds` follows NIP-01
/// semantics: `None` = any kind, `Some(&[])` = match nothing.
pub async fn get_latest_event_per_channel(
pool: &PgPool,
community_id: CommunityId,
channel_ids: &[uuid::Uuid],
kinds: Option<&[i32]>,
) -> Result<Vec<StoredEvent>> {
if channel_ids.is_empty() || kinds.is_some_and(|k| k.is_empty()) {
return Ok(vec![]);
}

let mut qb: QueryBuilder<sqlx::Postgres> = QueryBuilder::new(
"SELECT e.id, e.pubkey, e.created_at, e.kind, e.tags, e.content, \
e.sig, e.received_at, e.channel_id \
FROM unnest(",
);
qb.push_bind(channel_ids);
qb.push(
"::uuid[]) AS c(channel_id) \
CROSS JOIN LATERAL ( \
SELECT id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id \
FROM events \
WHERE community_id = ",
);
qb.push_bind(community_id.as_uuid());
qb.push(" AND channel_id = c.channel_id AND deleted_at IS NULL");
if let Some(ks) = kinds {
qb.push(" AND kind = ANY(");
qb.push_bind(ks);
qb.push(")");
}
qb.push(" ORDER BY created_at DESC, id ASC LIMIT 1 ) e");

let rows = qb.build().fetch_all(pool).await?;

let mut out = Vec::with_capacity(rows.len());
for row in rows {
if let Some(ev) = row_to_stored_event(row)? {
out.push(ev);
}
}
Ok(out)
}

/// Fetches a single non-deleted event by its raw 32-byte ID.
///
/// Returns `None` if the event does not exist or has been soft-deleted.
Expand Down Expand Up @@ -1402,6 +1457,110 @@ mod tests {
assert_eq!(b.event.content, "community-b-copy");
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn latest_event_per_channel_matches_serial_per_channel_queries() {
let pool = setup_pool().await;
let community = CommunityId::from_uuid(make_test_community(&pool).await);
let keys = Keys::generate();
let chan_a = Uuid::new_v4();
let chan_b = Uuid::new_v4();
let chan_c = Uuid::new_v4();
let chan_empty = Uuid::new_v4();
let base = nostr::Timestamp::now().as_secs() - 100;

let sign = |kind: u16, content: &str, ts: u64| {
EventBuilder::new(Kind::Custom(kind), content)
.custom_created_at(nostr::Timestamp::from(ts))
.sign_with_keys(&keys)
.expect("sign")
};

// chan_a: two kind:9 messages; the newer must win.
let a_old = sign(9, "a-old", base);
let a_new = sign(9, "a-new", base + 10);
// chan_b: newest matching event is kind:40002; a newer kind:1 must be
// excluded by the kinds constraint. Also a soft-deleted even-newer 9.
let b_match = sign(40002, "b-match", base + 5);
let b_wrong_kind = sign(1, "b-wrong-kind", base + 20);
let b_deleted = sign(9, "b-deleted", base + 30);
// chan_c: same-second tie — `id ASC` must break it identically to
// query_events.
let c_tie_1 = sign(9, "c-tie-1", base + 7);
let c_tie_2 = sign(9, "c-tie-2", base + 7);

for (ev, ch) in [
(&a_old, chan_a),
(&a_new, chan_a),
(&b_match, chan_b),
(&b_wrong_kind, chan_b),
(&b_deleted, chan_b),
(&c_tie_1, chan_c),
(&c_tie_2, chan_c),
] {
insert_event(&pool, community, ev, Some(ch))
.await
.expect("insert");
}
sqlx::query("UPDATE events SET deleted_at = now() WHERE community_id = $1 AND id = $2")
.bind(community.as_uuid())
.bind(b_deleted.id.as_bytes())
.execute(&pool)
.await
.expect("soft-delete");

let channels = [chan_a, chan_b, chan_c, chan_empty];
let kinds = [9i32, 40002];

// Reference: the serial shape being replaced — one limit:1 query per channel.
let mut expected: Vec<(Uuid, nostr::EventId)> = Vec::new();
for ch in channels {
let mut q = EventQuery::for_community(community);
q.channel_id = Some(ch);
q.kinds = Some(kinds.to_vec());
q.limit = Some(1);
if let Some(se) = query_events(&pool, &q).await.expect("serial query").pop() {
expected.push((ch, se.event.id));
}
}

let bulk = get_latest_event_per_channel(&pool, community, &channels, Some(&kinds))
.await
.expect("bulk query");
let mut got: Vec<(Uuid, nostr::EventId)> = bulk
.iter()
.map(|se| (se.channel_id.expect("channel scoped"), se.event.id))
.collect();
got.sort();
expected.sort();
assert_eq!(got, expected, "bulk winners must match serial winners");

// Spot-check the interesting winners directly.
let winner = |ch: Uuid| {
got.iter()
.find(|(c, _)| *c == ch)
.map(|(_, id)| *id)
.expect("winner present")
};
assert_eq!(winner(chan_a), a_new.id);
assert_eq!(winner(chan_b), b_match.id, "deleted + wrong-kind excluded");
let tie_winner = std::cmp::min(c_tie_1.id, c_tie_2.id);
assert_eq!(winner(chan_c), tie_winner, "same-second tie: id ASC");
assert!(!got.iter().any(|(c, _)| *c == chan_empty));

// kinds=None matches any kind (chan_b's non-deleted newest is kind:1);
// kinds=Some(&[]) matches nothing.
let any_kind = get_latest_event_per_channel(&pool, community, &[chan_b], None)
.await
.expect("any-kind query");
assert_eq!(any_kind.len(), 1);
assert_eq!(any_kind[0].event.id, b_wrong_kind.id);
let no_kind = get_latest_event_per_channel(&pool, community, &[chan_b], Some(&[]))
.await
.expect("empty-kinds query");
assert!(no_kind.is_empty());
}

fn make_event_with_kind_and_tags(kind: u16, tags: Vec<Tag>) -> nostr::Event {
let keys = Keys::generate();
EventBuilder::new(Kind::Custom(kind), "test")
Expand Down
12 changes: 12 additions & 0 deletions crates/buzz-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,18 @@ impl Db {
event::get_last_message_at_bulk(&self.pool, community_id, channel_ids).await
}

/// Bulk-fetch the latest non-deleted event per channel (single round trip).
///
/// See [`event::get_latest_event_per_channel`] for ordering guarantees.
pub async fn get_latest_event_per_channel(
&self,
community_id: CommunityId,
channel_ids: &[Uuid],
kinds: Option<&[i32]>,
) -> Result<Vec<StoredEvent>> {
event::get_latest_event_per_channel(&self.pool, community_id, channel_ids, kinds).await
}

/// Batch-fetch non-deleted events by their raw IDs.
pub async fn get_events_by_ids(
&self,
Expand Down
84 changes: 84 additions & 0 deletions crates/buzz-relay/src/api/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ fn extract_feed_types(raw: &Value) -> Option<Vec<String>> {
}
}

/// `latest_per_channel: true` marks a filter as a bulk latest-message lookup:
/// the top-1 event per `#h` channel in one DB round trip, instead of one
/// `limit:1` filter per channel.
fn extract_latest_per_channel(raw: &Value) -> bool {
raw.get("latest_per_channel")
.and_then(Value::as_bool)
.unwrap_or(false)
}

fn extract_search_mode(raw: &Value) -> buzz_search::SearchMode {
match raw
.get("search_mode")
Expand Down Expand Up @@ -636,6 +645,64 @@ pub async fn query_events(
handled.insert(idx);
}

// Bulk latest-message lookup: `latest_per_channel: true` + multi-`#h`
// resolves the top-1 event per channel in one LATERAL round trip, replacing
// one `limit:1` filter per channel. Inaccessible channels are dropped from
// the lookup set (same outcome as the catch-all's access-scope skip), and
// every returned event still passes the identical per-event gates below —
// top-1-per-channel is guaranteed by construction, so the multi-`#h`
// limit-budget hazard that rules out SQL pushdown elsewhere doesn't apply.
let h_tag_key = nostr::SingleLetterTag::lowercase(nostr::Alphabet::H);
for (idx, (raw, filter)) in raw_filters.iter().zip(filters.iter()).enumerate() {
if handled.contains(&idx) || !extract_latest_per_channel(raw) {
continue;
}
handled.insert(idx);

let requested: Vec<uuid::Uuid> = filter
.generic_tags
.get(&h_tag_key)
.map(|vs| {
vs.iter()
.filter_map(|v| v.parse::<uuid::Uuid>().ok())
.filter(|ch| accessible_channels.contains(ch))
.collect()
})
.unwrap_or_default();
if requested.is_empty() {
continue;
}

let kinds: Option<Vec<i32>> = filter
.kinds
.as_ref()
.map(|ks| ks.iter().map(|k| k.as_u16() as i32).collect());

let latest = state
.db
.get_latest_event_per_channel(tenant.community(), &requested, kinds.as_deref())
.await
.map_err(|e| internal_error(&format!("latest-per-channel query error: {e}")))?;

for se in latest {
if !event_in_accessible_channel(&se, &accessible_channels) {
continue;
}
if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se) {
continue;
}
if !buzz_core::filter::reader_authorized_for_event(&se.event, &authed_pubkey_hex) {
continue;
}
if crate::handlers::req::is_author_only_event(&se.event, &pubkey_bytes) {
continue;
}
if let Ok(v) = serde_json::to_value(&se.event) {
events.push(v);
}
}
}

// Phase 1 — pure construction + validation, in filter order. Access-scope
// skips and the `before_id` BAD_REQUEST are decided here, before any DB
// work is issued (validation errors are deterministic client mistakes, so
Expand Down Expand Up @@ -2107,6 +2174,23 @@ mod tests {
assert!(extract_feed_types(&raw).is_none());
}

#[test]
fn extract_latest_per_channel_true() {
let raw = serde_json::json!({ "latest_per_channel": true });
assert!(extract_latest_per_channel(&raw));
}

#[test]
fn extract_latest_per_channel_false_absent_or_non_bool() {
assert!(!extract_latest_per_channel(&serde_json::json!({
"latest_per_channel": false
})));
assert!(!extract_latest_per_channel(&serde_json::json!({})));
assert!(!extract_latest_per_channel(&serde_json::json!({
"latest_per_channel": "true"
})));
}

#[test]
fn event_accessible_no_channel() {
let keys = Keys::generate();
Expand Down
24 changes: 11 additions & 13 deletions desktop/src-tauri/src/commands/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,19 @@ pub async fn get_channels(state: State<'_, AppState>) -> Result<Vec<ChannelInfo>
let t_member_counts = _profile_start.elapsed();

// Populate last_message_at by fetching the most recent human message per
// channel. Uses per-channel filters (single #h value each) so the relay can
// push the query to its indexed channel_id column. Multi-value #h is NOT
// SQL-pushed and would silently drop quieter channels under the global limit.
// channel. `latest_per_channel` is the bridge's bulk top-1-per-channel
// extension: one indexed relay round trip for all channels, with the same
// per-channel winner as the previous one-`limit:1`-filter-per-channel
// shape (multi-value #h alone is NOT SQL-pushed and would silently drop
// quieter channels under the global limit; the extension exists so the
// relay can push it safely).
let channel_ids: Vec<String> = channels.iter().map(|c| c.id.clone()).collect();
if !channel_ids.is_empty() {
let filters: Vec<serde_json::Value> = channel_ids
.iter()
.map(|id| {
serde_json::json!({
"kinds": [9, 40002],
"#h": [id],
"limit": 1
})
})
.collect();
let filters = [serde_json::json!({
"kinds": [9, 40002],
"#h": channel_ids,
"latest_per_channel": true,
})];

let message_events = query_relay(&state, &filters).await.unwrap_or_default();

Expand Down
Loading