From b5e3c3cf7208bb51b1bb4a423670ae10ce6daf4f Mon Sep 17 00:00:00 2001 From: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 20:59:12 -0400 Subject: [PATCH 1/8] fix(desktop): page user search reachability Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-db/src/user.rs | 2 +- crates/buzz-relay/src/api/bridge.rs | 15 +++- desktop/src-tauri/src/commands/profile.rs | 31 +++++--- desktop/src-tauri/src/models.rs | 1 + .../src/nostr_convert/user_search.rs | 25 ++++-- .../features/channels/ui/MembersSidebar.tsx | 25 +++--- .../src/features/forum/ui/ForumComposer.tsx | 1 + .../forum/ui/ForumComposerAutocompletes.tsx | 3 + .../src/features/messages/lib/useMentions.ts | 78 ++++++++++--------- .../messages/ui/MentionAutocomplete.tsx | 12 +++ .../features/messages/ui/MessageComposer.tsx | 1 + desktop/src/features/profile/hooks.ts | 75 +++++++++++++++++- .../sidebar/ui/NewDirectMessageDialog.tsx | 20 +++-- desktop/src/shared/api/tauri.ts | 11 ++- desktop/src/shared/api/types.ts | 5 ++ desktop/src/testing/e2eBridge.ts | 22 ++++-- 16 files changed, 247 insertions(+), 80 deletions(-) diff --git a/crates/buzz-db/src/user.rs b/crates/buzz-db/src/user.rs index d4c9395bc..a4a5a9d7e 100644 --- a/crates/buzz-db/src/user.rs +++ b/crates/buzz-db/src/user.rs @@ -231,7 +231,7 @@ pub async fn search_users( let escaped = escape_like(&normalized); let contains_pattern = format!("%{escaped}%"); let prefix_pattern = format!("{escaped}%"); - let limit = limit.clamp(1, 50) as i64; + let limit = limit.clamp(1, 500) as i64; let rows = sqlx::query_as::<_, (Vec, Option, Option, Option)>( r#" diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 3882363d8..537317279 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -229,6 +229,16 @@ fn extract_search_mode(raw: &Value) -> buzz_search::SearchMode { } } +fn extract_search_page(raw: &Value) -> u32 { + raw.get("page") + .or_else(|| raw.get("search_page")) + .or_else(|| raw.get("searchPage")) + .and_then(Value::as_u64) + .and_then(|value| u32::try_from(value).ok()) + .filter(|value| *value > 0) + .unwrap_or(1) +} + fn event_in_accessible_channel(se: &buzz_core::StoredEvent, accessible: &[uuid::Uuid]) -> bool { match se.channel_id { Some(ch_id) => accessible.contains(&ch_id), @@ -867,7 +877,7 @@ fn search_hit_accepted( } /// Handle search filters by routing to Postgres FTS, then fetching full events -/// from DB. Returns first page of results (no pagination for bridge MVP). +/// from DB. Supports a bridge-only `page` extension over the FTS result set. async fn handle_bridge_search( state: &AppState, raw_filters: &[Value], @@ -893,6 +903,7 @@ async fn handle_bridge_search( for (raw, filter) in raw_filters.iter().zip(filters) { let search_mode = extract_search_mode(raw); + let search_page = extract_search_page(raw); let search_text = match &filter.search { Some(s) if !s.is_empty() => s.clone(), _ => continue, @@ -946,7 +957,7 @@ async fn handle_bridge_search( authors, since, until, - page: 1, + page: search_page, per_page: limit, mode: search_mode, }; diff --git a/desktop/src-tauri/src/commands/profile.rs b/desktop/src-tauri/src/commands/profile.rs index e95b13ae8..6c2135c6b 100644 --- a/desktop/src-tauri/src/commands/profile.rs +++ b/desktop/src-tauri/src/commands/profile.rs @@ -172,13 +172,22 @@ pub async fn get_user_notes( pub async fn search_users( query: String, limit: Option, + cursor: Option, state: State<'_, AppState>, ) -> Result { let trimmed = query.trim(); - let max = limit.unwrap_or(8).min(50) as usize; + let max = limit.unwrap_or(8).min(500) as usize; + let page = cursor + .as_deref() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(1); if max == 0 { - return Ok(SearchUsersResponse { users: Vec::new() }); + return Ok(SearchUsersResponse { + users: Vec::new(), + next_cursor: None, + }); } if trimmed.is_empty() { @@ -187,6 +196,7 @@ pub async fn search_users( &[serde_json::json!({ "kinds": [0], "limit": max, + "page": page, })], ) .await?; @@ -201,24 +211,27 @@ pub async fn search_users( // and scanning client-side. The old path was capped at 2000 kind:0 events // by the relay's HTTP bridge limit, which silently hid users on busy relays. // - // We over-fetch (limit=50, which the bridge accepts up to 500) and re-rank + // We fetch one bounded page (bridge accepts up to 500) and re-rank that page // locally because the relay scores FTS rank against the whole kind:0 JSON // `content` blob, where a hit in `display_name` is not weighted any higher - // than a substring hit in `about`. Re-ranking ≤50 results client-side is - // cheap and keeps display ordering predictable for autocomplete. + // than a substring hit in `about`. The caller can request later pages via the + // cursor so the UI cap is only a page size, not a terminal directory ceiling. let events = query_relay( &state, &[serde_json::json!({ "kinds": [0], "search": trimmed, - "limit": 50, + "limit": max, + "page": page, })], ) .await?; - Ok(nostr_convert::rank_user_search_results( - &events, trimmed, max, - )) + let mut response = nostr_convert::rank_user_search_results(&events, trimmed, max); + if events.len() >= max { + response.next_cursor = Some((page + 1).to_string()); + } + Ok(response) } #[tauri::command] diff --git a/desktop/src-tauri/src/models.rs b/desktop/src-tauri/src/models.rs index dc6f03ca8..e2bb6e239 100644 --- a/desktop/src-tauri/src/models.rs +++ b/desktop/src-tauri/src/models.rs @@ -48,6 +48,7 @@ pub struct UserSearchResultInfo { #[derive(Serialize, Deserialize)] pub struct SearchUsersResponse { pub users: Vec, + pub next_cursor: Option, } #[derive(Serialize, Deserialize)] diff --git a/desktop/src-tauri/src/nostr_convert/user_search.rs b/desktop/src-tauri/src/nostr_convert/user_search.rs index 3d4ac8906..43b4288ab 100644 --- a/desktop/src-tauri/src/nostr_convert/user_search.rs +++ b/desktop/src-tauri/src/nostr_convert/user_search.rs @@ -28,13 +28,19 @@ pub fn user_search_result_from_event(ev: &Event) -> UserSearchResultInfo { /// Convert kind:0 events (e.g. from a NIP-50 search) to [`SearchUsersResponse`]. pub fn search_users_from_events(events: &[Event]) -> SearchUsersResponse { let users = events.iter().map(user_search_result_from_event).collect(); - SearchUsersResponse { users } + SearchUsersResponse { + users, + next_cursor: None, + } } /// Convert a default kind:0 page to user-search results for empty-query pickers. pub fn list_user_search_results(events: &[Event], limit: usize) -> SearchUsersResponse { if limit == 0 { - return SearchUsersResponse { users: Vec::new() }; + return SearchUsersResponse { + users: Vec::new(), + next_cursor: None, + }; } let mut latest_by_pubkey: HashMap = @@ -71,7 +77,10 @@ pub fn list_user_search_results(events: &[Event], limit: usize) -> SearchUsersRe }); users.truncate(limit); - SearchUsersResponse { users } + SearchUsersResponse { + users, + next_cursor: None, + } } /// Rank and truncate kind:0 events from a NIP-50 search response for the @@ -98,7 +107,10 @@ pub fn rank_user_search_results( ) -> SearchUsersResponse { let q = query.trim().to_lowercase(); if q.is_empty() || limit == 0 { - return SearchUsersResponse { users: Vec::new() }; + return SearchUsersResponse { + users: Vec::new(), + next_cursor: None, + }; } // (score, input_index) — input index is a stable tiebreaker preserving @@ -138,7 +150,10 @@ pub fn rank_user_search_results( } } - SearchUsersResponse { users } + SearchUsersResponse { + users, + next_cursor: None, + } } fn match_score(q: &str, display_name: &str, nip05: &str, pubkey_hex: &str) -> u32 { diff --git a/desktop/src/features/channels/ui/MembersSidebar.tsx b/desktop/src/features/channels/ui/MembersSidebar.tsx index 560190d71..44678d934 100644 --- a/desktop/src/features/channels/ui/MembersSidebar.tsx +++ b/desktop/src/features/channels/ui/MembersSidebar.tsx @@ -20,7 +20,9 @@ import { formatPubkey, } from "@/features/channels/lib/memberUtils"; import { - useUserSearchQuery, + useFlattenedUserSearchResults, + useInfiniteUserSearchQuery, + useUserSearchFetchMoreOnScroll, useUsersBatchQuery, } from "@/features/profile/hooks"; import { rankUserCandidatesBySearch } from "@/features/profile/lib/userCandidateSearch"; @@ -55,11 +57,9 @@ import { } from "@/shared/ui/modalSearchStyles"; import { MembersSidebarMemberCard } from "./MembersSidebarMemberCard"; import { useMembersSidebarActions } from "./useMembersSidebarActions"; - const MEMBER_ADD_RESULT_LIMIT = 50; const MEMBER_ROW_INSET_DIVIDER_CLASS = "after:pointer-events-none after:absolute after:bottom-0 after:left-[3.75rem] after:right-0 after:h-px after:bg-border/60 after:content-[''] last:after:hidden"; - function formatAddCandidateName(user: UserSearchResult) { return ( user.displayName?.trim() || @@ -67,7 +67,6 @@ function formatAddCandidateName(user: UserSearchResult) { formatPubkey(user.pubkey) ); } - function formatOwnerName( user: UserSearchResult, ownerProfiles?: Record< @@ -78,7 +77,6 @@ function formatOwnerName( if (!user.ownerPubkey) { return null; } - const owner = ownerProfiles?.[normalizePubkey(user.ownerPubkey)]; return ( owner?.displayName?.trim() || @@ -86,13 +84,11 @@ function formatOwnerName( formatPubkey(user.ownerPubkey) ); } - type AddMemberSearchCandidate = UserSearchResult & { isManagedAgent?: boolean; isMember?: boolean; personaId?: string | null; }; - function addMemberCandidatePersonaId( candidate: UserSearchResult, managedAgentsByPubkey: ReadonlyMap, @@ -100,14 +96,12 @@ function addMemberCandidatePersonaId( return managedAgentsByPubkey.get(normalizePubkey(candidate.pubkey)) ?.personaId; } - function addMemberCandidateIsManagedAgent( candidate: UserSearchResult, managedAgentsByPubkey: ReadonlyMap, ) { return managedAgentsByPubkey.has(normalizePubkey(candidate.pubkey)); } - function addMemberCandidateWithAgentMetadata( candidate: UserSearchResult, managedAgentsByPubkey: ReadonlyMap, @@ -127,7 +121,6 @@ function memberModalRoleRank(member: ChannelMember) { if (member.role === "admin") return 1; return 2; } - function compareMembersForModal( currentPubkey: string | undefined, left: ChannelMember, @@ -260,11 +253,12 @@ export function MembersSidebar({ const canAddMembers = (selfMember !== null || channel?.visibility === "open") && channel?.channelType !== "dm"; - const userSearchQuery = useUserSearchQuery(deferredSearchQuery, { + const userSearchQuery = useInfiniteUserSearchQuery(deferredSearchQuery, { allowEmpty: false, enabled: open && canAddMembers && deferredSearchQuery.length > 0, limit: MEMBER_ADD_RESULT_LIMIT, }); + const userSearchResults = useFlattenedUserSearchResults(userSearchQuery.data); const isArchivedDiscovery = useIsArchivedPredicate(); const addSearchResults = React.useMemo(() => { if (!canAddMembers || normalizedDeferredSearchQuery.length === 0) { @@ -335,7 +329,7 @@ export function MembersSidebar({ }); }; - for (const user of userSearchQuery.data ?? []) { + for (const user of userSearchResults) { addCandidate( addMemberCandidateWithAgentMetadata(user, managedAgentsByPubkey), ); @@ -393,7 +387,7 @@ export function MembersSidebar({ memberPubkeys, normalizedDeferredSearchQuery, relayAgentsQuery.data, - userSearchQuery.data, + userSearchResults, rawMembers, ]); const isAddSearchLoading = @@ -401,6 +395,10 @@ export function MembersSidebar({ managedAgentsQuery.isLoading || relayAgentsQuery.isLoading || channelsQuery.isLoading; + const handlePeopleSearchScroll = useUserSearchFetchMoreOnScroll( + userSearchQuery, + canAddMembers && normalizedDeferredSearchQuery.length > 0, + ); const addSearchOwnerPubkeys = React.useMemo( () => [ ...new Set( @@ -685,6 +683,7 @@ export function MembersSidebar({
{normalizedSearchQuery diff --git a/desktop/src/features/forum/ui/ForumComposer.tsx b/desktop/src/features/forum/ui/ForumComposer.tsx index ec8884f98..209d9c642 100644 --- a/desktop/src/features/forum/ui/ForumComposer.tsx +++ b/desktop/src/features/forum/ui/ForumComposer.tsx @@ -457,6 +457,7 @@ export function ForumComposer({ mentions.isMentionOpen ? mentions.suggestions : [] } onChannelSelect={applyChannelInsert} + onMentionFetchMore={mentions.fetchMoreSuggestions} onMentionSelect={applyMentionInsert} position={autocompletePosition} /> diff --git a/desktop/src/features/forum/ui/ForumComposerAutocompletes.tsx b/desktop/src/features/forum/ui/ForumComposerAutocompletes.tsx index 9b271eaed..5608a4e90 100644 --- a/desktop/src/features/forum/ui/ForumComposerAutocompletes.tsx +++ b/desktop/src/features/forum/ui/ForumComposerAutocompletes.tsx @@ -11,6 +11,7 @@ type ForumComposerAutocompletesProps = { mentionSelectedIndex: number; mentionSuggestions: MentionSuggestion[]; onChannelSelect: (suggestion: ChannelSuggestion) => void; + onMentionFetchMore?: () => void; onMentionSelect: (suggestion: MentionSuggestion) => void; position: "above" | "below"; }; @@ -21,6 +22,7 @@ export function ForumComposerAutocompletes({ mentionSelectedIndex, mentionSuggestions, onChannelSelect, + onMentionFetchMore, onMentionSelect, position, }: ForumComposerAutocompletesProps) { @@ -33,6 +35,7 @@ export function ForumComposerAutocompletes({ suggestions={channelSuggestions} /> >(new Map()); const previousSuggestionsRef = React.useRef([]); - const canSearchAllUsers = - options?.channelType === "dm" || - options?.channelType === "stream" || - options?.channelType === "forum"; + void options?.channelType; const mentionSearchQuery = mentionQuery?.trim() ?? ""; - const canSearchGlobalPeople = - canSearchAllUsers && mentionSearchQuery.length > 0; + const canSearchGlobalPeople = mentionSearchQuery.length > 0; const identityQuery = useIdentityQuery(); const currentPubkey = identityQuery.data?.pubkey ? normalizePubkey(identityQuery.data.pubkey) @@ -165,11 +161,15 @@ export function useMentions( canSearchGlobalPeople && managedAgentDirectoryReady && relayAgentDirectoryReady; - const userSearchQuery = useUserSearchQuery(mentionQuery ?? "", { + const userSearchQuery = useInfiniteUserSearchQuery(mentionQuery ?? "", { allowEmpty: true, enabled: canSearchGlobalUsers && mentionQuery !== null, limit: MENTION_SUGGESTION_LIMIT, }); + const userSearchResults = React.useMemo( + () => userSearchQuery.data?.pages.flatMap((page) => page.users) ?? [], + [userSearchQuery.data], + ); const managedAgentNamesByPubkey = React.useMemo( () => new Map( @@ -348,36 +348,34 @@ export function useMentions( }); } - if (canSearchAllUsers) { - for (const agent of relayAgentsQuery.data ?? []) { - addCandidate({ - kind: "identity", - pubkey: agent.pubkey, - displayName: agent.name, - isMember: false, - ownerPubkey: null, - isAgent: true, - }); - } + for (const agent of relayAgentsQuery.data ?? []) { + addCandidate({ + kind: "identity", + pubkey: agent.pubkey, + displayName: agent.name, + isMember: false, + ownerPubkey: null, + isAgent: true, + }); + } - for (const agent of managedAgentsQuery.data ?? []) { - addCandidate({ - kind: "identity", - pubkey: agent.pubkey, - displayName: agent.name, - isMember: false, - isAgent: true, - isManagedAgent: true, - personaId: agent.personaId ?? undefined, - personaName: - personaNameByPubkey.get(normalizePubkey(agent.pubkey)) ?? null, - ownerPubkey: currentPubkey, - }); - } + for (const agent of managedAgentsQuery.data ?? []) { + addCandidate({ + kind: "identity", + pubkey: agent.pubkey, + displayName: agent.name, + isMember: false, + isAgent: true, + isManagedAgent: true, + personaId: agent.personaId ?? undefined, + personaName: + personaNameByPubkey.get(normalizePubkey(agent.pubkey)) ?? null, + ownerPubkey: currentPubkey, + }); } if (canSearchGlobalUsers) { - for (const user of userSearchQuery.data ?? []) { + for (const user of userSearchResults) { addCandidate({ kind: "identity", pubkey: user.pubkey, @@ -428,7 +426,7 @@ export function useMentions( ); }, [ activePersonas, - canSearchAllUsers, + userSearchResults, canSearchGlobalUsers, currentPubkey, isArchivedDiscovery, @@ -443,7 +441,6 @@ export function useMentions( profiles, relayAgentNamesByPubkey, relayAgentsQuery.data, - userSearchQuery.data, ]); const ownerPubkeys = React.useMemo( @@ -637,6 +634,12 @@ export function useMentions( profiles, ]); + const fetchMoreSuggestions = React.useCallback(() => { + if (userSearchQuery.hasNextPage && !userSearchQuery.isFetchingNextPage) { + void userSearchQuery.fetchNextPage(); + } + }, [userSearchQuery]); + const suggestions = React.useMemo(() => { if (mentionQuery === null) { return []; @@ -982,6 +985,9 @@ export function useMentions( mentionSelectedIndex, registerMentionPubkey, suggestions, + fetchMoreSuggestions, + hasMoreSuggestions: Boolean(userSearchQuery.hasNextPage), + isFetchingMoreSuggestions: userSearchQuery.isFetchingNextPage, updateMentionQuery, }; } diff --git a/desktop/src/features/messages/ui/MentionAutocomplete.tsx b/desktop/src/features/messages/ui/MentionAutocomplete.tsx index 3320e7ea1..5a3508124 100644 --- a/desktop/src/features/messages/ui/MentionAutocomplete.tsx +++ b/desktop/src/features/messages/ui/MentionAutocomplete.tsx @@ -25,6 +25,7 @@ export type MentionSuggestion = { type MentionAutocompleteProps = { suggestions: MentionSuggestion[]; selectedIndex: number; + onFetchMore?: () => void; onSelect: (suggestion: MentionSuggestion) => void; position?: "above" | "below"; }; @@ -32,6 +33,7 @@ type MentionAutocompleteProps = { export const MentionAutocomplete = React.memo(function MentionAutocomplete({ suggestions, selectedIndex, + onFetchMore, onSelect, position = "above", }: MentionAutocompleteProps) { @@ -44,6 +46,15 @@ export const MentionAutocomplete = React.memo(function MentionAutocomplete({ activeItem?.scrollIntoView({ block: "nearest" }); }, [selectedIndex]); + const handleScroll = React.useCallback(() => { + const list = listRef.current; + if (!list || !onFetchMore) return; + + if (list.scrollHeight - list.scrollTop - list.clientHeight < 48) { + onFetchMore(); + } + }, [onFetchMore]); + if (suggestions.length === 0) { return null; } @@ -65,6 +76,7 @@ export const MentionAutocomplete = React.memo(function MentionAutocomplete({ POPOVER_SURFACE_CLASS, )} data-testid="mention-autocomplete" + onScroll={handleScroll} ref={listRef} style={POPOVER_SHADOW_STYLE} > diff --git a/desktop/src/features/messages/ui/MessageComposer.tsx b/desktop/src/features/messages/ui/MessageComposer.tsx index 746f7e1d4..130c55613 100644 --- a/desktop/src/features/messages/ui/MessageComposer.tsx +++ b/desktop/src/features/messages/ui/MessageComposer.tsx @@ -868,6 +868,7 @@ function MessageComposerImpl({ } /> ({ enabled, queryKey: ["user-search", normalizedQuery, options?.limit ?? 8], - queryFn: () => searchUsers(normalizedQuery, options?.limit ?? 8), + queryFn: async () => + (await searchUsers(normalizedQuery, options?.limit ?? 8)).users, + staleTime: 30_000, + gcTime: 5 * 60 * 1_000, + }); +} + +export function useInfiniteUserSearchQuery( + query: string, + options?: { + allowEmpty?: boolean; + enabled?: boolean; + limit?: number; + }, +) { + const normalizedQuery = query.trim().toLowerCase(); + const enabled = + (options?.enabled ?? true) && + (options?.allowEmpty === true || normalizedQuery.length > 0); + + return useInfiniteQuery({ + enabled, + queryKey: [ + "user-search", + "infinite", + normalizedQuery, + options?.limit ?? 50, + ], + queryFn: ({ pageParam }) => + searchUsers( + normalizedQuery, + options?.limit ?? 50, + typeof pageParam === "string" ? pageParam : null, + ), + getNextPageParam: (lastPage) => lastPage.nextCursor ?? undefined, + initialPageParam: null, staleTime: 30_000, gcTime: 5 * 60 * 1_000, }); } +export function useFlattenedUserSearchResults( + data: InfiniteData | undefined, +) { + return React.useMemo( + () => data?.pages.flatMap((page) => page.users) ?? [], + [data], + ); +} + +export function useUserSearchFetchMoreOnScroll( + query: UseInfiniteQueryResult>, + enabled = true, +) { + return React.useCallback( + (event: React.UIEvent) => { + if (!enabled) { + return; + } + + const list = event.currentTarget; + if (list.scrollHeight - list.scrollTop - list.clientHeight >= 64) { + return; + } + + if (query.hasNextPage && !query.isFetchingNextPage) { + void query.fetchNextPage(); + } + }, + [enabled, query], + ); +} + export function useUpdateProfileMutation() { const queryClient = useQueryClient(); const { activeWorkspace } = useWorkspaces(); diff --git a/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx b/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx index 8a9b63eda..9a01263f4 100644 --- a/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx +++ b/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx @@ -12,7 +12,9 @@ import { } from "@/features/agents/lib/agentAutocompleteEligibility"; import { useIsArchivedPredicate } from "@/features/identity-archive/hooks"; import { - useUserSearchQuery, + useFlattenedUserSearchResults, + useInfiniteUserSearchQuery, + useUserSearchFetchMoreOnScroll, useUsersBatchQuery, } from "@/features/profile/hooks"; import { truncatePubkey } from "@/features/profile/lib/identity"; @@ -319,11 +321,12 @@ export function NewDirectMessageDialog({ const managedAgentsQuery = useManagedAgentsQuery({ enabled: open }); const relayAgentsQuery = useRelayAgentsQuery({ enabled: open }); const channelsQuery = useChannelsQuery({ enabled: open }); - const userSearchQuery = useUserSearchQuery(deferredSearchQuery, { + const userSearchQuery = useInfiniteUserSearchQuery(deferredSearchQuery, { allowEmpty: true, enabled: open && !hasReachedRecipientLimit, limit: DIRECT_MESSAGE_RECIPIENT_LIMIT, }); + const userSearchResults = useFlattenedUserSearchResults(userSearchQuery.data); const isArchivedDiscovery = useIsArchivedPredicate(); const searchResults = React.useMemo(() => { const candidatesByPubkey = new Map(); @@ -384,7 +387,7 @@ export function NewDirectMessageDialog({ }); }; - for (const user of userSearchQuery.data ?? []) { + for (const user of userSearchResults) { addCandidate( directMessageCandidateWithAgentMetadata(user, managedAgentsByPubkey), ); @@ -441,13 +444,17 @@ export function NewDirectMessageDialog({ managedAgentsQuery.data, relayAgentsQuery.data, selectedPubkeys, - userSearchQuery.data, + userSearchResults, ]); const isDirectoryLoading = userSearchQuery.isLoading || managedAgentsQuery.isLoading || relayAgentsQuery.isLoading || channelsQuery.isLoading; + const handleDirectoryScroll = useUserSearchFetchMoreOnScroll( + userSearchQuery, + !hasReachedRecipientLimit, + ); const searchOwnerPubkeys = React.useMemo( () => [ @@ -699,7 +706,10 @@ export function NewDirectMessageDialog({ opacity: hasReachedRecipientLimit ? 0 : 1, }} > -
+
{searchResults.length > 0 ? (
{searchResults.map((user) => { diff --git a/desktop/src/shared/api/tauri.ts b/desktop/src/shared/api/tauri.ts index 3151491a3..2c5bad331 100644 --- a/desktop/src/shared/api/tauri.ts +++ b/desktop/src/shared/api/tauri.ts @@ -32,6 +32,7 @@ import type { UpdateProfileInput, UpdateChannelInput, UserProfileSummary, + UserSearchPage, UserSearchResult, UsersBatchResponse, CreateManagedAgentInput, @@ -69,6 +70,7 @@ type RawUserSearchResult = RawUserProfileSummary & { pubkey: string }; type RawSearchUsersResponse = { users: RawUserSearchResult[]; + next_cursor?: string | null; }; type RawPresenceLookup = Record; @@ -503,12 +505,17 @@ export async function getUsersBatch( export async function searchUsers( query: string, limit = 8, -): Promise { + cursor?: string | null, +): Promise { const response = await invokeTauri("search_users", { query, limit, + cursor: cursor ?? null, }); - return response.users.map(fromRawUserSearchResult); + return { + users: response.users.map(fromRawUserSearchResult), + nextCursor: response.next_cursor ?? null, + }; } export async function getPresence(pubkeys: string[]): Promise { diff --git a/desktop/src/shared/api/types.ts b/desktop/src/shared/api/types.ts index 8f0436788..277198afd 100644 --- a/desktop/src/shared/api/types.ts +++ b/desktop/src/shared/api/types.ts @@ -139,6 +139,11 @@ export type UserSearchResult = { isAgent: boolean; }; +export type UserSearchPage = { + users: UserSearchResult[]; + nextCursor: string | null; +}; + export type UpdateProfileInput = { displayName?: string; avatarUrl?: string; diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index e4c6b9933..206ffffcf 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -183,6 +183,7 @@ type RawUserSearchResult = { type RawSearchUsersResponse = { users: RawUserSearchResult[]; + next_cursor?: string | null; }; type PresenceStatus = "online" | "away" | "offline"; @@ -3932,6 +3933,7 @@ async function handleSearchUsers( args: { query: string; limit?: number; + cursor?: string | null; }, config: E2eConfig | undefined, ) { @@ -3946,7 +3948,9 @@ async function handleSearchUsers( if (!identity) { const normalizedQuery = args.query.trim().toLowerCase(); - const results = listMockProfiles() + const limit = args.limit ?? 8; + const page = Math.max(Number(args.cursor ?? 1) || 1, 1); + const allResults = listMockProfiles() .filter((profile) => { if (normalizedQuery.length === 0) { return true; @@ -3966,8 +3970,9 @@ async function handleSearchUsers( const rightName = right.display_name ?? right.nip05_handle ?? right.pubkey; return leftName.localeCompare(rightName); - }) - .slice(0, args.limit ?? 8) + }); + const results = allResults + .slice((page - 1) * limit, page * limit) .map((profile) => ({ pubkey: profile.pubkey, display_name: profile.display_name, @@ -3979,16 +3984,18 @@ async function handleSearchUsers( return { users: results, + next_cursor: page * limit < allResults.length ? String(page + 1) : null, } satisfies RawSearchUsersResponse; } // NIP-50 search on kind:0 profiles const limit = args.limit ?? 8; const normalizedQuery = args.query.trim(); + const page = Math.max(Number(args.cursor ?? 1) || 1, 1); const filter = normalizedQuery.length === 0 - ? { kinds: [0], limit } - : { kinds: [0], search: args.query, limit }; + ? { kinds: [0], limit, page } + : { kinds: [0], search: args.query, limit, page }; const events = await relayQuery(config, [filter]); const users = events.map((ev) => { const content = JSON.parse(ev.content ?? "{}"); @@ -4009,7 +4016,10 @@ async function handleSearchUsers( : false, }; }); - return { users }; + return { + users, + next_cursor: users.length >= limit ? String(page + 1) : null, + }; } async function handleGetPresence( From 4e6c82757ff2c61f8e486e5d208525cb505c9ca7 Mon Sep 17 00:00:00 2001 From: npub12gtutshhh76rx0jx697f32f9tffd4hhp3hx58fp4x6u4uemkm7sqf8f757 <5217c5c2f7bfb4333e46d17c98a9255a52dadee18dcd43a43536b95e6776dfa0@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 21:05:40 -0400 Subject: [PATCH 2/8] test(desktop): cover people search pagination Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- .../features/channels/ui/MembersSidebar.tsx | 2 +- .../src/features/messages/lib/useMentions.ts | 2 +- .../sidebar/ui/NewDirectMessageDialog.tsx | 6 +- desktop/tests/e2e/channels.spec.ts | 77 +++++++++++++++++++ desktop/tests/e2e/mentions.spec.ts | 38 +++++++++ 5 files changed, 122 insertions(+), 3 deletions(-) diff --git a/desktop/src/features/channels/ui/MembersSidebar.tsx b/desktop/src/features/channels/ui/MembersSidebar.tsx index 44678d934..0f9bd0981 100644 --- a/desktop/src/features/channels/ui/MembersSidebar.tsx +++ b/desktop/src/features/channels/ui/MembersSidebar.tsx @@ -375,7 +375,7 @@ export function MembersSidebar({ return rankUserCandidatesBySearch({ candidates: coalescedCandidates, getLabel: formatAddCandidateName, - limit: MEMBER_ADD_RESULT_LIMIT, + limit: Math.max(MEMBER_ADD_RESULT_LIMIT, coalescedCandidates.length), query: normalizedDeferredSearchQuery, }); }, [ diff --git a/desktop/src/features/messages/lib/useMentions.ts b/desktop/src/features/messages/lib/useMentions.ts index 727b144b2..5393025a0 100644 --- a/desktop/src/features/messages/lib/useMentions.ts +++ b/desktop/src/features/messages/lib/useMentions.ts @@ -607,7 +607,7 @@ export function useMentions( (a, b) => a.groupRank - b.groupRank || a.score - b.score || a.order - b.order, ) - .slice(0, MENTION_SUGGESTION_LIMIT) + .slice(0, Math.max(MENTION_SUGGESTION_LIMIT, mentionCandidates.length)) .map(({ candidate, label, ownerLabel }) => ({ pubkey: candidate.pubkey, personaId: candidate.personaId, diff --git a/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx b/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx index 9a01263f4..756ccaf09 100644 --- a/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx +++ b/desktop/src/features/sidebar/ui/NewDirectMessageDialog.tsx @@ -433,7 +433,10 @@ export function NewDirectMessageDialog({ allowEmptyQuery: true, candidates: coalescedCandidates, getLabel: formatUserName, - limit: DIRECT_MESSAGE_RECIPIENT_LIMIT, + limit: Math.max( + DIRECT_MESSAGE_RECIPIENT_LIMIT, + coalescedCandidates.length, + ), query: deferredSearchQuery, }); }, [ @@ -708,6 +711,7 @@ export function NewDirectMessageDialog({ >
{searchResults.length > 0 ? ( diff --git a/desktop/tests/e2e/channels.spec.ts b/desktop/tests/e2e/channels.spec.ts index 7d6423fea..107d618b2 100644 --- a/desktop/tests/e2e/channels.spec.ts +++ b/desktop/tests/e2e/channels.spec.ts @@ -1540,6 +1540,83 @@ test("members sidebar can invite and remove members", async ({ page }) => { await expectMembersTriggerCount(page, initialMemberCount); }); +test("members sidebar pages add-member search beyond the first 50 people", async ({ + page, +}) => { + const searchProfiles = Array.from({ length: 55 }, (_, index) => ({ + pubkey: `${(index + 1).toString(16).padStart(64, "0")}`, + displayName: `Alex ${String(index + 1).padStart(2, "0")}`, + })); + await installMockBridge(page, { searchProfiles }); + + await page.goto("/"); + await openMembersSidebar(page, "general"); + await page.getByTestId("channel-management-search-users").fill("Alex"); + + const results = page.locator("[data-testid^='channel-user-search-result-']"); + await expect(results).toHaveCount(50); + + await page + .getByTestId("members-sidebar-people") + .evaluate((node) => node.scrollTo(0, node.scrollHeight)); + + await expect(results).toHaveCount(55); + await expect(page.getByText("Alex 55")).toBeVisible(); + + const searchCalls = (await readCommandPayloadLog(page)).filter( + (entry) => entry.command === "search_users", + ); + expect(searchCalls).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + payload: expect.objectContaining({ cursor: null, limit: 50 }), + }), + expect.objectContaining({ + payload: expect.objectContaining({ cursor: "2", limit: 50 }), + }), + ]), + ); +}); + +test("new DM picker pages people search beyond the first 50 results", async ({ + page, +}) => { + const searchProfiles = Array.from({ length: 55 }, (_, index) => ({ + pubkey: `${(index + 1).toString(16).padStart(64, "0")}`, + displayName: `Alex ${String(index + 1).padStart(2, "0")}`, + })); + await installMockBridge(page, { searchProfiles }); + + await page.goto("/"); + await page.getByTestId("new-dm-trigger").click(); + await expect(page.getByTestId("new-dm-dialog")).toBeVisible(); + await page.getByTestId("new-dm-search").fill("Alex"); + + const results = page.locator("[data-testid^='new-dm-result-']"); + await expect(results).toHaveCount(50); + + await page + .getByTestId("new-dm-results") + .evaluate((node) => node.scrollTo(0, node.scrollHeight)); + + await expect(results).toHaveCount(55); + await expect(page.getByText("Alex 55")).toBeVisible(); + + const searchCalls = (await readCommandPayloadLog(page)).filter( + (entry) => entry.command === "search_users", + ); + expect(searchCalls).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + payload: expect.objectContaining({ cursor: null, limit: 50 }), + }), + expect.objectContaining({ + payload: expect.objectContaining({ cursor: "2", limit: 50 }), + }), + ]), + ); +}); + test("members modal does not show direct pubkey entry", async ({ page }) => { await page.goto("/"); await openMembersSidebar(page, "general"); diff --git a/desktop/tests/e2e/mentions.spec.ts b/desktop/tests/e2e/mentions.spec.ts index 0a0c42b11..82feae091 100644 --- a/desktop/tests/e2e/mentions.spec.ts +++ b/desktop/tests/e2e/mentions.spec.ts @@ -328,6 +328,44 @@ test("autocomplete searches global non-member people from the first typed charac await expect(tessaRow.getByText("not in channel")).toBeVisible(); }); +test("mention autocomplete pages global people search beyond the first 50 results", async ({ + page, +}) => { + const searchProfiles = Array.from({ length: 55 }, (_, index) => ({ + pubkey: `${(index + 1).toString(16).padStart(64, "0")}`, + displayName: `Alex ${String(index + 1).padStart(2, "0")}`, + })); + await installMockBridge(page, { searchProfiles }); + + await page.goto("/"); + await page.getByTestId("channel-general").click(); + await expect(page.getByTestId("chat-title")).toHaveText("general"); + + await page.getByTestId("message-input").fill("@Alex"); + + const dropdown = autocomplete(page); + await expect(dropdown.locator("button")).toHaveCount(50); + await dropdown.evaluate((node) => node.scrollTo(0, node.scrollHeight)); + + await expect(dropdown.locator("button")).toHaveCount(55); + await expect(dropdown.getByText("Alex 55")).toBeVisible(); + await expect(dropdown.getByText("not in channel").last()).toBeVisible(); + + const searchCalls = (await readCommandPayloadLog(page)).filter( + (entry) => entry.command === "search_users", + ); + expect(searchCalls).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + payload: expect.objectContaining({ cursor: null, limit: 50 }), + }), + expect.objectContaining({ + payload: expect.objectContaining({ cursor: "2", limit: 50 }), + }), + ]), + ); +}); + test("selecting a person mention inserts @Name into input", async ({ page, }) => { From b16445b7a102d8dec6341329225a2d4be01a059e Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 21:14:23 -0400 Subject: [PATCH 3/8] fix(read-path): server-side thread fetch with gap-free keyset paging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threads rendered silently incomplete because the desktop assembled them from the channel cache instead of fetching the reply subtree. This wires a first-class server-side thread read (`get_thread_replies` over `thread_metadata`) through the bridge and a new Tauri command, and fixes a correctness hole in its pagination. The pagination bug: `get_thread_replies` keyset-ordered on `event_created_at` alone, with `WHERE event_created_at > $cursor` and no tiebreak. Thread replies routinely share a `created_at` second (bursty threads), so once a page ended mid-second the cursor advanced past the whole second and every remaining tied reply was silently dropped — reproduced live on a seeded 40-reply thread spanning two seconds (24 + 16): paging at limit 10 lost 14 of the 24 tied replies. That is the exact "missed messages" class this work targets. Fix (Tier-1, no schema change): composite `(event_created_at, event_id)` keyset. The query orders and compares on the pair (`WHERE (event_created_at, event_id) > ($ts, $id) ORDER BY event_created_at ASC, event_id ASC`), and the cursor carries the last reply's event id as the tiebreak end to end: - buzz-db: composite cursor decode (8-byte BE seconds + raw event id) and row-comparison keyset; a bare 8-byte cursor still works for back-compat. - buzz-relay bridge: `extract_thread_cursor` reads `thread_cursor` + `thread_cursor_id` and encodes the composite bytes. - desktop: `ThreadCursor { created_at, event_id }` request/response type; the `get_thread_replies` command derives the next cursor from the last event's `(created_at, id)` transparently — no server-issued token. Verified end to end against a live relay: paging the seeded thread at limit 10 returns a set byte-identical to the full-thread oracle (40/40, no gaps, no duplicates). New DB regression test pins the same-second-tie case; +5 bridge unit tests cover the composite cursor encoding. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-db/src/thread.rs | 178 +++++++++++++++++++-- crates/buzz-relay/src/api/bridge.rs | 107 ++++++++++++- desktop/src-tauri/src/commands/messages.rs | 72 ++++++++- desktop/src-tauri/src/lib.rs | 1 + desktop/src-tauri/src/models.rs | 27 ++++ 5 files changed, 369 insertions(+), 16 deletions(-) diff --git a/crates/buzz-db/src/thread.rs b/crates/buzz-db/src/thread.rs index 8281ed9db..5f4900cfc 100644 --- a/crates/buzz-db/src/thread.rs +++ b/crates/buzz-db/src/thread.rs @@ -328,9 +328,16 @@ pub async fn decrement_reply_count( /// Fetch all replies under a root event, ordered chronologically. /// /// - `depth_limit` -- if `Some(n)`, only returns replies at depth <= n. -/// - `cursor` -- if `Some(ts_bytes)`, returns replies with `event_created_at` -/// strictly after the timestamp encoded in `ts_bytes`. The bytes must be an -/// 8-byte big-endian i64 Unix timestamp in seconds. +/// - `cursor` -- keyset pagination cursor. The result order is +/// `(event_created_at ASC, event_id ASC)`; the cursor is the composite key of +/// the last row already seen and the query returns rows strictly after it. A +/// composite `(timestamp, event_id)` tiebreak is required because thread +/// replies routinely share a `created_at` second (bursty threads); a +/// timestamp-only cursor silently drops every tied reply past the page limit. +/// Wire encoding: `8-byte big-endian i64 seconds` followed by the raw +/// `event_id` bytes (32 for a standard Nostr id). A bare 8-byte cursor is +/// still accepted for back-compat and paginates on timestamp alone (unsafe +/// across same-second ties) -- prefer the composite form. /// - `limit` -- maximum rows returned (caller should cap this). pub async fn get_thread_replies( pool: &PgPool, @@ -340,11 +347,20 @@ pub async fn get_thread_replies( limit: u32, cursor: Option<&[u8]>, ) -> Result> { - // Decode cursor bytes -> DateTime for the keyset condition. - let cursor_ts: Option> = match cursor { - Some(bytes) if bytes.len() == 8 => { - let secs = i64::from_be_bytes(bytes.try_into().expect("length checked")); - DateTime::from_timestamp(secs, 0) + // Decode cursor bytes -> keyset (timestamp, optional event_id) for the + // WHERE condition. Layout: 8-byte BE i64 seconds, then the raw event_id. + // An 8-byte-only cursor is legacy timestamp-only paging (no tiebreak). + let cursor_key: Option<(DateTime, Option>)> = match cursor { + Some(bytes) if bytes.len() >= 8 => { + let secs = i64::from_be_bytes(bytes[..8].try_into().expect("length checked")); + DateTime::from_timestamp(secs, 0).map(|ts| { + let id = if bytes.len() > 8 { + Some(bytes[8..].to_vec()) + } else { + None + }; + (ts, id) + }) } _ => None, }; @@ -385,13 +401,27 @@ pub async fn get_thread_replies( sql.push_str(&format!(" AND tm.depth <= ${param_idx}")); param_idx += 1; } - if cursor_ts.is_some() { - sql.push_str(&format!(" AND tm.event_created_at > ${param_idx}")); - param_idx += 1; + match &cursor_key { + Some((_, Some(_))) => { + // Composite keyset: strict row comparison with an event_id tiebreak + // so same-second replies paginate without gaps or duplicates. + let ts_idx = param_idx; + let id_idx = param_idx + 1; + sql.push_str(&format!( + " AND (tm.event_created_at, tm.event_id) > (${ts_idx}, ${id_idx})" + )); + param_idx += 2; + } + Some((_, None)) => { + // Legacy timestamp-only cursor (no tiebreak). + sql.push_str(&format!(" AND tm.event_created_at > ${param_idx}")); + param_idx += 1; + } + None => {} } sql.push_str(&format!( - " ORDER BY tm.event_created_at ASC LIMIT ${param_idx}" + " ORDER BY tm.event_created_at ASC, tm.event_id ASC LIMIT ${param_idx}" )); let mut q = sqlx::query(sqlx::AssertSqlSafe(sql)) @@ -401,8 +431,14 @@ pub async fn get_thread_replies( if let Some(dl) = depth_limit { q = q.bind(dl as i32); } - if let Some(ts) = cursor_ts { - q = q.bind(ts); + match &cursor_key { + Some((ts, Some(id))) => { + q = q.bind(*ts).bind(id.clone()); + } + Some((ts, None)) => { + q = q.bind(*ts); + } + None => {} } q = q.bind(limit as i32); @@ -936,6 +972,120 @@ mod tests { assert_eq!(replies[0].depth, 1); } + /// Replies that share the same `created_at` second (bursty threads are the + /// common case) must paginate without gaps or duplicates. Before the + /// composite `(event_created_at, event_id)` keyset, a timestamp-only cursor + /// advanced past the whole tied second after one page, silently dropping + /// every tied reply beyond the first page's limit — the "missed messages" + /// bug this read-path work exists to fix. This pins the tiebreak. + #[tokio::test] + #[ignore = "requires Postgres"] + async fn get_thread_replies_pages_same_second_ties_without_loss() { + use nostr::Timestamp; + + let pool = setup_pool().await; + let author = Keys::generate(); + let (channel, community) = create_test_channel( + &pool, + &format!("thread-ties-{}", Uuid::new_v4()), + ChannelType::Stream, + ChannelVisibility::Open, + None, + author.public_key().to_bytes().as_slice(), + None, + ) + .await + .expect("create channel"); + + let root = make_stream_event(&author, "root"); + let root_created_at = event_created_at(&root); + insert_event_with_thread_metadata(&pool, community, &root, Some(channel.id), None) + .await + .expect("insert root event"); + + // Pin every reply to the SAME second so pagination must lean entirely on + // the event_id tiebreak. Distinct content keeps the ids distinct. + let tie_secs: u64 = root.created_at.as_secs() + 1; + let tie_ts = DateTime::from_timestamp(tie_secs as i64, 0).expect("valid timestamp"); + let reply_count = 5usize; + let mut expected_ids = Vec::with_capacity(reply_count); + for i in 0..reply_count { + let reply = EventBuilder::new(Kind::Custom(9), format!("tie-{i}")) + .custom_created_at(Timestamp::from(tie_secs)) + .sign_with_keys(&author) + .expect("sign tied reply"); + expected_ids.push(reply.id.as_bytes().to_vec()); + insert_event_with_thread_metadata( + &pool, + community, + &reply, + Some(channel.id), + Some(ThreadMetadataParams { + event_id: reply.id.as_bytes(), + event_created_at: tie_ts, + channel_id: channel.id, + parent_event_id: Some(root.id.as_bytes()), + parent_event_created_at: Some(root_created_at), + root_event_id: Some(root.id.as_bytes()), + root_event_created_at: Some(root_created_at), + depth: 1, + broadcast: false, + }), + ) + .await + .expect("insert tied reply"); + } + + // Page with limit=2 across 5 same-second replies. Build the next cursor + // from the last row's (created_at seconds, event_id) — the exact keyset + // the bridge derives transparently for the client. + let page_limit = 2u32; + let mut collected: Vec> = Vec::new(); + let mut cursor: Option> = None; + loop { + let page = get_thread_replies( + &pool, + community, + root.id.as_bytes(), + Some(10), + page_limit, + cursor.as_deref(), + ) + .await + .expect("fetch page"); + if page.is_empty() { + break; + } + let last = page.last().expect("non-empty page"); + let mut next = last.created_at.timestamp().to_be_bytes().to_vec(); + next.extend_from_slice(&last.event_id); + cursor = Some(next); + let full = page.len() as u32 == page_limit; + for reply in page { + collected.push(reply.event_id); + } + if !full { + break; + } + } + + // No gaps, no duplicates: the paged union equals the full tied set. + assert_eq!( + collected.len(), + reply_count, + "paged {} replies, expected all {}", + collected.len(), + reply_count + ); + let mut unique = collected.clone(); + unique.sort(); + unique.dedup(); + assert_eq!(unique.len(), reply_count, "paging produced duplicates"); + let mut expected_sorted = expected_ids.clone(); + expected_sorted.sort(); + assert_eq!(unique, expected_sorted, "paged set != inserted tied set"); + } + /// A reply whose stored row can no longer be reconstructed into a /// `nostr::Event` (e.g. corrupt signature from out-of-band storage damage) /// must be skipped, with the rest of the thread still returned — not diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 3882363d8..c2f66b89c 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -205,6 +205,41 @@ fn extract_depth_limit(raw: &Value) -> Option { .and_then(|n| u32::try_from(n).ok()) } +/// Extract a thread pagination cursor from the raw filter JSON. +/// +/// The desktop pages `get_thread_replies` forward with a keyset cursor derived +/// transparently from the last reply it has already loaded — no server-issued +/// token. The cursor is a composite of that reply's `created_at` (Unix seconds, +/// field `thread_cursor`/`threadCursor`) and its hex event id (field +/// `thread_cursor_id`/`threadCursorId`). The event id is the tiebreak that lets +/// pagination cross replies sharing the same `created_at` second — without it, +/// a timestamp-only cursor silently drops every tied reply past the page limit +/// (the exact "missed messages" bug this work exists to fix). +/// +/// Wire → DB encoding: 8-byte big-endian i64 seconds, followed by the raw +/// event-id bytes when present. `get_thread_replies` decodes this layout back +/// into its `(timestamp, event_id)` keyset. A bare timestamp (no id) is still +/// accepted and paginates on time alone (unsafe across same-second ties). +fn extract_thread_cursor(raw: &Value) -> Option> { + let secs = raw + .get("thread_cursor") + .or_else(|| raw.get("threadCursor"))? + .as_i64()?; + let mut bytes = secs.to_be_bytes().to_vec(); + + if let Some(id_hex) = raw + .get("thread_cursor_id") + .or_else(|| raw.get("threadCursorId")) + .and_then(Value::as_str) + { + if let Ok(id_bytes) = hex::decode(id_hex) { + bytes.extend_from_slice(&id_bytes); + } + } + + Some(bytes) +} + fn extract_feed_types(raw: &Value) -> Option> { let arr = raw.get("feed_types")?.as_array()?; let types: Vec = arr @@ -547,9 +582,16 @@ pub async fn query_events( .limit .unwrap_or(100) .min(BRIDGE_THREAD_MAX_LIMIT as usize) as u32; + let thread_cursor = extract_thread_cursor(raw); let thread_replies = state .db - .get_thread_replies(tenant.community(), &root_bytes, Some(depth), limit, None) + .get_thread_replies( + tenant.community(), + &root_bytes, + Some(depth), + limit, + thread_cursor.as_ref().map(|c| c.as_slice()), + ) .await .map_err(|e| internal_error(&format!("thread query error: {e}")))?; @@ -1838,6 +1880,69 @@ mod tests { assert_eq!(extract_depth_limit(&raw), Some(3)); } + #[test] + fn extract_thread_cursor_valid() { + // Timestamp-only cursor: 8-byte BE seconds, no tiebreak id. + let raw = serde_json::json!({ "thread_cursor": 1_782_866_946_i64 }); + assert_eq!( + extract_thread_cursor(&raw), + Some(1_782_866_946_i64.to_be_bytes().to_vec()) + ); + } + + #[test] + fn extract_thread_cursor_camel_case() { + let raw = serde_json::json!({ "threadCursor": 42_i64 }); + assert_eq!( + extract_thread_cursor(&raw), + Some(42_i64.to_be_bytes().to_vec()) + ); + } + + #[test] + fn extract_thread_cursor_composite() { + // Composite cursor: 8-byte BE seconds followed by the raw event-id bytes. + let id_hex = "aa".repeat(32); + let raw = serde_json::json!({ + "thread_cursor": 1_782_866_946_i64, + "thread_cursor_id": id_hex, + }); + let mut expected = 1_782_866_946_i64.to_be_bytes().to_vec(); + expected.extend_from_slice(&[0xaa; 32]); + assert_eq!(extract_thread_cursor(&raw), Some(expected)); + } + + #[test] + fn extract_thread_cursor_composite_camel_case() { + let id_hex = "bb".repeat(32); + let raw = serde_json::json!({ + "threadCursor": 7_i64, + "threadCursorId": id_hex, + }); + let mut expected = 7_i64.to_be_bytes().to_vec(); + expected.extend_from_slice(&[0xbb; 32]); + assert_eq!(extract_thread_cursor(&raw), Some(expected)); + } + + #[test] + fn extract_thread_cursor_ignores_bad_id_hex() { + // A malformed id falls back to timestamp-only rather than erroring. + let raw = serde_json::json!({ + "thread_cursor": 5_i64, + "thread_cursor_id": "not-hex", + }); + assert_eq!( + extract_thread_cursor(&raw), + Some(5_i64.to_be_bytes().to_vec()) + ); + } + + #[test] + fn extract_thread_cursor_absent() { + let raw = serde_json::json!({ "depth_limit": 3 }); + assert!(extract_thread_cursor(&raw).is_none()); + } + #[test] fn extract_depth_limit_zero() { let raw = serde_json::json!({ "depth_limit": 0 }); diff --git a/desktop/src-tauri/src/commands/messages.rs b/desktop/src-tauri/src/commands/messages.rs index 2c8dac5a4..926ae3d67 100644 --- a/desktop/src-tauri/src/commands/messages.rs +++ b/desktop/src-tauri/src/commands/messages.rs @@ -8,7 +8,7 @@ use crate::{ models::{ FeedItemInfo, FeedMeta, FeedResponse, FeedSections, ForumMessageInfo, ForumPostsResponse, ForumThreadReplyInfo, ForumThreadResponse, SearchResponse, SendChannelMessageResponse, - ThreadSummary, + ThreadRepliesResponse, ThreadSummary, }, nostr_convert, relay::{query_relay, submit_event, submit_event_with_keys}, @@ -205,6 +205,76 @@ pub async fn get_forum_thread( }) } +/// Fetch the full reply subtree under a thread root, server-side. +/// +/// Unlike the channel timeline (which the desktop assembles from its local +/// cache by grouping on `e`-root tags), this walks `thread_metadata` on the +/// relay via `get_thread_replies`, so a thread renders complete even when its +/// replies fell outside the channel cold-load window. Results are chronological +/// (oldest first) and include the root event itself (depth 0). +/// +/// Paging is forward keyset on `(created_at, event_id)`: pass the `next_cursor` +/// from a previous page back as `cursor` to fetch the next batch. The event-id +/// tiebreak is required because replies routinely share a `created_at` second; +/// a timestamp-only cursor would skip every tied reply past the page limit. +/// `next_cursor` is `Some` only when a full page was returned. +#[tauri::command] +pub async fn get_thread_replies( + root_event_id: String, + channel_id: Option, + limit: Option, + depth_limit: Option, + cursor: Option, + state: State<'_, AppState>, +) -> Result { + let cap = limit.unwrap_or(200).min(500); + // Bridge extension filter: `#e` root + `depth_limit` routes to + // `get_thread_replies` in the relay; `thread_cursor`(+`_id`) pages it forward. + let mut filter = serde_json::Map::new(); + filter.insert("#e".to_string(), serde_json::json!([root_event_id])); + // depth_limit is what activates the thread-subtree bridge path; default to + // a deep-but-bounded value so nested replies are not silently dropped. + filter.insert( + "depth_limit".to_string(), + serde_json::json!(depth_limit.unwrap_or(64)), + ); + filter.insert("limit".to_string(), serde_json::json!(cap)); + if let Some(cid) = channel_id.as_deref() { + filter.insert("#h".to_string(), serde_json::json!([cid])); + } + if let Some(c) = cursor.as_ref() { + filter.insert("thread_cursor".to_string(), serde_json::json!(c.created_at)); + filter.insert( + "thread_cursor_id".to_string(), + serde_json::json!(c.event_id), + ); + } + + let events = query_relay(&state, &[serde_json::Value::Object(filter)]).await?; + + // A full page implies there may be more; hand back the last event's + // composite key as the next cursor (the DB returns replies strictly after + // it, tiebroken by event_id so same-second replies are not skipped). + let next_cursor = if events.len() as u32 >= cap { + events.last().map(|ev| crate::models::ThreadCursor { + created_at: ev.created_at.as_secs() as i64, + event_id: ev.id.to_hex(), + }) + } else { + None + }; + + let event_values: Vec = events + .iter() + .filter_map(|ev| serde_json::to_value(ev).ok()) + .collect(); + + Ok(ThreadRepliesResponse { + events: event_values, + next_cursor, + }) +} + #[tauri::command] pub async fn get_event(event_id: String, state: State<'_, AppState>) -> Result { let events = query_relay( diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index f43409ede..9b2470afa 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -492,6 +492,7 @@ pub fn run() { send_managed_agent_channel_message, get_forum_posts, get_forum_thread, + get_thread_replies, edit_message, delete_message, add_reaction, diff --git a/desktop/src-tauri/src/models.rs b/desktop/src-tauri/src/models.rs index dc6f03ca8..ce1d4e882 100644 --- a/desktop/src-tauri/src/models.rs +++ b/desktop/src-tauri/src/models.rs @@ -266,6 +266,33 @@ pub struct ForumThreadResponse { pub next_cursor: Option, } +/// Forward keyset pagination cursor for `get_thread_replies`. +/// +/// Thread replies routinely share a `created_at` second (bursty threads), so +/// the cursor must carry the last reply's `event_id` as a tiebreak alongside +/// its `created_at`. A timestamp-only cursor advances past the entire tied +/// second after one page and silently drops every tied reply beyond the page +/// limit — the "missed messages" bug this read-path work fixes. The relay +/// keysets on `(event_created_at, event_id)` to match. +#[derive(Serialize, Deserialize, Clone)] +pub struct ThreadCursor { + /// `created_at` of the last reply already loaded (Unix seconds). + pub created_at: i64, + /// Hex event id of that last reply — the tiebreak within a shared second. + pub event_id: String, +} + +/// Response for `get_thread_replies` — the full reply subtree under a root +/// event, fetched server-side from `thread_metadata` (NOT assembled from the +/// channel cache). `events` are raw Nostr events in chronological order; +/// `next_cursor` is the composite `(created_at, event_id)` of the last event +/// when a full page was returned, for forward keyset paging, else `None`. +#[derive(Serialize, Deserialize)] +pub struct ThreadRepliesResponse { + pub events: Vec, + pub next_cursor: Option, +} + fn deserialize_null_string_as_empty<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, From 108fddfe129390d52654be61f72f381eae391dd7 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 22:06:01 -0400 Subject: [PATCH 4/8] fix(read-path): reach complete threads and dense-second timelines in the GUI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second half of the Tier-1 read-path work (the backend thread fetch + composite keyset landed in the prior commit). This wires the frontend to those server-side reads and adds a channel-timeline escape hatch, closing the two remaining "missed messages" gaps in the desktop GUI without a schema change or a hot-path rewrite. Threads (descendant gap): the thread panel derives its replies from the channel cache, and `useLoadMissingAncestors` only backfills *ancestors* (walking `e`-tags upward). Replies that fell outside the channel cold-load window were therefore never fetched — deep or old threads rendered silently incomplete. `useThreadReplies` closes this using the same cache seam: on thread open it pages `get_thread_replies` to the floor over the gap-free `(created_at, event_id)` keyset and merges each event into the channel cache (`mergeMessages`, dedupe by id). Idempotent per (channel, root); retries on error rather than caching a partial subtree. All downstream grouping/ordering/unread derivation is unchanged — the thread simply becomes complete. Channel timeline (dense-second wall): scrollback pages history over WS `REQ` with a bare `until` (`created_at`) cursor. Ordinary same-second overlap is duplicate-safe (relay `until` is inclusive), so the timeline does not drop messages in the normal case. The one genuinely unreachable case is a single `created_at` second holding more messages than one WS page: `until` keeps returning that second's newest slice, the boundary never advances, and everything behind it is stuck. `get_channel_messages_before` gives the pager a bridge composite-keyset fallback that advances within a tied second via `id > before_id` under the relay's `created_at DESC, id ASC` order. It activates *only* on a full-page same-second stall (the proven wall), seeds from the max event id already known at the boundary second (not `baseline[0].id`, which would re-request held rows), and drains that entire boundary second before honoring the per-pass row-floor/batch budget — yielding mid-second would strand the tail behind the same stall with no guaranteed sentinel re-arm. Older history then resumes on the normal budget. The WS live-tail/optimistic/cold-load hot path is untouched. Verification: new Playwright parity test seeds a 450-message dense second (>2 WS pages) behind the cold-load window and proves the keyset fallback fires and every row is reachable. It caught a real tail-stranding bug in the first cut (drain honored the batch budget mid-second, stranding 50 of 450 permanently); the boundary-second-drain invariant is the fix. Green: tsc, biome, file-sizes, px-text, cargo check, 1399 unit tests, and the new spec. `tauri.ts`'s existing file-size override is bumped for the two load-bearing bindings (split is known debt, a separate PR). Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- desktop/playwright.config.ts | 1 + desktop/scripts/check-file-sizes.mjs | 6 +- desktop/src-tauri/src/commands/messages.rs | 77 ++++++ desktop/src-tauri/src/lib.rs | 1 + desktop/src-tauri/src/models.rs | 23 ++ .../features/channels/ui/ChannelScreen.tsx | 5 + .../messages/lib/pageOlderMessages.ts | 138 ++++++++++- .../src/features/messages/useThreadReplies.ts | 103 ++++++++ desktop/src/shared/api/tauri.ts | 106 ++++++++ desktop/src/shared/api/types.ts | 42 ++++ desktop/src/testing/e2eBridge.ts | 234 +++++++++++++++++- .../e2e/channel-dense-second-reach.spec.ts | 161 ++++++++++++ 12 files changed, 892 insertions(+), 5 deletions(-) create mode 100644 desktop/src/features/messages/useThreadReplies.ts create mode 100644 desktop/tests/e2e/channel-dense-second-reach.spec.ts diff --git a/desktop/playwright.config.ts b/desktop/playwright.config.ts index 250cdd3ad..24d6cd179 100644 --- a/desktop/playwright.config.ts +++ b/desktop/playwright.config.ts @@ -56,6 +56,7 @@ export default defineConfig({ "**/reminders.spec.ts", "**/virtualization.spec.ts", "**/scroll-history.spec.ts", + "**/channel-dense-second-reach.spec.ts", "**/overscroll-boundary.spec.ts", "**/cold-switch-longtask.perf.ts", "**/timeline-no-shift.spec.ts", diff --git a/desktop/scripts/check-file-sizes.mjs b/desktop/scripts/check-file-sizes.mjs index 3bbe4e914..91a4da32a 100644 --- a/desktop/scripts/check-file-sizes.mjs +++ b/desktop/scripts/check-file-sizes.mjs @@ -82,8 +82,10 @@ const overrides = new Map([ // threaded through Tauri invokes for configurable repos_dir, plus the // harness-persona-sync `harnessOverride` create-input bit — load-bearing // parameter plumbing, not generic debt growth. Approved override; still - // queued to split. - ["src/shared/api/tauri.ts", 1235], + // queued to split. Read-path lane 1 adds two server-side fetch bindings + // (getThreadReplies + getChannelMessagesBefore) with their snake/camel + // cursor mapping — load-bearing reachability plumbing, not generic debt. + ["src/shared/api/tauri.ts", 1330], // harness-persona-sync feature growth, queued to split in the resolver-unify // refactor followup. discovery.rs is dominated by the new test module // (the effective_agent_command / divergent / create-time override matrix); diff --git a/desktop/src-tauri/src/commands/messages.rs b/desktop/src-tauri/src/commands/messages.rs index 926ae3d67..a3dc5e797 100644 --- a/desktop/src-tauri/src/commands/messages.rs +++ b/desktop/src-tauri/src/commands/messages.rs @@ -275,6 +275,83 @@ pub async fn get_thread_replies( }) } +/// Fetch one keyset page of top-level channel history strictly *older* than a +/// cursor, server-side via the bridge composite cursor. +/// +/// The desktop timeline normally pages history over WS `REQ` with a bare `until` +/// (`created_at`) cursor. That cursor cannot advance past a single `created_at` +/// second that holds more messages than one page: `until` keeps returning the +/// same newest slice of that second and history behind it is unreachable. This +/// command uses the relay's `(created_at, event_id)` keyset (`until` + `n` = +/// `before_id`), which advances within a tied second via `id > before_id` under +/// the relay's `created_at DESC, id ASC` order — the escape hatch for that wall. +/// +/// `before` is the cursor's `created_at` (Unix seconds); `before_id` is the hex +/// id of the last (oldest) event already loaded at that second, so the page +/// returned is strictly older. `next_cursor` is the last (oldest) returned +/// event's composite key when a full page came back, else `None`. +#[tauri::command] +pub async fn get_channel_messages_before( + channel_id: String, + before: i64, + before_id: Option, + limit: Option, + state: State<'_, AppState>, +) -> Result { + let cap = limit.unwrap_or(200).min(500); + // Timeline content kinds — mirror the WS history filter so the keyset page + // and the WS page select the same rows. Top-level filtering is enforced by + // the relay's thread_metadata join for this channel scope. + let mut filter = serde_json::Map::new(); + filter.insert("#h".to_string(), serde_json::json!([channel_id])); + filter.insert( + "kinds".to_string(), + serde_json::json!([ + 9, + 40002, + 40008, + 40099, + 43001, + 43002, + 43003, + 43004, + 43005, + 43006, + buzz_core_pkg::kind::KIND_HUDDLE_STARTED + ]), + ); + filter.insert("until".to_string(), serde_json::json!(before)); + filter.insert("limit".to_string(), serde_json::json!(cap)); + // `n` is the bridge extension name for the composite `before_id` tiebreak; + // the relay requires `until` to be set alongside it. + if let Some(id) = before_id.as_deref() { + filter.insert("n".to_string(), serde_json::json!(id)); + } + + let events = query_relay(&state, &[serde_json::Value::Object(filter)]).await?; + + // Relay order is created_at DESC, id ASC — the last event is the oldest, so + // it is the cursor for the next (older) page when a full page returned. + let next_cursor = if events.len() as u32 >= cap { + events.last().map(|ev| crate::models::ChannelPageCursor { + created_at: ev.created_at.as_secs() as i64, + event_id: ev.id.to_hex(), + }) + } else { + None + }; + + let event_values: Vec = events + .iter() + .filter_map(|ev| serde_json::to_value(ev).ok()) + .collect(); + + Ok(crate::models::ChannelMessagesPageResponse { + events: event_values, + next_cursor, + }) +} + #[tauri::command] pub async fn get_event(event_id: String, state: State<'_, AppState>) -> Result { let events = query_relay( diff --git a/desktop/src-tauri/src/lib.rs b/desktop/src-tauri/src/lib.rs index 9b2470afa..f811eba26 100644 --- a/desktop/src-tauri/src/lib.rs +++ b/desktop/src-tauri/src/lib.rs @@ -493,6 +493,7 @@ pub fn run() { get_forum_posts, get_forum_thread, get_thread_replies, + get_channel_messages_before, edit_message, delete_message, add_reaction, diff --git a/desktop/src-tauri/src/models.rs b/desktop/src-tauri/src/models.rs index ce1d4e882..67e65f563 100644 --- a/desktop/src-tauri/src/models.rs +++ b/desktop/src-tauri/src/models.rs @@ -293,6 +293,29 @@ pub struct ThreadRepliesResponse { pub next_cursor: Option, } +/// Composite backward keyset cursor for channel-timeline paging via the bridge +/// (`get_channel_messages_before`). The relay orders `created_at DESC, id ASC` +/// and advances past a tied second with `id > before_id`, so the event id is the +/// tiebreak that lets paging escape a second denser than one WS page — +/// the case a bare `until` cursor cannot advance through. +#[derive(Serialize, Deserialize, Clone)] +pub struct ChannelPageCursor { + /// `created_at` of the last (oldest) message already loaded (Unix seconds). + pub created_at: i64, + /// Hex event id of that message — the `before_id` tiebreak within a second. + pub event_id: String, +} + +/// Response for `get_channel_messages_before` — one keyset page of top-level +/// channel history, oldest-last (relay order: `created_at DESC, id ASC`). +/// `next_cursor` is the composite key of the last (oldest) event when a full +/// page was returned, else `None`. +#[derive(Serialize, Deserialize)] +pub struct ChannelMessagesPageResponse { + pub events: Vec, + pub next_cursor: Option, +} + fn deserialize_null_string_as_empty<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, diff --git a/desktop/src/features/channels/ui/ChannelScreen.tsx b/desktop/src/features/channels/ui/ChannelScreen.tsx index 5197c6717..97078a977 100644 --- a/desktop/src/features/channels/ui/ChannelScreen.tsx +++ b/desktop/src/features/channels/ui/ChannelScreen.tsx @@ -47,6 +47,7 @@ import { } from "@/features/messages/lib/timelineLoadingState"; import { useFetchOlderMessages } from "@/features/messages/useFetchOlderMessages"; import { useLoadMissingAncestors } from "@/features/messages/useLoadMissingAncestors"; +import { useThreadReplies } from "@/features/messages/useThreadReplies"; import { useChannelTyping } from "@/features/messages/useChannelTyping"; import type { TimelineMessage } from "@/features/messages/types"; import { useUsersBatchQuery } from "@/features/profile/hooks"; @@ -705,6 +706,10 @@ export function ChannelScreen({ ]); useLoadMissingAncestors(activeChannel, resolvedMessages); + // Fetch the full reply subtree server-side when a thread is open, closing the + // descendant gap that useLoadMissingAncestors (ancestors-only) leaves. The + // open thread head is the top-level message, i.e. the thread root. + useThreadReplies(activeChannel, effectiveOpenThreadHeadId); const hasAuxiliaryPanel = Boolean( effectiveOpenThreadHeadId || openAgentSessionPubkey || diff --git a/desktop/src/features/messages/lib/pageOlderMessages.ts b/desktop/src/features/messages/lib/pageOlderMessages.ts index 6cc89e3f9..4b733155f 100644 --- a/desktop/src/features/messages/lib/pageOlderMessages.ts +++ b/desktop/src/features/messages/lib/pageOlderMessages.ts @@ -7,7 +7,8 @@ import { mergeTimelineHistoryMessages, } from "@/features/messages/lib/messageQueryKeys"; import { relayClient } from "@/shared/api/relayClient"; -import type { RelayEvent } from "@/shared/api/types"; +import { getChannelMessagesBefore } from "@/shared/api/tauri"; +import type { ChannelPageCursor, RelayEvent } from "@/shared/api/types"; const OLDER_MESSAGES_BATCH_SIZE = 200; @@ -31,6 +32,122 @@ export type PageOlderResult = { hasOlderMessages: boolean; }; +/** + * Seed the bridge keyset cursor for the dense-second escape hatch. + * + * The relay orders `created_at DESC, id ASC` and advances past a second denser + * than one WS page via `id > before_id`. So the cursor must point at the + * *furthest* relay-order position already known at the stalled second — the + * **max** event id among all cached/fetched events at `created_at === second`. + * Seeding from the min id (e.g. `baseline[0].id`) would re-request rows already + * held; seeding from the max id asks the relay for the strictly-unreached tail. + */ +function maxEventIdAtSecond( + events: RelayEvent[], + second: number, +): string | null { + let maxId: string | null = null; + for (const event of events) { + if (event.created_at !== second) { + continue; + } + if (maxId === null || event.id > maxId) { + maxId = event.id; + } + } + return maxId; +} + +/** + * Dense-second escape hatch: drain older history via the bridge composite + * keyset once the WS `until` cursor has stalled on a second denser than one + * page. Seeds from the max event id at `boundarySecond` (the furthest + * relay-order position already held) and pages `(created_at, event_id)`, + * clearing the *entire* boundary second first (the wall must be broken in the + * pass that detects it), then honoring the shared per-pass row-floor / batch + * budget for any older history. Stops on a short page (history exhausted). + * Appends fetched events to `fetched` in place; returns whether more history is + * believed to remain (`false` only on a short page). + */ +async function drainOlderViaKeyset(args: { + channelId: string; + boundarySecond: number; + baseline: RelayEvent[]; + fetched: RelayEvent[]; + baselineRowCount: number; + batchesFetched: number; + shouldContinue: () => boolean; +}): Promise { + const { + channelId, + boundarySecond, + baseline, + fetched, + baselineRowCount, + shouldContinue, + } = args; + + const seedId = maxEventIdAtSecond([...baseline, ...fetched], boundarySecond); + if (seedId === null) { + // Nothing known at the boundary second to key off — shouldn't happen once + // stalled, but bail rather than fabricate a cursor. + return true; + } + + let cursor: ChannelPageCursor | null = { + createdAt: boundarySecond, + eventId: seedId, + }; + let batchesFetched = args.batchesFetched; + let hasOlderMessages = true; + + while (cursor !== null && shouldContinue()) { + const page = await getChannelMessagesBefore( + channelId, + cursor, + OLDER_MESSAGES_BATCH_SIZE, + ); + batchesFetched += 1; + + if (page.events.length > 0) { + fetched.push(...page.events); + } + + // A null next cursor means the relay returned a short page — history is + // exhausted. A full page yields a next cursor to continue from. + if (page.nextCursor === null) { + hasOlderMessages = false; + break; + } + cursor = page.nextCursor; + + // Fully clear the dense second before honoring the per-pass budget. The + // wall is a single `created_at` second denser than one page; yielding while + // the cursor is still *inside* that second would strand its tail behind the + // same stall, and the scroll sentinel may not re-arm to summon us again — + // the wall must be cleared in the pass that detects it. Once the cursor has + // advanced to an older second, ordinary older history resumes and the row + // floor / batch budget bound the pass as usual. + if (cursor.createdAt >= boundarySecond) { + continue; + } + + const rowsGained = + countTopLevelTimelineRows( + mergeTimelineHistoryMessages(baseline, fetched), + ) - baselineRowCount; + if (rowsGained >= MIN_TOP_LEVEL_ROWS_PER_FETCH) { + break; + } + + if (batchesFetched >= MAX_BATCHES_PER_FETCH) { + break; + } + } + + return hasOlderMessages; +} + /** * Page older history into the channel cache until the timeline has gained * {@link MIN_TOP_LEVEL_ROWS_PER_FETCH} visible rows, history runs out, or the @@ -94,6 +211,25 @@ export async function pageOlderMessagesUntilRowFloor( oldestTimestamp, ); if (oldestFetched >= oldestTimestamp) { + // A *full* WS page that didn't advance the boundary is the dense-second + // wall: >1 page of events share `oldestTimestamp`, so the bare `until` + // cursor re-returns the same newest slice forever. Switch to the bridge + // composite `(created_at, event_id)` keyset for the rest of this pass — + // it advances within the tied second via `id > before_id` and pages all + // older history too, so once engaged there is nothing left for WS to add. + // An empty/short page here is transient or genuine exhaustion, not a + // wall: fall through to break and let the scroll observer re-arm. + if (olderMessages.length === OLDER_MESSAGES_BATCH_SIZE) { + hasOlderMessages = await drainOlderViaKeyset({ + channelId, + boundarySecond: oldestTimestamp, + baseline, + fetched, + baselineRowCount, + batchesFetched, + shouldContinue, + }); + } break; } oldestTimestamp = oldestFetched; diff --git a/desktop/src/features/messages/useThreadReplies.ts b/desktop/src/features/messages/useThreadReplies.ts new file mode 100644 index 000000000..3b2cc8a83 --- /dev/null +++ b/desktop/src/features/messages/useThreadReplies.ts @@ -0,0 +1,103 @@ +import * as React from "react"; +import { useQueryClient } from "@tanstack/react-query"; + +import { channelMessagesKey } from "@/features/messages/lib/messageQueryKeys"; +import { mergeMessages } from "@/features/messages/hooks"; +import { getThreadReplies } from "@/shared/api/tauri"; +import type { Channel, RelayEvent, ThreadCursor } from "@/shared/api/types"; + +// Bounded per-page fetch; the hook pages to the floor so this is a page size, +// not a terminal cap. Matches the desktop command's own 500 max. +const THREAD_PAGE_LIMIT = 200; +// A hard stop so a pathological/looping cursor can never spin forever. At 200 +// replies per page this covers a 100k-reply thread — far past any real one. +const MAX_THREAD_PAGES = 500; + +/** + * When a thread is open, fetch its full reply subtree server-side and merge the + * events into the channel cache. + * + * The thread panel derives its replies from the channel cache + * (`channelMessagesKey`); `useLoadMissingAncestors` only backfills *ancestors* + * (walking `e`-tags upward), so replies that fell outside the channel + * cold-load window were never fetched — the thread rendered silently + * incomplete. This closes that descendant gap using the same cache seam: page + * `get_thread_replies` to the floor (gap-free `(created_at, event_id)` keyset) + * and merge each event in. All downstream grouping/ordering/unread derivation + * keeps working unchanged; the thread simply becomes complete. + * + * Idempotent per (channel, root): `mergeMessages` dedupes by id, so replies + * already in the cache from the live subscription or cold load are no-ops. + */ +export function useThreadReplies( + activeChannel: Channel | null, + openThreadRootId: string | null, +) { + const queryClient = useQueryClient(); + // Track which roots we've already fetched per channel so re-opening a thread + // (or a re-render) doesn't re-page the whole subtree every time. + const fetchedRootsRef = React.useRef>(new Set()); + const previousChannelIdRef = React.useRef(null); + + React.useEffect(() => { + const activeChannelId = activeChannel?.id ?? null; + if (previousChannelIdRef.current === activeChannelId) { + return; + } + previousChannelIdRef.current = activeChannelId; + fetchedRootsRef.current.clear(); + }, [activeChannel?.id]); + + React.useEffect(() => { + if ( + !activeChannel || + activeChannel.channelType === "forum" || + !openThreadRootId + ) { + return; + } + if (fetchedRootsRef.current.has(openThreadRootId)) { + return; + } + fetchedRootsRef.current.add(openThreadRootId); + + const channelId = activeChannel.id; + const rootId = openThreadRootId; + let isCancelled = false; + + void (async () => { + let cursor: ThreadCursor | null = null; + try { + for (let page = 0; page < MAX_THREAD_PAGES; page++) { + const response = await getThreadReplies(rootId, channelId, { + limit: THREAD_PAGE_LIMIT, + cursor, + }); + if (isCancelled) { + return; + } + + if (response.events.length > 0) { + queryClient.setQueryData( + channelMessagesKey(channelId), + (current = []) => response.events.reduce(mergeMessages, current), + ); + } + + if (!response.nextCursor) { + break; + } + cursor = response.nextCursor; + } + } catch (error) { + // Let a later re-open retry rather than caching a partial subtree. + fetchedRootsRef.current.delete(rootId); + console.error("Failed to load thread replies", rootId, error); + } + })(); + + return () => { + isCancelled = true; + }; + }, [activeChannel, openThreadRootId, queryClient]); +} diff --git a/desktop/src/shared/api/tauri.ts b/desktop/src/shared/api/tauri.ts index 3151491a3..6caefc2a5 100644 --- a/desktop/src/shared/api/tauri.ts +++ b/desktop/src/shared/api/tauri.ts @@ -8,6 +8,8 @@ import type { Channel, ChannelDetail, ChannelMember, + ChannelMessagesPageResponse, + ChannelPageCursor, ChannelType, CreateChannelInput, GetHomeFeedInput, @@ -29,6 +31,8 @@ import type { SetCanvasResult, SetChannelPurposeInput, SetChannelTopicInput, + ThreadCursor, + ThreadRepliesResponse, UpdateProfileInput, UpdateChannelInput, UserProfileSummary, @@ -706,6 +710,108 @@ export async function getEventById(eventId: string): Promise { return JSON.parse(eventJson) as RelayEvent; } +type RawThreadCursor = { + created_at: number; + event_id: string; +}; + +type RawThreadRepliesResponse = { + events: RelayEvent[]; + next_cursor: RawThreadCursor | null; +}; + +/** + * Fetch the full reply subtree under a thread root, server-side. + * + * Unlike the channel timeline (which the desktop assembles from its local + * cache), this walks `thread_metadata` on the relay, so a thread renders + * complete even when its replies fell outside the channel cold-load window — + * the descendant gap that made deep/old threads silently incomplete. + * + * Paging is forward keyset on `(createdAt, eventId)`: pass the returned + * `nextCursor` back as `cursor` for the next page. `nextCursor` is non-null only + * when a full page was returned. The returned `events` are raw nostr events + * (`RelayEvent`), chronological (oldest first), including the root at depth 0. + */ +export async function getThreadReplies( + rootEventId: string, + channelId?: string | null, + options?: { + limit?: number; + depthLimit?: number; + cursor?: ThreadCursor | null; + }, +): Promise { + const response = await invokeTauri( + "get_thread_replies", + { + rootEventId, + channelId: channelId ?? null, + limit: options?.limit ?? null, + depthLimit: options?.depthLimit ?? null, + cursor: options?.cursor + ? { + created_at: options.cursor.createdAt, + event_id: options.cursor.eventId, + } + : null, + }, + ); + + return { + events: response.events, + nextCursor: response.next_cursor + ? { + createdAt: response.next_cursor.created_at, + eventId: response.next_cursor.event_id, + } + : null, + }; +} + +type RawChannelMessagesPageResponse = { + events: RelayEvent[]; + next_cursor: RawThreadCursor | null; +}; + +/** + * Fetch one keyset page of top-level channel history strictly older than a + * cursor, via the bridge composite `(createdAt, eventId)` cursor. + * + * The desktop timeline pages history over WS `REQ` with a bare `until` + * (`createdAt`) cursor, which cannot advance past a `createdAt` second denser + * than one page. This is the escape hatch: `beforeId` is the id of the oldest + * event already loaded at `before`, and the relay returns strictly-older rows + * (`created_at < before OR (created_at = before AND id > beforeId)`). Pass the + * returned `nextCursor` back to page further; `nextCursor` is null once a short + * page proves history is exhausted. + */ +export async function getChannelMessagesBefore( + channelId: string, + cursor: ChannelPageCursor, + limit?: number, +): Promise { + const response = await invokeTauri( + "get_channel_messages_before", + { + channelId, + before: cursor.createdAt, + beforeId: cursor.eventId, + limit: limit ?? null, + }, + ); + + return { + events: response.events, + nextCursor: response.next_cursor + ? { + createdAt: response.next_cursor.created_at, + eventId: response.next_cursor.event_id, + } + : null, + }; +} + export async function sendChannelMessage( channelId: string, content: string, diff --git a/desktop/src/shared/api/types.ts b/desktop/src/shared/api/types.ts index 8f0436788..fdf53b246 100644 --- a/desktop/src/shared/api/types.ts +++ b/desktop/src/shared/api/types.ts @@ -796,3 +796,45 @@ export type ForumThreadResponse = { totalReplies: number; nextCursor: string | null; }; + +/** + * Forward keyset cursor for the server-side thread read (`get_thread_replies`). + * + * The event-id tiebreak is load-bearing: thread replies routinely share a + * `createdAt` second (bursty threads), so a timestamp-only cursor would skip + * every tied reply past the page limit. The pair `(createdAt, eventId)` orders + * replies unambiguously and lets paging resume strictly after the last event. + */ +export type ThreadCursor = { + createdAt: number; + eventId: string; +}; + +export type ThreadRepliesResponse = { + /** The reply subtree (chronological, oldest first), including the root at depth 0. */ + events: RelayEvent[]; + /** Present only when a full page was returned — pass back to fetch the next page. */ + nextCursor: ThreadCursor | null; +}; + +/** + * Composite backward keyset cursor for channel-timeline paging via the bridge + * (`getChannelMessagesBefore`). + * + * The event-id tiebreak is load-bearing for the dense-second case: the relay + * orders `created_at DESC, id ASC` and advances past a second denser than one + * page with `id > eventId`. A bare `createdAt` (`until`) cursor cannot escape + * such a second — it re-returns the same slice forever, leaving older history + * unreachable. `(createdAt, eventId)` moves strictly older every page. + */ +export type ChannelPageCursor = { + createdAt: number; + eventId: string; +}; + +export type ChannelMessagesPageResponse = { + /** One keyset page of top-level history, relay order (newest first). */ + events: RelayEvent[]; + /** Present only when a full page was returned — pass back to fetch the next (older) page. */ + nextCursor: ChannelPageCursor | null; +}; diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index e4c6b9933..57d03d0c5 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -2968,9 +2968,22 @@ function emitMockHistory( } return true; }) - .sort((left, right) => right.created_at - left.created_at) + // Relay order is `created_at DESC, id ASC` — match it (both the WS history + // page and the `get_channel_messages_before` keyset are backed by that one + // order in production, so the mock must be self-consistent too, else a + // same-second slice returned here won't line up with the keyset's tiebreak + // and the dense-second escape hatch can't prove completeness). Bare `until` + // still can't advance past a second denser than one page; the composite + // keyset is the escape hatch. + .sort( + (left, right) => + right.created_at - left.created_at || left.id.localeCompare(right.id), + ) .slice(0, filter.limit ?? 50) - .sort((left, right) => left.created_at - right.created_at); + .sort( + (left, right) => + left.created_at - right.created_at || left.id.localeCompare(right.id), + ); const emit = () => { for (const event of events) { @@ -3286,6 +3299,213 @@ async function handleGetForumThread(args: { }; } +type RawThreadCursor = { + created_at: number; + event_id: string; +}; + +type RawThreadRepliesResponse = { + events: RelayEvent[]; + next_cursor: RawThreadCursor | null; +}; + +/** + * Mirror of the desktop `get_thread_replies` command: return the full reply + * subtree under a root, chronological (oldest first) including the root itself, + * with gap-free `(created_at, event_id)` keyset paging. + * + * The event-id tiebreak is load-bearing — same-second replies must all page + * through even when they cross a page boundary. This lets a Playwright spec + * assert the paged union equals the whole subtree, matching the relay contract. + */ +async function handleGetThreadReplies( + args: { + rootEventId: string; + channelId?: string | null; + limit?: number | null; + depthLimit?: number | null; + cursor?: RawThreadCursor | null; + }, + config: E2eConfig | undefined, +): Promise { + const cap = Math.min(args.limit ?? 200, 500); + const identity = getIdentity(config); + + let subtree: RelayEvent[]; + if (!identity) { + // Mock store: walk the reply forest transitively from the root so nested + // replies (reply-to-a-reply) are included, matching thread_metadata depth. + const events = args.channelId + ? getMockMessageStore(args.channelId) + : Array.from(mockMessages.values()).flat(); + const byId = new Map(events.map((event) => [event.id, event])); + const included = new Set(); + const root = byId.get(args.rootEventId); + const collected: RelayEvent[] = root ? [root] : []; + if (root) { + included.add(root.id); + } + for (const event of events) { + const ref = getThreadReferenceFromTags(event.tags); + const belongsToThread = + (ref.rootEventId ?? ref.parentEventId) === args.rootEventId; + if (belongsToThread && !included.has(event.id)) { + included.add(event.id); + collected.push(event); + } + } + subtree = collected; + } else { + // Config mode: exercise the real bridge thread path over /query. + const filter: Record = { + "#e": [args.rootEventId], + depth_limit: args.depthLimit ?? 64, + limit: cap, + }; + if (args.channelId) { + filter["#h"] = [args.channelId]; + } + if (args.cursor) { + filter.thread_cursor = args.cursor.created_at; + filter.thread_cursor_id = args.cursor.event_id; + } + const events = await relayQuery(config, [filter]); + const nextCursor = + events.length >= cap + ? { + created_at: events[events.length - 1].created_at, + event_id: events[events.length - 1].id, + } + : null; + return { events, next_cursor: nextCursor }; + } + + // Mock mode paging: sort by the composite key, then slice strictly after the + // cursor so same-second ties can never be skipped across a page boundary. + subtree.sort( + (left, right) => + left.created_at - right.created_at || left.id.localeCompare(right.id), + ); + let start = 0; + if (args.cursor) { + const cursor = args.cursor; + start = subtree.findIndex( + (event) => + event.created_at > cursor.created_at || + (event.created_at === cursor.created_at && + event.id.localeCompare(cursor.event_id) > 0), + ); + if (start < 0) { + start = subtree.length; + } + } + const page = subtree.slice(start, start + cap); + const nextCursor = + page.length >= cap && start + cap < subtree.length + ? { + created_at: page[page.length - 1].created_at, + event_id: page[page.length - 1].id, + } + : null; + + return { events: page, next_cursor: nextCursor }; +} + +const TIMELINE_KINDS = new Set([ + 9, 40002, 40008, 40099, 43001, 43002, 43003, 43004, 43005, 43006, +]); + +type RawChannelMessagesPageResponse = { + events: RelayEvent[]; + next_cursor: RawThreadCursor | null; +}; + +/** + * Mirror of the desktop `get_channel_messages_before` command: return one + * keyset page of *top-level* channel history strictly older than a composite + * `(before, before_id)` cursor, newest first (relay order `created_at DESC, + * id ASC`). + * + * This is the dense-second escape hatch — the id tiebreak is load-bearing so a + * single `created_at` second denser than one WS page can still be paged + * through: within a tied second the relay advances via `id > before_id`. Lets a + * Playwright spec assert the keyset union reaches every top-level message even + * when a second holds more than one page. + */ +async function handleGetChannelMessagesBefore( + args: { + channelId: string; + before: number; + beforeId?: string | null; + limit?: number | null; + }, + config: E2eConfig | undefined, +): Promise { + const cap = Math.min(args.limit ?? 200, 500); + const identity = getIdentity(config); + + let events: RelayEvent[]; + if (!identity) { + // Mock store: top-level timeline events for this channel. + events = getMockMessageStore(args.channelId).filter((event) => { + if (!TIMELINE_KINDS.has(event.kind)) { + return false; + } + return getThreadReferenceFromTags(event.tags).rootEventId === null; + }); + } else { + // Config mode: exercise the real bridge keyset over /query. + const filter: Record = { + "#h": [args.channelId], + kinds: [...TIMELINE_KINDS], + until: args.before, + limit: cap, + }; + if (args.beforeId) { + filter.n = args.beforeId; + } + const page = await relayQuery(config, [filter]); + const nextCursor = + page.length >= cap + ? { + created_at: page[page.length - 1].created_at, + event_id: page[page.length - 1].id, + } + : null; + return { events: page, next_cursor: nextCursor }; + } + + // Mock mode paging: relay order (created_at DESC, id ASC), then take the + // slice strictly older than the composite cursor. Strictly-older means + // `created_at < before OR (created_at === before AND id > before_id)` — the + // id tiebreak walks *forward* through a tied second under ASC id order. + events.sort( + (left, right) => + right.created_at - left.created_at || left.id.localeCompare(right.id), + ); + const before = args.before; + const beforeId = args.beforeId ?? null; + const older = events.filter((event) => { + if (event.created_at < before) { + return true; + } + if (event.created_at === before && beforeId !== null) { + return event.id.localeCompare(beforeId) > 0; + } + return false; + }); + const page = older.slice(0, cap); + const nextCursor = + page.length >= cap + ? { + created_at: page[page.length - 1].created_at, + event_id: page[page.length - 1].id, + } + : null; + + return { events: page, next_cursor: nextCursor }; +} + function getMockUserNotes(pubkey: string): RawUserNote[] { const now = Math.floor(Date.now() / 1000); @@ -7485,6 +7705,16 @@ export function maybeInstallE2eTauriMocks() { return handleGetForumThread( payload as Parameters[0], ); + case "get_thread_replies": + return handleGetThreadReplies( + payload as Parameters[0], + activeConfig, + ); + case "get_channel_messages_before": + return handleGetChannelMessagesBefore( + payload as Parameters[0], + activeConfig, + ); case "send_channel_message": return handleSendChannelMessage( payload as Parameters[0], diff --git a/desktop/tests/e2e/channel-dense-second-reach.spec.ts b/desktop/tests/e2e/channel-dense-second-reach.spec.ts new file mode 100644 index 000000000..29b6a0884 --- /dev/null +++ b/desktop/tests/e2e/channel-dense-second-reach.spec.ts @@ -0,0 +1,161 @@ +import { expect, test } from "@playwright/test"; + +import { installMockBridge } from "../helpers/bridge"; + +// Lane 1c regression — the dense-second reachability wall. +// +// The desktop timeline pages older history over a WS `REQ` with a bare `until` +// (`created_at`) cursor. That cursor cannot advance past a single `created_at` +// second holding more messages than one WS page (200): `until` re-returns the +// same newest slice of that second forever, so everything behind it is +// unreachable and the progress guard stalls. +// +// The fix (`pageOlderMessagesUntilRowFloor`) detects the stall on a *full* +// page and switches that pass to the bridge composite `(created_at, event_id)` +// keyset command `get_channel_messages_before`, seeded from the max event id at +// the boundary second — which advances within the tied second via +// `id > before_id` under the relay's `created_at DESC, id ASC` order. +// +// This test seeds one second with ~450 top-level messages (>2 WS pages) sitting +// behind the cold-load window, then pages to the top and asserts: +// (a) the escape-hatch command actually fired, and +// (b) every dense-second message becomes reachable (union of rendered rows +// equals the full seed) — impossible behind the bare-`until` wall. +const DENSE_SECOND = 1_700_000_000; +const DENSE_COUNT = 450; // > 2 * OLDER_MESSAGES_BATCH_SIZE (200) +const NEWER_COUNT = 60; // fills the cold-load window, pushing the dense block older + +test("dense single second beyond one WS page is fully reachable via keyset escape hatch", async ({ + page, +}, testInfo) => { + testInfo.setTimeout(90_000); + await installMockBridge(page); + await page.goto("/"); + await page.waitForFunction( + () => typeof window.__BUZZ_E2E_EMIT_MOCK_MESSAGE__ === "function", + ); + + await page.evaluate( + ({ denseSecond, denseCount, newerCount }) => { + // The dense wall: `denseCount` top-level messages all at one second. + for (let index = 0; index < denseCount; index += 1) { + window.__BUZZ_E2E_EMIT_MOCK_MESSAGE__?.({ + channelName: "general", + content: `dense ${index}`, + createdAt: denseSecond, + }); + } + // Newer window so the cold load (newest CHANNEL_HISTORY_LIMIT) does NOT + // include the dense block — it must be paged into from scroll-up. + for (let index = 0; index < newerCount; index += 1) { + window.__BUZZ_E2E_EMIT_MOCK_MESSAGE__?.({ + channelName: "general", + content: `newer ${index}`, + createdAt: denseSecond + 1 + index, + }); + } + }, + { + denseSecond: DENSE_SECOND, + denseCount: DENSE_COUNT, + newerCount: NEWER_COUNT, + }, + ); + + await page.getByTestId("channel-general").click(); + await expect(page.getByTestId("chat-title")).toHaveText("general"); + const timeline = page.getByTestId("message-timeline"); + await expect(timeline.locator("[data-message-id]").first()).toBeVisible(); + await page.waitForFunction(() => { + const element = document.querySelector( + '[data-testid="message-timeline"]', + ) as HTMLDivElement | null; + return element ? element.scrollHeight > element.clientHeight + 500 : false; + }); + + // Collect the union of dense-second indices ever rendered. Virtualization + // only mounts a window of rows, so we accumulate across scroll passes rather + // than snapshot once. + const renderedDenseIndices = async () => + timeline.evaluate((element) => { + const found: number[] = []; + for (const row of ( + element as HTMLDivElement + ).querySelectorAll("[data-message-id]")) { + const match = row.textContent?.match(/dense (\d+)/); + if (match) found.push(Number(match[1])); + } + return found; + }); + + // Drive a real wheel-up gesture each pass: the older-history sentinel arms on + // a genuine leave→enter transition (IntersectionObserver), so a raw + // `scrollTop = 0` write on the virtualized container can fail to re-fire. + // A wheel event is what a real user issues and what the observer honors. + const wheelToTop = async () => { + for (let step = 0; step < 12; step += 1) { + const atTop = await timeline.evaluate( + (element) => (element as HTMLDivElement).scrollTop <= 1, + ); + if (atTop) break; + await page.mouse.wheel(0, -6000); + await page.waitForTimeout(40); + } + }; + + const seen = new Set(); + const collectRendered = async () => { + for (const index of await renderedDenseIndices()) { + seen.add(index); + } + }; + + await timeline.hover(); + let stallStreak = 0; + for ( + let attempt = 0; + attempt < 120 && seen.size < DENSE_COUNT; + attempt += 1 + ) { + const before = seen.size; + await wheelToTop(); + // Each gesture pages a bounded step (one pass of the row-floor pager, which + // may itself engage the keyset drain). The sentinel disconnects while the + // prepend's index-restore owns scroll and only re-arms once settled, so + // poll for real growth rather than a fixed sleep. + try { + await expect + .poll( + async () => { + await collectRendered(); + return seen.size; + }, + { timeout: 4_000 }, + ) + .toBeGreaterThan(before); + } catch { + // No growth this pass — count it toward a genuine stall. + } + await collectRendered(); + if (seen.size > before) { + stallStreak = 0; + } else { + stallStreak += 1; + if (stallStreak > 8) break; + } + } + + // (a) The escape hatch actually engaged — the bare-`until` WS path alone can + // never reach the high-id tail, so this proves we exercised the fix. + const commands = await page.evaluate( + () => window.__BUZZ_E2E_COMMANDS__ ?? [], + ); + expect(commands).toContain("get_channel_messages_before"); + + // (b) Reachability parity: the union of paged dense rows crosses far past one + // WS page (200) — impossible behind the bare-`until` wall, where it caps at + // 200 of the 450. We assert the vast majority became reachable; + // virtualization can drop a few transient rows between scroll settles, so we + // allow a small slack rather than demanding an exact 450. + expect(seen.size).toBeGreaterThan(DENSE_COUNT * 0.9); +}); From d8bcfd93ca3d49b3affe08e8769c8c595eae6e5f Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 22:20:03 -0400 Subject: [PATCH 5/8] fix(read-path): send dense-second tiebreak as `before_id` the relay reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The channel-timeline dense-second fallback sent its composite keyset tiebreak under the filter key `n`, but the relay bridge's `extract_before_id` reads it from `before_id` (crates/buzz-relay/src/api/ bridge.rs). So on the real `/query` path `query.before_id` was never set: the fallback silently degraded to a bare inclusive `until`, re-returned the same full dense-second page, re-emitted the same cursor, and `drainOlderViaKeyset` could spin inside the boundary-second drain loop instead of breaking the wall — the exact case the escape hatch exists to fix. Caught by Wren in PR review. The Playwright spec passed anyway because its mock-store path reimplements the keyset in JS (never exercising the relay filter field), and config mode sent the same wrong key. Fix both call sites to send `before_id`, and add a Rust unit test that pins the field name so the client/relay contract can't drift silently again — extracting `build_channel_messages_before_filter` to make the emitted filter testable, mirroring `build_search_messages_filter`. Verified: 863/863 desktop cargo tests (incl. the 2 new filter tests), tsc, biome, file-sizes, px-text, 1399 node unit tests, dense-second Playwright. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- desktop/src-tauri/src/commands/messages.rs | 91 +++++++++++++++------- desktop/src/testing/e2eBridge.ts | 2 +- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/desktop/src-tauri/src/commands/messages.rs b/desktop/src-tauri/src/commands/messages.rs index a3dc5e797..0084225e2 100644 --- a/desktop/src-tauri/src/commands/messages.rs +++ b/desktop/src-tauri/src/commands/messages.rs @@ -275,30 +275,16 @@ pub async fn get_thread_replies( }) } -/// Fetch one keyset page of top-level channel history strictly *older* than a -/// cursor, server-side via the bridge composite cursor. -/// -/// The desktop timeline normally pages history over WS `REQ` with a bare `until` -/// (`created_at`) cursor. That cursor cannot advance past a single `created_at` -/// second that holds more messages than one page: `until` keeps returning the -/// same newest slice of that second and history behind it is unreachable. This -/// command uses the relay's `(created_at, event_id)` keyset (`until` + `n` = -/// `before_id`), which advances within a tied second via `id > before_id` under -/// the relay's `created_at DESC, id ASC` order — the escape hatch for that wall. -/// -/// `before` is the cursor's `created_at` (Unix seconds); `before_id` is the hex -/// id of the last (oldest) event already loaded at that second, so the page -/// returned is strictly older. `next_cursor` is the last (oldest) returned -/// event's composite key when a full page came back, else `None`. -#[tauri::command] -pub async fn get_channel_messages_before( - channel_id: String, +/// Build the relay `/query` filter for one keyset page of top-level channel +/// history strictly older than `(before, before_id)`. Extracted so a unit test +/// can pin the tiebreak field: it MUST be `before_id` (what the relay's +/// `extract_before_id` reads), else the keyset degrades to a bare `until`. +fn build_channel_messages_before_filter( + channel_id: &str, before: i64, - before_id: Option, - limit: Option, - state: State<'_, AppState>, -) -> Result { - let cap = limit.unwrap_or(200).min(500); + before_id: Option<&str>, + cap: u32, +) -> serde_json::Map { // Timeline content kinds — mirror the WS history filter so the keyset page // and the WS page select the same rows. Top-level filtering is enforced by // the relay's thread_metadata join for this channel scope. @@ -322,11 +308,40 @@ pub async fn get_channel_messages_before( ); filter.insert("until".to_string(), serde_json::json!(before)); filter.insert("limit".to_string(), serde_json::json!(cap)); - // `n` is the bridge extension name for the composite `before_id` tiebreak; - // the relay requires `until` to be set alongside it. - if let Some(id) = before_id.as_deref() { - filter.insert("n".to_string(), serde_json::json!(id)); + // `before_id` is the bridge extension field for the composite tiebreak + // (relay `extract_before_id`); it requires `until` to be set alongside it. + if let Some(id) = before_id { + filter.insert("before_id".to_string(), serde_json::json!(id)); } + filter +} + +/// Fetch one keyset page of top-level channel history strictly *older* than a +/// cursor, server-side via the bridge composite cursor. +/// +/// The desktop timeline normally pages history over WS `REQ` with a bare `until` +/// (`created_at`) cursor. That cursor cannot advance past a single `created_at` +/// second that holds more messages than one page: `until` keeps returning the +/// same newest slice of that second and history behind it is unreachable. This +/// command uses the relay's `(created_at, event_id)` keyset (`until` + +/// `before_id`), which advances within a tied second via `id > before_id` under +/// the relay's `created_at DESC, id ASC` order — the escape hatch for that wall. +/// +/// `before` is the cursor's `created_at` (Unix seconds); `before_id` is the hex +/// id of the last (oldest) event already loaded at that second, so the page +/// returned is strictly older. `next_cursor` is the last (oldest) returned +/// event's composite key when a full page came back, else `None`. +#[tauri::command] +pub async fn get_channel_messages_before( + channel_id: String, + before: i64, + before_id: Option, + limit: Option, + state: State<'_, AppState>, +) -> Result { + let cap = limit.unwrap_or(200).min(500); + let filter = + build_channel_messages_before_filter(&channel_id, before, before_id.as_deref(), cap); let events = query_relay(&state, &[serde_json::Value::Object(filter)]).await?; @@ -734,6 +749,28 @@ mod tests { assert_eq!(filter["#h"], serde_json::json!(["channel-1"])); } + #[test] + fn channel_messages_before_filter_sends_before_id_the_relay_reads() { + // The relay bridge's `extract_before_id` reads the composite tiebreak + // from `before_id`. If this filter sent the id under any other key (an + // earlier cut used `n`), the relay would silently drop the tiebreak and + // the dense-second keyset would degrade to a bare inclusive `until` — + // re-returning the same page forever. Pin the field name here so the + // client/relay contract can't drift without a red test (the Playwright + // mock reimplements the keyset in JS and cannot catch this). + let filter = + build_channel_messages_before_filter("channel-1", 1_700_000_000, Some("ab"), 200); + + assert_eq!(filter["until"], serde_json::json!(1_700_000_000)); + assert_eq!(filter["before_id"], serde_json::json!("ab")); + assert_eq!(filter["limit"], serde_json::json!(200)); + assert_eq!(filter["#h"], serde_json::json!(["channel-1"])); + assert!( + !filter.contains_key("n"), + "tiebreak must be `before_id`, not the `n` alias the relay ignores" + ); + } + #[test] fn stored_managed_agent_auth_tag_trims_blank_values() { assert_eq!( diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index d7f87a525..8cc5de30b 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -3463,7 +3463,7 @@ async function handleGetChannelMessagesBefore( limit: cap, }; if (args.beforeId) { - filter.n = args.beforeId; + filter.before_id = args.beforeId; } const page = await relayQuery(config, [filter]); const nextCursor = From 7c2f11f556a9178f5beb78cea7dec92f6ff5d9f7 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 22:34:11 -0400 Subject: [PATCH 6/8] fix(read-path): page the empty-query people directory end-to-end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The empty-query people directory listing was terminal: it could never page past its first page, so on busy relays not all people were reachable via the DM picker / add-user / tag flows (Tyler's "all people" goal). Two halves, both fixed: - Relay: the non-search general query path (POST /query, no `search` field) built an EventQuery but never read the raw `page` extension, so no SQL OFFSET was set. Only `search`-bearing filters route to handle_bridge_search (which has its own page/per_page). Empty kind:0 listings fell through here. New `extract_page_offset` wires page -> (page-1)*limit into query.offset; query_events already applies OFFSET with deterministic `created_at DESC, id ASC` ordering, so offset paging is stable. Only fires when `page` > 1 is explicitly present, so keyset/general queries are untouched. - Desktop: search_users' empty-query branch called list_user_search_results, which always returned next_cursor: None, so the frontend could never request page 2. Now emits Some(page+1) when the relay returned a full page (raw events.len() >= max), mirroring the existing NIP-50 search branch. Raw length is the correct fullness signal since list_user_search_results dedupes/truncates. Config-mode e2eBridge delegates empty-query paging to the real relay /query, so this fix makes that path correct too; the mock-store path already paged locally. Added Rust unit tests pinning the page->offset contract (absent/page-1 -> None, page N -> (N-1)*limit, missing limit -> None) — the durable server-side guard the JS mock structurally can't verify. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-relay/src/api/bridge.rs | 59 +++++++++++++++++++++++ desktop/src-tauri/src/commands/profile.rs | 11 ++++- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index e64c99e03..5fd9e7f06 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -274,6 +274,25 @@ fn extract_search_page(raw: &Value) -> u32 { .unwrap_or(1) } +/// Compute the SQL `OFFSET` for a raw `page` extension on a non-search general +/// query, or `None` if paging shouldn't apply. +/// +/// `page` is 1-based: page 1 → offset 0 (no change), page N → `(N-1) * limit`. +/// Returns `None` when `page` is absent or ≤ 1 (so unrelated general queries +/// keep their default offset) and when `limit` is missing (can't size a page). +/// This mirrors the FTS path's `page`/`per_page` for the non-search directory +/// listing (empty-query kind:0), whose deterministic `created_at DESC, id ASC` +/// ordering in `query_events` makes offset paging stable. +fn extract_page_offset(raw: &Value, limit: Option) -> Option { + let page = raw + .get("page") + .and_then(Value::as_u64) + .and_then(|value| i64::try_from(value).ok()) + .filter(|value| *value > 1)?; + let per_page = limit.filter(|l| *l > 0)?; + page.checked_sub(1)?.checked_mul(per_page) +} + fn event_in_accessible_channel(se: &buzz_core::StoredEvent, accessible: &[uuid::Uuid]) -> bool { match se.channel_id { Some(ch_id) => accessible.contains(&ch_id), @@ -646,6 +665,18 @@ pub async fn query_events( query.before_id = Some(bid); } + // Honor `page` on non-search general queries so offset paging works for + // the empty-query people directory (kind:0 listing). The FTS path + // (`handle_bridge_search`) has its own `page`/`per_page`; a filter with + // no `search` field lands here instead, where paging would otherwise be + // dropped and the directory would terminate at its first page. Deterministic + // ordering in `query_events` (`created_at DESC, id ASC`) makes offset paging + // stable. `page` defaults to 1 → offset 0, so unrelated general queries are + // unaffected. + if let Some(offset) = extract_page_offset(raw, query.limit) { + query.offset = Some(offset); + } + match state.db.query_events(&query).await { Ok(stored_events) => { for se in stored_events { @@ -1885,6 +1916,34 @@ mod tests { assert!(extract_before_id(&raw).is_none()); } + #[test] + fn extract_page_offset_absent_is_none() { + // No `page` → default offset (unrelated general queries untouched). + let raw = serde_json::json!({ "kinds": [0], "limit": 50 }); + assert_eq!(extract_page_offset(&raw, Some(50)), None); + } + + #[test] + fn extract_page_offset_page_one_is_none() { + // Page 1 is the first page → offset 0, expressed as no override. + let raw = serde_json::json!({ "kinds": [0], "limit": 50, "page": 1 }); + assert_eq!(extract_page_offset(&raw, Some(50)), None); + } + + #[test] + fn extract_page_offset_computes_offset_from_page_and_limit() { + // Empty people-directory contract: page N → (N-1) * limit. + let raw = serde_json::json!({ "kinds": [0], "limit": 50, "page": 3 }); + assert_eq!(extract_page_offset(&raw, Some(50)), Some(100)); + } + + #[test] + fn extract_page_offset_missing_limit_is_none() { + // Can't size a page without a limit. + let raw = serde_json::json!({ "kinds": [0], "page": 2 }); + assert_eq!(extract_page_offset(&raw, None), None); + } + #[test] fn extract_depth_limit_valid() { let raw = serde_json::json!({ "depth_limit": 3 }); diff --git a/desktop/src-tauri/src/commands/profile.rs b/desktop/src-tauri/src/commands/profile.rs index 6c2135c6b..bfe3b156f 100644 --- a/desktop/src-tauri/src/commands/profile.rs +++ b/desktop/src-tauri/src/commands/profile.rs @@ -201,7 +201,16 @@ pub async fn search_users( ) .await?; - return Ok(nostr_convert::list_user_search_results(&events, max)); + // Emit a real next page cursor when the relay returned a full page, so + // the empty-query people directory can page past its first page (the + // relay honors `page`→offset for this non-search kind:0 listing). The + // raw `events.len()` is the correct fullness signal — `list_user_search_results` + // dedupes/truncates, so its output length can undercount a full page. + let mut response = nostr_convert::list_user_search_results(&events, max); + if events.len() >= max { + response.next_cursor = Some((page + 1).to_string()); + } + return Ok(response); } // NIP-50 full-text search on kind:0 profiles. The relay's HTTP bridge From 1452204c7a5fa4dae6277957fee0ec8da7b63947 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Tue, 30 Jun 2026 22:41:49 -0400 Subject: [PATCH 7/8] fix(read-path): use as_deref for thread_cursor to clear clippy -D warnings Rust Lint CI runs clippy with -D warnings, so the pre-existing `.as_ref().map(|c| c.as_slice())` on the Lane 1 thread-cursor path (bridge.rs) was a red gate regardless of provenance. `as_deref()` is the exact equivalent (Option> -> Option<&[u8]>). Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-relay/src/api/bridge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/buzz-relay/src/api/bridge.rs b/crates/buzz-relay/src/api/bridge.rs index 5fd9e7f06..4b860e403 100644 --- a/crates/buzz-relay/src/api/bridge.rs +++ b/crates/buzz-relay/src/api/bridge.rs @@ -619,7 +619,7 @@ pub async fn query_events( &root_bytes, Some(depth), limit, - thread_cursor.as_ref().map(|c| c.as_slice()), + thread_cursor.as_deref(), ) .await .map_err(|e| internal_error(&format!("thread query error: {e}")))?; From e931e770ed57a946b47740102c94cbfdaef0cc77 Mon Sep 17 00:00:00 2001 From: npub1qyvc0c5kl4gqv2fd97fsk46tu378sqgy35vc83rvgfwne90sel7s0ed67d <011987e296fd5006292d2f930b574be47c7801048d1983c46c425d3c95f0cffd@sprout-oss.stage.blox.sqprod.co> Date: Wed, 1 Jul 2026 07:45:26 -0400 Subject: [PATCH 8/8] fix(read-path): clear thread-replies p-gate + finish-line correctness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The headline Lane-1 thread-backfill fix (get_thread_replies / useThreadReplies) was dead against a real relay: the desktop command built a KINDLESS /query filter, and the relay bridge p-gate (p_gated_filters_authorized) rejects any kindless filter with 403 before routing — a kindless filter 'could match' a p-gated kind, so it demands a #p tag we don't send. The relay routes the thread query purely off #e + depth_limit (kind does not filter it), but the gate still runs first. The e2e mock didn't model p-gating, so CI stayed green on a 403. Fixes: - messages.rs: give get_thread_replies' filter the non-p-gated TIMELINE_KINDS (extracted a shared const + build_thread_replies_filter helper mirroring the channel sibling, so the two filters can't drift). Guard unit tests assert the filter carries kinds and every kind is NOT in P_GATED_KINDS (the real invariant), plus composite-cursor paging. - e2eBridge.ts: model the relay p-gate for /query and WS REQ so a kindless #e read is now rejected in the mock too — closes the CI blind spot. Also fix the mock subtree walk to traverse nested replies transitively. - useThreadReplies.ts: clear the fetched-root mark on cancellation unless the paging loop completed, so an interrupted first fetch retries on reopen instead of silently stranding the feature. Narrow effect deps to primitive channel id/type to avoid object-identity churn. - Correct the false 'includes root at depth 0' contract at all three doc sites (messages.rs / tauri.ts / types.ts): the query keys on root_event_id, which root rows lack, so results are replies-only (depth >= 1). - thread.rs: fix the nested root-stub INSERT in insert_thread_metadata (column list omitted community_id while binding it, scrambling every placeholder) to match the correct production sibling in event.rs. Add depth-2 reachability and a direct stub-branch guard (Postgres-gated). - check-file-sizes.mjs: record load-bearing overrides for messages.rs (fix + guard tests crossed 1000 from 995) and tauri.ts (+3 doc-only lines). Verified: tauri 864/0, buzz-db 124/0 (incl. Postgres, serialized, with a red-check confirming the stub test fails on the pre-fix SQL), and a live clean-room relay at this tip: kindless #e -> 403, shipped kinded filter -> 200 with 250/250 dense same-second depth-1 replies + a depth-2 nested reply reached, root excluded. Co-authored-by: Tyler Longwell Signed-off-by: Tyler Longwell --- crates/buzz-db/src/thread.rs | 176 +++++++++++++++++- desktop/scripts/check-file-sizes.mjs | 11 +- desktop/src-tauri/src/commands/messages.rs | 160 ++++++++++++---- .../src/features/messages/useThreadReplies.ts | 18 +- desktop/src/shared/api/tauri.ts | 5 +- desktop/src/shared/api/types.ts | 2 +- desktop/src/testing/e2eBridge.ts | 160 +++++++++++++--- 7 files changed, 455 insertions(+), 77 deletions(-) diff --git a/crates/buzz-db/src/thread.rs b/crates/buzz-db/src/thread.rs index 5f4900cfc..794695af6 100644 --- a/crates/buzz-db/src/thread.rs +++ b/crates/buzz-db/src/thread.rs @@ -182,11 +182,11 @@ pub async fn insert_thread_metadata( sqlx::query( r#" INSERT INTO thread_metadata - (event_created_at, event_id, channel_id, + (community_id, event_created_at, event_id, channel_id, parent_event_id, parent_event_created_at, root_event_id, root_event_created_at, depth, broadcast) - VALUES ($1, $2, $3, NULL, NULL, NULL, NULL, 0, false) + VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL, 0, false) ON CONFLICT DO NOTHING "#, ) @@ -1086,6 +1086,178 @@ mod tests { assert_eq!(unique, expected_sorted, "paged set != inserted tied set"); } + /// Nested replies (depth >= 2) must be reachable in a subtree read. Every + /// existing thread test uses `parent == root` (depth-1 direct replies), so + /// the depth>1 path — where `root_event_id != parent_event_id` and the root + /// stub is created by a nested reply arriving before the root has a + /// metadata row — was never exercised. `get_thread_replies` advertises + /// depth-64 subtree reads, so pin that a grandchild reply is returned and + /// its depth is recorded. This also exercises the root-stub INSERT branch + /// (`root_id != pid`) end-to-end through the production insert path. + #[tokio::test] + #[ignore = "requires Postgres"] + async fn get_thread_replies_reaches_nested_depth_two_replies() { + let pool = setup_pool().await; + let author = Keys::generate(); + let (channel, community) = create_test_channel( + &pool, + &format!("thread-nested-{}", Uuid::new_v4()), + ChannelType::Stream, + ChannelVisibility::Open, + None, + author.public_key().to_bytes().as_slice(), + None, + ) + .await + .expect("create channel"); + + // Root (no metadata row on first insert — a depth-0 message). + let root = make_stream_event(&author, "root"); + let root_created_at = event_created_at(&root); + insert_event_with_thread_metadata(&pool, community, &root, Some(channel.id), None) + .await + .expect("insert root event"); + + // Depth-1 direct reply to the root (parent == root). + let child = make_stream_event(&author, "child"); + let child_created_at = event_created_at(&child); + insert_event_with_thread_metadata( + &pool, + community, + &child, + Some(channel.id), + Some(ThreadMetadataParams { + event_id: child.id.as_bytes(), + event_created_at: child_created_at, + channel_id: channel.id, + parent_event_id: Some(root.id.as_bytes()), + parent_event_created_at: Some(root_created_at), + root_event_id: Some(root.id.as_bytes()), + root_event_created_at: Some(root_created_at), + depth: 1, + broadcast: false, + }), + ) + .await + .expect("insert depth-1 child"); + + // Depth-2 grandchild: parent is the child, root is the root. This is the + // `root_id != parent_id` case that fires the nested root-stub branch. + let grandchild = make_stream_event(&author, "grandchild"); + let grandchild_created_at = event_created_at(&grandchild); + insert_event_with_thread_metadata( + &pool, + community, + &grandchild, + Some(channel.id), + Some(ThreadMetadataParams { + event_id: grandchild.id.as_bytes(), + event_created_at: grandchild_created_at, + channel_id: channel.id, + parent_event_id: Some(child.id.as_bytes()), + parent_event_created_at: Some(child_created_at), + root_event_id: Some(root.id.as_bytes()), + root_event_created_at: Some(root_created_at), + depth: 2, + broadcast: false, + }), + ) + .await + .expect("insert depth-2 grandchild"); + + // Read the whole subtree under the root. + let replies = get_thread_replies(&pool, community, root.id.as_bytes(), Some(64), 100, None) + .await + .expect("fetch subtree"); + + let by_id: std::collections::HashMap, i32> = replies + .iter() + .map(|r| (r.event_id.clone(), r.depth)) + .collect(); + assert_eq!( + replies.len(), + 2, + "both the child and grandchild must be reached" + ); + assert_eq!( + by_id.get(child.id.as_bytes().as_slice()), + Some(&1), + "depth-1 child must be present at depth 1" + ); + assert_eq!( + by_id.get(grandchild.id.as_bytes().as_slice()), + Some(&2), + "depth-2 grandchild must be reached (subtree read must not stop at depth 1)" + ); + } + + /// Direct guard on [`insert_thread_metadata`]'s nested root-stub INSERT. The + /// column list once omitted `community_id` while still binding it, scrambling + /// every placeholder (the bind for `community_id` landed on `event_created_at` + /// etc.), so any nested reply (`root_id != parent_id`) whose root lacked a + /// metadata row failed the whole insert. The production ingest path uses + /// `insert_event_with_thread_metadata` (event.rs), whose copy was already + /// correct, which is why no test caught this. Pin the standalone function so + /// the scrambled SQL can't return. + #[tokio::test] + #[ignore = "requires Postgres"] + async fn insert_thread_metadata_nested_reply_creates_root_stub() { + let pool = setup_pool().await; + let author = Keys::generate(); + let (channel, community) = create_test_channel( + &pool, + &format!("thread-stub-{}", Uuid::new_v4()), + ChannelType::Stream, + ChannelVisibility::Open, + None, + author.public_key().to_bytes().as_slice(), + None, + ) + .await + .expect("create channel"); + + // Insert the events themselves (no thread metadata yet) so the FK/row + // exists; then drive `insert_thread_metadata` directly for a depth-2 + // reply whose root has no metadata row — the root-stub branch. + let root = make_stream_event(&author, "root"); + let child = make_stream_event(&author, "child"); + let grandchild = make_stream_event(&author, "grandchild"); + for ev in [&root, &child, &grandchild] { + insert_event_with_thread_metadata(&pool, community, ev, Some(channel.id), None) + .await + .expect("insert event"); + } + + // Depth-2 reply where root_id != parent_id. Before the fix this errored + // inside the transaction (UUID bound to a TIMESTAMPTZ placeholder). + insert_thread_metadata( + &pool, + community, + grandchild.id.as_bytes(), + event_created_at(&grandchild), + channel.id, + Some(child.id.as_bytes()), + Some(event_created_at(&child)), + Some(root.id.as_bytes()), + Some(event_created_at(&root)), + 2, + false, + ) + .await + .expect("nested insert must succeed and create the root stub"); + + // The root stub must now exist and be readable. + let replies = get_thread_replies(&pool, community, root.id.as_bytes(), Some(64), 100, None) + .await + .expect("fetch subtree"); + assert!( + replies + .iter() + .any(|r| r.event_id == grandchild.id.as_bytes()), + "grandchild must be reachable under the root after the nested insert" + ); + } + /// A reply whose stored row can no longer be reconstructed into a /// `nostr::Event` (e.g. corrupt signature from out-of-band storage damage) /// must be skipped, with the rest of the thread still returned — not diff --git a/desktop/scripts/check-file-sizes.mjs b/desktop/scripts/check-file-sizes.mjs index fa975fec5..32377d2fc 100644 --- a/desktop/scripts/check-file-sizes.mjs +++ b/desktop/scripts/check-file-sizes.mjs @@ -61,6 +61,12 @@ const overrides = new Map([ // commands add ~40 lines. Queued to split. // branch cut; override bumped to cover the merged total. Queued to split. ["src-tauri/src/commands/agents.rs", 1437], + // #1418 read-path fix: get_thread_replies' blocker fix (shared TIMELINE_KINDS + // const + build_thread_replies_filter helper, mirroring the channel sibling so + // the two p-gate filters can't drift) plus two guard unit tests. The file was + // already at 995; this load-bearing correctness fix crossed 1000. Not generic + // debt growth. Approved override; queued to split with the rest of this list. + ["src-tauri/src/commands/messages.rs", 1082], // Residual repos_dir integration in ensure_nest_at: REPOS is provisioned // outside NEST_DIRS (it may be a symlink), so it needs its own create + // chmod-only-when-real-dir handling plus integration test coverage. The @@ -85,7 +91,10 @@ const overrides = new Map([ // queued to split. Read-path lanes 1+2 add server-side fetch bindings // (getThreadReplies + getChannelMessagesBefore) and paged people-search // reachability — load-bearing reachability plumbing, not generic debt. - ["src/shared/api/tauri.ts", 1337], + // #1418 read-path fix: +3 doc-only lines correcting the getThreadReplies + // contract (replies-only, root excluded — the query keys on root_event_id, + // which root rows lack). Documentation accuracy, not code growth. + ["src/shared/api/tauri.ts", 1340], // harness-persona-sync feature growth, queued to split in the resolver-unify // refactor followup. discovery.rs is dominated by the new test module // (the effective_agent_command / divergent / create-time override matrix); diff --git a/desktop/src-tauri/src/commands/messages.rs b/desktop/src-tauri/src/commands/messages.rs index 0084225e2..3a057eaa1 100644 --- a/desktop/src-tauri/src/commands/messages.rs +++ b/desktop/src-tauri/src/commands/messages.rs @@ -16,6 +16,27 @@ use crate::{ // ── Reads (pure-nostr) ────────────────────────────────────────────────────── +/// Timeline content kinds — the message/channel-event kinds that make up a +/// channel timeline and a thread's replies. Used to build relay `/query` +/// filters for the keyset readers below. None of these are in +/// `P_GATED_KINDS`, so a filter carrying them clears the bridge p-gate +/// (`p_gated_filters_authorized`) without a `#p` tag — load-bearing for the +/// thread-subtree read, whose relay routing keys off `#e`+`depth_limit` (not +/// kind) but still passes through the p-gate before it runs. +const TIMELINE_KINDS: [u32; 11] = [ + 9, + 40002, + 40008, + 40099, + 43001, + 43002, + 43003, + 43004, + 43005, + 43006, + buzz_core_pkg::kind::KIND_HUDDLE_STARTED, +]; + #[tauri::command] pub async fn get_feed( since: Option, @@ -211,7 +232,10 @@ pub async fn get_forum_thread( /// cache by grouping on `e`-root tags), this walks `thread_metadata` on the /// relay via `get_thread_replies`, so a thread renders complete even when its /// replies fell outside the channel cold-load window. Results are chronological -/// (oldest first) and include the root event itself (depth 0). +/// (oldest first) and are the *replies* under the root (depth >= 1); the root +/// event itself is NOT returned (the relay query keys on `root_event_id`, and a +/// root row has no `root_event_id`). Callers already hold the root — it is the +/// open thread head — so this closes the descendant gap without re-fetching it. /// /// Paging is forward keyset on `(created_at, event_id)`: pass the `next_cursor` /// from a previous page back as `cursor` to fetch the next batch. The event-id @@ -228,27 +252,13 @@ pub async fn get_thread_replies( state: State<'_, AppState>, ) -> Result { let cap = limit.unwrap_or(200).min(500); - // Bridge extension filter: `#e` root + `depth_limit` routes to - // `get_thread_replies` in the relay; `thread_cursor`(+`_id`) pages it forward. - let mut filter = serde_json::Map::new(); - filter.insert("#e".to_string(), serde_json::json!([root_event_id])); - // depth_limit is what activates the thread-subtree bridge path; default to - // a deep-but-bounded value so nested replies are not silently dropped. - filter.insert( - "depth_limit".to_string(), - serde_json::json!(depth_limit.unwrap_or(64)), + let filter = build_thread_replies_filter( + &root_event_id, + channel_id.as_deref(), + depth_limit.unwrap_or(64), + cap, + cursor.as_ref(), ); - filter.insert("limit".to_string(), serde_json::json!(cap)); - if let Some(cid) = channel_id.as_deref() { - filter.insert("#h".to_string(), serde_json::json!([cid])); - } - if let Some(c) = cursor.as_ref() { - filter.insert("thread_cursor".to_string(), serde_json::json!(c.created_at)); - filter.insert( - "thread_cursor_id".to_string(), - serde_json::json!(c.event_id), - ); - } let events = query_relay(&state, &[serde_json::Value::Object(filter)]).await?; @@ -275,6 +285,48 @@ pub async fn get_thread_replies( }) } +/// Build the relay `/query` filter for the server-side thread-subtree read. +/// +/// The relay routes a filter to `get_thread_replies` purely off a single `#e` +/// (root) tag plus `depth_limit` — kind is NOT part of that routing or the +/// underlying DB query (it keys on `root_event_id`). Yet `kinds` is still +/// required here: the bridge runs the p-gate (`p_gated_filters_authorized`) on +/// every filter *before* routing, and a kindless filter "could match" a p-gated +/// kind, so the gate demands a `#p` tag we don't send -> HTTP 403 +/// `restricted: p-gated kinds require #p tag`, before the thread query ever +/// runs. Carrying non-p-gated [`TIMELINE_KINDS`] makes the filter provably +/// un-p-gated so it clears the gate. `build_channel_messages_before_filter` is +/// the sibling that already does this, which is why the dense-second channel +/// pager was never gated and this reader was. Extracted so a unit test can pin +/// that `kinds` is present (the e2e mock does not model p-gating, so only a +/// unit test guards this contract). +fn build_thread_replies_filter( + root_event_id: &str, + channel_id: Option<&str>, + depth_limit: u32, + cap: u32, + cursor: Option<&crate::models::ThreadCursor>, +) -> serde_json::Map { + let mut filter = serde_json::Map::new(); + filter.insert("#e".to_string(), serde_json::json!([root_event_id])); + filter.insert("kinds".to_string(), serde_json::json!(TIMELINE_KINDS)); + // depth_limit is what activates the thread-subtree bridge path; the caller + // defaults it to a deep-but-bounded value so nested replies aren't dropped. + filter.insert("depth_limit".to_string(), serde_json::json!(depth_limit)); + filter.insert("limit".to_string(), serde_json::json!(cap)); + if let Some(cid) = channel_id { + filter.insert("#h".to_string(), serde_json::json!([cid])); + } + if let Some(c) = cursor { + filter.insert("thread_cursor".to_string(), serde_json::json!(c.created_at)); + filter.insert( + "thread_cursor_id".to_string(), + serde_json::json!(c.event_id), + ); + } + filter +} + /// Build the relay `/query` filter for one keyset page of top-level channel /// history strictly older than `(before, before_id)`. Extracted so a unit test /// can pin the tiebreak field: it MUST be `before_id` (what the relay's @@ -290,22 +342,7 @@ fn build_channel_messages_before_filter( // the relay's thread_metadata join for this channel scope. let mut filter = serde_json::Map::new(); filter.insert("#h".to_string(), serde_json::json!([channel_id])); - filter.insert( - "kinds".to_string(), - serde_json::json!([ - 9, - 40002, - 40008, - 40099, - 43001, - 43002, - 43003, - 43004, - 43005, - 43006, - buzz_core_pkg::kind::KIND_HUDDLE_STARTED - ]), - ); + filter.insert("kinds".to_string(), serde_json::json!(TIMELINE_KINDS)); filter.insert("until".to_string(), serde_json::json!(before)); filter.insert("limit".to_string(), serde_json::json!(cap)); // `before_id` is the bridge extension field for the composite tiebreak @@ -771,6 +808,55 @@ mod tests { ); } + #[test] + fn thread_replies_filter_carries_non_p_gated_kinds_to_clear_the_gate() { + // The relay bridge p-gates EVERY filter before routing + // (`p_gated_filters_authorized`): a kindless filter "could match" a + // p-gated kind, so it demands a `#p` tag we don't send -> HTTP 403, + // before the thread-subtree query runs. The headline Lane-1 fix + // (`useThreadReplies` closing the descendant gap) then fails on every + // call against a real relay. So the thread filter MUST carry `kinds`, + // and every kind MUST be non-p-gated (else the gate still fires). The + // Playwright mock does not model p-gating, so this unit test is the + // only guard against the client/relay auth contract drifting. + let filter = build_thread_replies_filter("root-hex", Some("channel-1"), 64, 200, None); + + let kinds = filter + .get("kinds") + .and_then(|v| v.as_array()) + .expect("thread filter must carry `kinds` so the p-gate passes"); + assert!(!kinds.is_empty(), "kinds must be non-empty"); + for kind in kinds { + let k = kind.as_u64().expect("kind is a number") as u32; + assert!( + !buzz_core_pkg::kind::P_GATED_KINDS.contains(&k), + "kind {k} is p-gated; a p-gated kind in the filter re-triggers the \ + 403 that this fix exists to prevent" + ); + } + assert_eq!(filter["#e"], serde_json::json!(["root-hex"])); + assert_eq!(filter["depth_limit"], serde_json::json!(64)); + assert_eq!(filter["#h"], serde_json::json!(["channel-1"])); + } + + #[test] + fn thread_replies_filter_pages_with_composite_cursor() { + // When a cursor is supplied, both the timestamp and the event-id + // tiebreak must be emitted (`thread_cursor` + `thread_cursor_id`), else + // paging degrades to timestamp-only and drops same-second replies. + let cursor = crate::models::ThreadCursor { + created_at: 1_700_000_000, + event_id: "abcd".to_string(), + }; + let filter = build_thread_replies_filter("root-hex", None, 64, 200, Some(&cursor)); + assert_eq!(filter["thread_cursor"], serde_json::json!(1_700_000_000)); + assert_eq!(filter["thread_cursor_id"], serde_json::json!("abcd")); + assert!( + !filter.contains_key("#h"), + "no channel_id -> no #h scope in the filter" + ); + } + #[test] fn stored_managed_agent_auth_tag_trims_blank_values() { assert_eq!( diff --git a/desktop/src/features/messages/useThreadReplies.ts b/desktop/src/features/messages/useThreadReplies.ts index 3b2cc8a83..ede6ef1dc 100644 --- a/desktop/src/features/messages/useThreadReplies.ts +++ b/desktop/src/features/messages/useThreadReplies.ts @@ -34,24 +34,25 @@ export function useThreadReplies( openThreadRootId: string | null, ) { const queryClient = useQueryClient(); + const activeChannelId = activeChannel?.id ?? null; + const activeChannelType = activeChannel?.channelType ?? null; // Track which roots we've already fetched per channel so re-opening a thread // (or a re-render) doesn't re-page the whole subtree every time. const fetchedRootsRef = React.useRef>(new Set()); const previousChannelIdRef = React.useRef(null); React.useEffect(() => { - const activeChannelId = activeChannel?.id ?? null; if (previousChannelIdRef.current === activeChannelId) { return; } previousChannelIdRef.current = activeChannelId; fetchedRootsRef.current.clear(); - }, [activeChannel?.id]); + }, [activeChannelId]); React.useEffect(() => { if ( - !activeChannel || - activeChannel.channelType === "forum" || + !activeChannelId || + activeChannelType === "forum" || !openThreadRootId ) { return; @@ -61,9 +62,10 @@ export function useThreadReplies( } fetchedRootsRef.current.add(openThreadRootId); - const channelId = activeChannel.id; + const channelId = activeChannelId; const rootId = openThreadRootId; let isCancelled = false; + let completed = false; void (async () => { let cursor: ThreadCursor | null = null; @@ -85,6 +87,7 @@ export function useThreadReplies( } if (!response.nextCursor) { + completed = true; break; } cursor = response.nextCursor; @@ -98,6 +101,9 @@ export function useThreadReplies( return () => { isCancelled = true; + if (!completed) { + fetchedRootsRef.current.delete(rootId); + } }; - }, [activeChannel, openThreadRootId, queryClient]); + }, [activeChannelId, activeChannelType, openThreadRootId, queryClient]); } diff --git a/desktop/src/shared/api/tauri.ts b/desktop/src/shared/api/tauri.ts index 674a0b066..d7128eebe 100644 --- a/desktop/src/shared/api/tauri.ts +++ b/desktop/src/shared/api/tauri.ts @@ -738,7 +738,10 @@ type RawThreadRepliesResponse = { * Paging is forward keyset on `(createdAt, eventId)`: pass the returned * `nextCursor` back as `cursor` for the next page. `nextCursor` is non-null only * when a full page was returned. The returned `events` are raw nostr events - * (`RelayEvent`), chronological (oldest first), including the root at depth 0. + * (`RelayEvent`), chronological (oldest first). These are the *replies* under + * the root (depth >= 1); the root event itself is NOT returned (the relay query + * keys on `root_event_id`, which a root row lacks). The caller already holds + * the root — it is the open thread head. */ export async function getThreadReplies( rootEventId: string, diff --git a/desktop/src/shared/api/types.ts b/desktop/src/shared/api/types.ts index d9748cd24..c5e4b9e80 100644 --- a/desktop/src/shared/api/types.ts +++ b/desktop/src/shared/api/types.ts @@ -816,7 +816,7 @@ export type ThreadCursor = { }; export type ThreadRepliesResponse = { - /** The reply subtree (chronological, oldest first), including the root at depth 0. */ + /** The reply subtree (chronological, oldest first), depth >= 1. Excludes the root event (relay keys on `root_event_id`, which a root row lacks); the caller already holds the root. */ events: RelayEvent[]; /** Present only when a full page was returned — pass back to fetch the next page. */ nextCursor: ThreadCursor | null; diff --git a/desktop/src/testing/e2eBridge.ts b/desktop/src/testing/e2eBridge.ts index 8cc5de30b..36d9a1046 100644 --- a/desktop/src/testing/e2eBridge.ts +++ b/desktop/src/testing/e2eBridge.ts @@ -13,8 +13,12 @@ import { KIND_EMOJI_SET, } from "@/shared/api/customEmoji"; import { + KIND_AGENT_OBSERVER_FRAME, KIND_DM_VISIBILITY, KIND_EVENT_REMINDER, + KIND_HUDDLE_STARTED, + KIND_MEMBER_ADDED_NOTIFICATION, + KIND_MEMBER_REMOVED_NOTIFICATION, KIND_STREAM_MESSAGE_EDIT, KIND_SYSTEM_MESSAGE, KIND_USER_STATUS, @@ -497,7 +501,9 @@ type MockFilter = { "#d"?: string[]; "#e"?: string[]; "#h"?: string[]; + "#p"?: string[]; authors?: string[]; + ids?: string[]; kinds?: number[]; limit?: number; since?: number; @@ -3312,7 +3318,7 @@ type RawThreadRepliesResponse = { /** * Mirror of the desktop `get_thread_replies` command: return the full reply - * subtree under a root, chronological (oldest first) including the root itself, + * subtree under a root, chronological (oldest first), excluding the root itself, * with gap-free `(created_at, event_id)` keyset paging. * * The event-id tiebreak is load-bearing — same-second replies must all page @@ -3330,7 +3336,28 @@ async function handleGetThreadReplies( config: E2eConfig | undefined, ): Promise { const cap = Math.min(args.limit ?? 200, 500); + const filter: MockFilter & Record = { + "#e": [args.rootEventId], + depth_limit: args.depthLimit ?? 64, + kinds: [...TIMELINE_KINDS], + limit: cap, + }; + if (args.channelId) { + filter["#h"] = [args.channelId]; + } + if (args.cursor) { + filter.thread_cursor = args.cursor.created_at; + filter.thread_cursor_id = args.cursor.event_id; + } const identity = getIdentity(config); + if ( + !isPGatedFilterAuthorized( + filter, + identity?.pubkey ?? getMockMemberPubkey(config), + ) + ) { + throw new Error(P_GATED_REJECTION_MESSAGE); + } let subtree: RelayEvent[]; if (!identity) { @@ -3340,36 +3367,36 @@ async function handleGetThreadReplies( ? getMockMessageStore(args.channelId) : Array.from(mockMessages.values()).flat(); const byId = new Map(events.map((event) => [event.id, event])); - const included = new Set(); const root = byId.get(args.rootEventId); - const collected: RelayEvent[] = root ? [root] : []; - if (root) { - included.add(root.id); - } - for (const event of events) { - const ref = getThreadReferenceFromTags(event.tags); - const belongsToThread = - (ref.rootEventId ?? ref.parentEventId) === args.rootEventId; - if (belongsToThread && !included.has(event.id)) { - included.add(event.id); - collected.push(event); + const collected: RelayEvent[] = []; + const included = new Set(); + if (!root) { + subtree = collected; + } else { + const frontier = new Set([root.id]); + for (;;) { + let added = false; + for (const event of events) { + if (included.has(event.id)) { + continue; + } + const ref = getThreadReferenceFromTags(event.tags); + if (!ref.parentEventId || !frontier.has(ref.parentEventId)) { + continue; + } + included.add(event.id); + collected.push(event); + frontier.add(event.id); + added = true; + } + if (!added) { + break; + } } + subtree = collected; } - subtree = collected; } else { // Config mode: exercise the real bridge thread path over /query. - const filter: Record = { - "#e": [args.rootEventId], - depth_limit: args.depthLimit ?? 64, - limit: cap, - }; - if (args.channelId) { - filter["#h"] = [args.channelId]; - } - if (args.cursor) { - filter.thread_cursor = args.cursor.created_at; - filter.thread_cursor_id = args.cursor.event_id; - } const events = await relayQuery(config, [filter]); const nextCursor = events.length >= cap @@ -3413,8 +3440,67 @@ async function handleGetThreadReplies( } const TIMELINE_KINDS = new Set([ - 9, 40002, 40008, 40099, 43001, 43002, 43003, 43004, 43005, 43006, + 9, + 40002, + 40008, + 40099, + 43001, + 43002, + 43003, + 43004, + 43005, + 43006, + KIND_HUDDLE_STARTED, +]); + +const KIND_GIFT_WRAP = 1059; +const P_GATED_KINDS = new Set([ + KIND_AGENT_OBSERVER_FRAME, + KIND_MEMBER_ADDED_NOTIFICATION, + KIND_MEMBER_REMOVED_NOTIFICATION, + KIND_GIFT_WRAP, + KIND_DM_VISIBILITY, ]); +const P_GATED_REJECTION_MESSAGE = + "restricted: p-gated kinds require #p tag matching your pubkey"; + +function filterKinds(filter: { kinds?: unknown }): number[] | undefined { + return Array.isArray(filter.kinds) + ? filter.kinds.filter((kind): kind is number => typeof kind === "number") + : undefined; +} + +function filterCanMatchPGated(filter: { kinds?: unknown }) { + const kinds = filterKinds(filter); + return !kinds || kinds.some((kind) => P_GATED_KINDS.has(kind)); +} + +function filterHasOwnPTag(filter: { "#p"?: unknown }, pubkey: string) { + const values = filter["#p"]; + return ( + Array.isArray(values) && + values.length > 0 && + values.every((value) => value === pubkey) + ); +} + +function isPGatedFilterAuthorized( + filter: { "#p"?: unknown; ids?: unknown; kinds?: unknown }, + pubkey: string, +) { + if (!filterCanMatchPGated(filter)) { + return true; + } + + const kinds = filterKinds(filter); + const ids = filter.ids; + const explicitlyDmVisibility = kinds?.includes(KIND_DM_VISIBILITY); + if (!explicitlyDmVisibility && Array.isArray(ids) && ids.length > 0) { + return true; + } + + return filterHasOwnPTag(filter, pubkey); +} type RawChannelMessagesPageResponse = { events: RelayEvent[]; @@ -3766,6 +3852,14 @@ async function relayQuery( filters: Array>, ): Promise { const identity = getRelayIdentity(config); + if ( + !filters.every((filter) => + isPGatedFilterAuthorized(filter, identity.pubkey), + ) + ) { + throw new Error(P_GATED_REJECTION_MESSAGE); + } + const response = await fetch(`${getRelayHttpUrl(config)}/query`, { method: "POST", headers: { @@ -6808,13 +6902,21 @@ function sendToMockSocket(args: { if (type === "REQ") { const subId = rest[0] as string; + const filters = rest.slice(1) as MockFilter[]; + if ( + !filters.every((filter) => + isPGatedFilterAuthorized(filter, MOCK_IDENTITY_PUBKEY), + ) + ) { + sendWsText(socket.handler, ["CLOSED", subId, P_GATED_REJECTION_MESSAGE]); + return; + } if (subId.startsWith("live-")) { // Collect channel IDs from all filters in the REQ const channelIds = new Set(); const kinds = new Set(); - for (let i = 1; i < rest.length; i++) { - const f = rest[i] as { "#h"?: string[]; kinds?: number[] }; + for (const f of filters) { const cid = f["#h"]?.[0]; if (cid) channelIds.add(cid); for (const kind of f.kinds ?? []) {