Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
354 changes: 338 additions & 16 deletions crates/buzz-db/src/thread.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/buzz-db/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub async fn search_users(
let escaped = escape_like(&normalized);
let contains_pattern = format!("%{escaped}%");
let prefix_pattern = format!("{escaped}%");
let limit = limit.clamp(1, 50) as i64;
let limit = limit.clamp(1, 500) as i64;

let rows = sqlx::query_as::<_, (Vec<u8>, Option<String>, Option<String>, Option<String>)>(
r#"
Expand Down
181 changes: 178 additions & 3 deletions crates/buzz-relay/src/api/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,41 @@ fn extract_depth_limit(raw: &Value) -> Option<u32> {
.and_then(|n| u32::try_from(n).ok())
}

/// Extract a thread pagination cursor from the raw filter JSON.
///
/// The desktop pages `get_thread_replies` forward with a keyset cursor derived
/// transparently from the last reply it has already loaded — no server-issued
/// token. The cursor is a composite of that reply's `created_at` (Unix seconds,
/// field `thread_cursor`/`threadCursor`) and its hex event id (field
/// `thread_cursor_id`/`threadCursorId`). The event id is the tiebreak that lets
/// pagination cross replies sharing the same `created_at` second — without it,
/// a timestamp-only cursor silently drops every tied reply past the page limit
/// (the exact "missed messages" bug this work exists to fix).
///
/// Wire → DB encoding: 8-byte big-endian i64 seconds, followed by the raw
/// event-id bytes when present. `get_thread_replies` decodes this layout back
/// into its `(timestamp, event_id)` keyset. A bare timestamp (no id) is still
/// accepted and paginates on time alone (unsafe across same-second ties).
fn extract_thread_cursor(raw: &Value) -> Option<Vec<u8>> {
let secs = raw
.get("thread_cursor")
.or_else(|| raw.get("threadCursor"))?
.as_i64()?;
let mut bytes = secs.to_be_bytes().to_vec();

if let Some(id_hex) = raw
.get("thread_cursor_id")
.or_else(|| raw.get("threadCursorId"))
.and_then(Value::as_str)
{
if let Ok(id_bytes) = hex::decode(id_hex) {
bytes.extend_from_slice(&id_bytes);
}
}

Some(bytes)
}

fn extract_feed_types(raw: &Value) -> Option<Vec<String>> {
let arr = raw.get("feed_types")?.as_array()?;
let types: Vec<String> = arr
Expand All @@ -229,6 +264,35 @@ fn extract_search_mode(raw: &Value) -> buzz_search::SearchMode {
}
}

fn extract_search_page(raw: &Value) -> u32 {
raw.get("page")
.or_else(|| raw.get("search_page"))
.or_else(|| raw.get("searchPage"))
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())
.filter(|value| *value > 0)
.unwrap_or(1)
}

/// Compute the SQL `OFFSET` for a raw `page` extension on a non-search general
/// query, or `None` if paging shouldn't apply.
///
/// `page` is 1-based: page 1 → offset 0 (no change), page N → `(N-1) * limit`.
/// Returns `None` when `page` is absent or ≤ 1 (so unrelated general queries
/// keep their default offset) and when `limit` is missing (can't size a page).
/// This mirrors the FTS path's `page`/`per_page` for the non-search directory
/// listing (empty-query kind:0), whose deterministic `created_at DESC, id ASC`
/// ordering in `query_events` makes offset paging stable.
fn extract_page_offset(raw: &Value, limit: Option<i64>) -> Option<i64> {
let page = raw
.get("page")
.and_then(Value::as_u64)
.and_then(|value| i64::try_from(value).ok())
.filter(|value| *value > 1)?;
let per_page = limit.filter(|l| *l > 0)?;
page.checked_sub(1)?.checked_mul(per_page)
}

fn event_in_accessible_channel(se: &buzz_core::StoredEvent, accessible: &[uuid::Uuid]) -> bool {
match se.channel_id {
Some(ch_id) => accessible.contains(&ch_id),
Expand Down Expand Up @@ -547,9 +611,16 @@ pub async fn query_events(
.limit
.unwrap_or(100)
.min(BRIDGE_THREAD_MAX_LIMIT as usize) as u32;
let thread_cursor = extract_thread_cursor(raw);
let thread_replies = state
.db
.get_thread_replies(tenant.community(), &root_bytes, Some(depth), limit, None)
.get_thread_replies(
tenant.community(),
&root_bytes,
Some(depth),
limit,
thread_cursor.as_deref(),
)
.await
.map_err(|e| internal_error(&format!("thread query error: {e}")))?;

Expand Down Expand Up @@ -594,6 +665,18 @@ pub async fn query_events(
query.before_id = Some(bid);
}

// Honor `page` on non-search general queries so offset paging works for
// the empty-query people directory (kind:0 listing). The FTS path
// (`handle_bridge_search`) has its own `page`/`per_page`; a filter with
// no `search` field lands here instead, where paging would otherwise be
// dropped and the directory would terminate at its first page. Deterministic
// ordering in `query_events` (`created_at DESC, id ASC`) makes offset paging
// stable. `page` defaults to 1 → offset 0, so unrelated general queries are
// unaffected.
if let Some(offset) = extract_page_offset(raw, query.limit) {
query.offset = Some(offset);
}

match state.db.query_events(&query).await {
Ok(stored_events) => {
for se in stored_events {
Expand Down Expand Up @@ -867,7 +950,7 @@ fn search_hit_accepted(
}

/// Handle search filters by routing to Postgres FTS, then fetching full events
/// from DB. Returns first page of results (no pagination for bridge MVP).
/// from DB. Supports a bridge-only `page` extension over the FTS result set.
async fn handle_bridge_search(
state: &AppState,
raw_filters: &[Value],
Expand All @@ -893,6 +976,7 @@ async fn handle_bridge_search(

for (raw, filter) in raw_filters.iter().zip(filters) {
let search_mode = extract_search_mode(raw);
let search_page = extract_search_page(raw);
let search_text = match &filter.search {
Some(s) if !s.is_empty() => s.clone(),
_ => continue,
Expand Down Expand Up @@ -946,7 +1030,7 @@ async fn handle_bridge_search(
authors,
since,
until,
page: 1,
page: search_page,
per_page: limit,
mode: search_mode,
};
Expand Down Expand Up @@ -1832,12 +1916,103 @@ mod tests {
assert!(extract_before_id(&raw).is_none());
}

#[test]
fn extract_page_offset_absent_is_none() {
// No `page` → default offset (unrelated general queries untouched).
let raw = serde_json::json!({ "kinds": [0], "limit": 50 });
assert_eq!(extract_page_offset(&raw, Some(50)), None);
}

#[test]
fn extract_page_offset_page_one_is_none() {
// Page 1 is the first page → offset 0, expressed as no override.
let raw = serde_json::json!({ "kinds": [0], "limit": 50, "page": 1 });
assert_eq!(extract_page_offset(&raw, Some(50)), None);
}

#[test]
fn extract_page_offset_computes_offset_from_page_and_limit() {
// Empty people-directory contract: page N → (N-1) * limit.
let raw = serde_json::json!({ "kinds": [0], "limit": 50, "page": 3 });
assert_eq!(extract_page_offset(&raw, Some(50)), Some(100));
}

#[test]
fn extract_page_offset_missing_limit_is_none() {
// Can't size a page without a limit.
let raw = serde_json::json!({ "kinds": [0], "page": 2 });
assert_eq!(extract_page_offset(&raw, None), None);
}

#[test]
fn extract_depth_limit_valid() {
let raw = serde_json::json!({ "depth_limit": 3 });
assert_eq!(extract_depth_limit(&raw), Some(3));
}

#[test]
fn extract_thread_cursor_valid() {
// Timestamp-only cursor: 8-byte BE seconds, no tiebreak id.
let raw = serde_json::json!({ "thread_cursor": 1_782_866_946_i64 });
assert_eq!(
extract_thread_cursor(&raw),
Some(1_782_866_946_i64.to_be_bytes().to_vec())
);
}

#[test]
fn extract_thread_cursor_camel_case() {
let raw = serde_json::json!({ "threadCursor": 42_i64 });
assert_eq!(
extract_thread_cursor(&raw),
Some(42_i64.to_be_bytes().to_vec())
);
}

#[test]
fn extract_thread_cursor_composite() {
// Composite cursor: 8-byte BE seconds followed by the raw event-id bytes.
let id_hex = "aa".repeat(32);
let raw = serde_json::json!({
"thread_cursor": 1_782_866_946_i64,
"thread_cursor_id": id_hex,
});
let mut expected = 1_782_866_946_i64.to_be_bytes().to_vec();
expected.extend_from_slice(&[0xaa; 32]);
assert_eq!(extract_thread_cursor(&raw), Some(expected));
}

#[test]
fn extract_thread_cursor_composite_camel_case() {
let id_hex = "bb".repeat(32);
let raw = serde_json::json!({
"threadCursor": 7_i64,
"threadCursorId": id_hex,
});
let mut expected = 7_i64.to_be_bytes().to_vec();
expected.extend_from_slice(&[0xbb; 32]);
assert_eq!(extract_thread_cursor(&raw), Some(expected));
}

#[test]
fn extract_thread_cursor_ignores_bad_id_hex() {
// A malformed id falls back to timestamp-only rather than erroring.
let raw = serde_json::json!({
"thread_cursor": 5_i64,
"thread_cursor_id": "not-hex",
});
assert_eq!(
extract_thread_cursor(&raw),
Some(5_i64.to_be_bytes().to_vec())
);
}

#[test]
fn extract_thread_cursor_absent() {
let raw = serde_json::json!({ "depth_limit": 3 });
assert!(extract_thread_cursor(&raw).is_none());
}

#[test]
fn extract_depth_limit_zero() {
let raw = serde_json::json!({ "depth_limit": 0 });
Expand Down
1 change: 1 addition & 0 deletions desktop/playwright.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export default defineConfig({
"**/reminders.spec.ts",
"**/virtualization.spec.ts",
"**/scroll-history.spec.ts",
"**/channel-dense-second-reach.spec.ts",
"**/overscroll-boundary.spec.ts",
"**/cold-switch-longtask.perf.ts",
"**/timeline-no-shift.spec.ts",
Expand Down
15 changes: 13 additions & 2 deletions desktop/scripts/check-file-sizes.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ const overrides = new Map([
// commands add ~40 lines. Queued to split.
// branch cut; override bumped to cover the merged total. Queued to split.
["src-tauri/src/commands/agents.rs", 1437],
// #1418 read-path fix: get_thread_replies' blocker fix (shared TIMELINE_KINDS
// const + build_thread_replies_filter helper, mirroring the channel sibling so
// the two p-gate filters can't drift) plus two guard unit tests. The file was
// already at 995; this load-bearing correctness fix crossed 1000. Not generic
// debt growth. Approved override; queued to split with the rest of this list.
["src-tauri/src/commands/messages.rs", 1082],
// Residual repos_dir integration in ensure_nest_at: REPOS is provisioned
// outside NEST_DIRS (it may be a symlink), so it needs its own create +
// chmod-only-when-real-dir handling plus integration test coverage. The
Expand All @@ -82,8 +88,13 @@ const overrides = new Map([
// threaded through Tauri invokes for configurable repos_dir, plus the
// harness-persona-sync `harnessOverride` create-input bit — load-bearing
// parameter plumbing, not generic debt growth. Approved override; still
// queued to split.
["src/shared/api/tauri.ts", 1235],
// queued to split. Read-path lanes 1+2 add server-side fetch bindings
// (getThreadReplies + getChannelMessagesBefore) and paged people-search
// reachability — load-bearing reachability plumbing, not generic debt.
// #1418 read-path fix: +3 doc-only lines correcting the getThreadReplies
// contract (replies-only, root excluded — the query keys on root_event_id,
// which root rows lack). Documentation accuracy, not code growth.
["src/shared/api/tauri.ts", 1340],
// harness-persona-sync feature growth, queued to split in the resolver-unify
// refactor followup. discovery.rs is dominated by the new test module
// (the effective_agent_command / divergent / create-time override matrix);
Expand Down
Loading
Loading