diff --git a/desktop/src/features/messages/lib/messageQueryKeys.ts b/desktop/src/features/messages/lib/messageQueryKeys.ts index 1d46ec4e8..29864f5de 100644 --- a/desktop/src/features/messages/lib/messageQueryKeys.ts +++ b/desktop/src/features/messages/lib/messageQueryKeys.ts @@ -50,7 +50,7 @@ export function sortMessages(messages: RelayEvent[]) { }); } -function isTimelineWindowContentEvent(event: RelayEvent) { +export function isTimelineWindowContentEvent(event: RelayEvent) { return ( event.kind === KIND_STREAM_MESSAGE || event.kind === KIND_STREAM_MESSAGE_V2 || @@ -66,6 +66,31 @@ function isTimelineWindowContentEvent(event: RelayEvent) { ); } +/** + * The oldest timestamp the older-history pager may anchor its `until` cursor + * on: the oldest cached content event that arrived through contiguous history + * loading. Events merged out-of-band (`nonContiguous`: thread ancestors, + * thread-panel subtrees) are skipped — anchoring on such an "island" pages + * from the island backward and permanently skips the unloaded gap between the + * island and the contiguous window (the June-14 → June-9 scrollback hole). + * Auxiliary events are skipped too: they are always newer than the content + * they reference, so an aux event older than every contiguous content event + * can only reference an island — making it an island itself. + * + * Assumes `events` is sorted ascending (every cache write stores sorted). + * Returns null when no contiguously loaded content event is cached. + */ +export function oldestContiguousHistoryTimestamp( + events: RelayEvent[], +): number | null { + for (const event of events) { + if (!event.nonContiguous && isTimelineWindowContentEvent(event)) { + return event.created_at; + } + } + return null; +} + function capNewestTimelineMessages(normalized: RelayEvent[]) { const contentEvents = normalized.filter(isTimelineWindowContentEvent); @@ -114,3 +139,27 @@ export function mergeTimelineHistoryMessages( ) { return sortMessages([...current, ...history]); } + +/** + * Merge events fetched out-of-band — by id (missing thread ancestors) or by + * thread reference (thread-panel subtrees) — into the timeline cache, marked + * `nonContiguous` so {@link oldestContiguousHistoryTimestamp} never anchors + * the older-history pager on them. Events already in the cache are left + * untouched: a copy that arrived contiguously must not be downgraded to an + * island. The mark heals itself — when contiguous paging reaches an island, + * the history merge's last-copy-wins dedupe replaces the flagged copy with + * the unflagged one. + */ +export function mergeNonContiguousTimelineMessages( + current: RelayEvent[], + events: RelayEvent[], +) { + const knownIds = new Set(current.map((event) => event.id)); + const additions = events + .filter((event) => !knownIds.has(event.id)) + .map((event) => ({ ...event, nonContiguous: true })); + if (additions.length === 0) { + return current; + } + return sortMessages([...current, ...additions]); +} diff --git a/desktop/src/features/messages/lib/pageOlderMessages.test.mjs b/desktop/src/features/messages/lib/pageOlderMessages.test.mjs new file mode 100644 index 000000000..ee0e1775c --- /dev/null +++ b/desktop/src/features/messages/lib/pageOlderMessages.test.mjs @@ -0,0 +1,219 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +import { + maxEventIdAtSecond, + pageOlderMessagesUntilRowFloor, +} from "./pageOlderMessages.ts"; +import { + channelMessagesKey, + mergeNonContiguousTimelineMessages, + mergeTimelineHistoryMessages, + oldestContiguousHistoryTimestamp, + sortMessages, +} from "./messageQueryKeys.ts"; +import { relayClient } from "@/shared/api/relayClient"; + +const PUBKEY = "a".repeat(64); + +function event({ id, kind = 9, createdAt, channelId, tags }) { + return { + id, + pubkey: PUBKEY, + created_at: createdAt, + kind, + tags: tags ?? [["h", channelId]], + content: "", + sig: "mocksig".repeat(20).slice(0, 128), + }; +} + +function id(prefix, index) { + return `${prefix}${String(index).padStart(64 - prefix.length, "0")}`; +} + +function makeQueryClientStub(queryKey, initialEvents) { + const store = new Map([[JSON.stringify(queryKey), initialEvents]]); + return { + getQueryData(key) { + return store.get(JSON.stringify(key)); + }, + setQueryData(key, updater) { + const k = JSON.stringify(key); + const next = + typeof updater === "function" ? updater(store.get(k) ?? []) : updater; + store.set(k, next); + return next; + }, + }; +} + +/** + * Relay double: serves WS-style `until` pages over a fixed ascending dataset, + * newest `limit` events strictly older than `before`. + */ +function serveHistoryFrom(dataset) { + return async (_channelId, before, limit) => + dataset.filter((e) => e.created_at < before).slice(-limit); +} + +test("ancestor island does not poison the older-history cursor (June 14 → June 9 skip)", async (t) => { + const channelId = "island-cursor-regression"; + // Relay holds a contiguous channel history: 400 events spanning the "gap + // days" (t=5000..5399), then the newest window (t=10000..10059). + const gapDays = []; + for (let index = 0; index < 400; index += 1) { + gapDays.push( + event({ id: id("gap", index), createdAt: 5_000 + index, channelId }), + ); + } + const newestWindow = []; + for (let index = 0; index < 60; index += 1) { + newestWindow.push( + event({ id: id("new", index), createdAt: 10_000 + index, channelId }), + ); + } + const islandRoot = event({ id: id("old", 0), createdAt: 1_000, channelId }); + const relayDataset = sortMessages([...gapDays, ...newestWindow, islandRoot]); + + // Cache shape at the moment of the bug: contiguous newest window plus one + // much older thread root injected out-of-band by useLoadMissingAncestors. + const cache = mergeNonContiguousTimelineMessages(newestWindow, [islandRoot]); + const queryKey = channelMessagesKey(channelId); + const queryClient = makeQueryClientStub(queryKey, cache); + + const originalFetch = relayClient.fetchChannelHistoryBefore; + relayClient.fetchChannelHistoryBefore = serveHistoryFrom(relayDataset); + t.after(() => { + relayClient.fetchChannelHistoryBefore = originalFetch; + }); + + await pageOlderMessagesUntilRowFloor(queryClient, channelId, () => true); + + const merged = queryClient.getQueryData(queryKey); + const gapLoaded = merged.filter((e) => e.id.startsWith("gap")).length; + // The pager must page from the contiguous frontier (t=10000) into the gap + // days — not from the island (t=1000), which skips them forever. + assert.ok( + gapLoaded > 0, + "pager anchored on the out-of-band island and skipped the gap days", + ); +}); + +test("contiguous paging heals the island: re-fetched copy loses the mark and anchors later passes", async (t) => { + const channelId = "island-heal-regression"; + const older = []; + for (let index = 0; index < 10; index += 1) { + older.push( + event({ id: id("old", index), createdAt: 1_000 + index, channelId }), + ); + } + const newestWindow = []; + for (let index = 0; index < 60; index += 1) { + newestWindow.push( + event({ id: id("new", index), createdAt: 10_000 + index, channelId }), + ); + } + const relayDataset = sortMessages([...older, ...newestWindow]); + + // older[5] was injected out-of-band; the rest of `older` is unloaded. + const cache = mergeNonContiguousTimelineMessages(newestWindow, [older[5]]); + const queryKey = channelMessagesKey(channelId); + const queryClient = makeQueryClientStub(queryKey, cache); + + const originalFetch = relayClient.fetchChannelHistoryBefore; + relayClient.fetchChannelHistoryBefore = serveHistoryFrom(relayDataset); + t.after(() => { + relayClient.fetchChannelHistoryBefore = originalFetch; + }); + + await pageOlderMessagesUntilRowFloor(queryClient, channelId, () => true); + + const merged = queryClient.getQueryData(queryKey); + assert.equal(merged.filter((e) => e.id.startsWith("old")).length, 10); + const healed = merged.find((e) => e.id === older[5].id); + assert.ok( + !healed.nonContiguous, + "contiguous re-fetch must clear the nonContiguous mark (last-copy-wins)", + ); + // With the island healed, the frontier is now the true oldest event. + assert.equal(oldestContiguousHistoryTimestamp(merged), 1_000); +}); + +test("frontier falls back to the oldest cached event when nothing is contiguous", async () => { + const events = [ + { + ...event({ id: id("iso", 0), createdAt: 500, channelId: "x" }), + nonContiguous: true, + }, + ]; + assert.equal(oldestContiguousHistoryTimestamp(events), null); + // mergeTimelineHistoryMessages keeps the incoming (unflagged) copy on collision. + const healed = mergeTimelineHistoryMessages(events, [ + event({ id: id("iso", 0), createdAt: 500, channelId: "x" }), + ]); + assert.equal(oldestContiguousHistoryTimestamp(healed), 500); +}); + +test("late island response never downgrades a contiguous copy (downgrade race)", () => { + const channelId = "island-downgrade-race"; + // The event was missing when the ancestor fetch started, but a contiguous + // history page fetched it (unmarked) before the island response landed. + const contiguousCopy = event({ + id: id("race", 0), + createdAt: 1_000, + channelId, + }); + const cache = mergeTimelineHistoryMessages( + [], + [contiguousCopy, event({ id: id("new", 0), createdAt: 10_000, channelId })], + ); + + // Late ancestor/thread response for the same id must be a no-op — marking + // it here would re-poison the frontier that contiguous paging just fixed. + const merged = mergeNonContiguousTimelineMessages(cache, [ + { ...contiguousCopy }, + ]); + + const kept = merged.find((e) => e.id === contiguousCopy.id); + assert.ok(!kept.nonContiguous, "island merge downgraded a contiguous copy"); + assert.equal(oldestContiguousHistoryTimestamp(merged), 1_000); + // And the reverse direction: thread replies already contiguous stay put. + assert.equal(merged.length, cache.length); +}); + +test("dense-second seed ignores aux and island ids at the boundary second", () => { + const channelId = "dense-second-seed"; + const second = 7_000; + // Two content events already held at the stalled second... + const contentA = event({ id: id("caa", 0), createdAt: second, channelId }); + const contentB = event({ id: id("cbb", 0), createdAt: second, channelId }); + // ...an unmarked reaction whose id sorts after both (aux kinds are absent + // from the bridge keyset pages, so its id proves nothing about content)... + const auxLater = event({ + id: id("zaux", 0), + kind: 7, + createdAt: second, + channelId, + tags: [ + ["h", channelId], + ["e", contentA.id], + ], + }); + // ...and a marked island whose id sorts after everything. + const islandLater = { + ...event({ id: id("zisl", 0), createdAt: second, channelId }), + nonContiguous: true, + }; + + const seed = maxEventIdAtSecond( + [contentA, contentB, auxLater, islandLater], + second, + ); + + // Seeding from the aux/island id would tell the relay "everything up to + // this id is held" and skip unseen content rows in the same second. + assert.equal(seed, contentB.id); + // No contiguous content at the second -> no fabricated cursor. + assert.equal(maxEventIdAtSecond([auxLater, islandLater], second), null); +}); diff --git a/desktop/src/features/messages/lib/pageOlderMessages.ts b/desktop/src/features/messages/lib/pageOlderMessages.ts index dd639b1e9..66dd5122e 100644 --- a/desktop/src/features/messages/lib/pageOlderMessages.ts +++ b/desktop/src/features/messages/lib/pageOlderMessages.ts @@ -4,7 +4,9 @@ import { countTopLevelTimelineRows } from "@/features/messages/lib/formatTimelin import { backfillAuxForMessages } from "@/features/messages/lib/auxBackfill"; import { channelMessagesKey, + isTimelineWindowContentEvent, mergeTimelineHistoryMessages, + oldestContiguousHistoryTimestamp, } from "@/features/messages/lib/messageQueryKeys"; import { relayClient } from "@/shared/api/relayClient"; import { getChannelMessagesBefore } from "@/shared/api/tauri"; @@ -45,14 +47,27 @@ const inFlightPasses = new Map>(); * **max** event id among all cached/fetched events at `created_at === second`. * Seeding from the min id (e.g. `baseline[0].id`) would re-request rows already * held; seeding from the max id asks the relay for the strictly-unreached tail. + * Non-contiguous events are excluded: an out-of-band island at the boundary + * second is a *point*, not proof that everything up to its id is held — + * seeding past it would skip the unfetched rows before it. The island itself + * just dedupes away when re-fetched. Auxiliary events are excluded for the + * same reason: the bridge keyset pages timeline-content kinds only, so an aux + * id at the boundary second says nothing about which *content* rows are held + * — seeding from a later aux id would skip unseen content in that second. + * + * Exported for tests only — the pager is the sole runtime caller. */ -function maxEventIdAtSecond( +export function maxEventIdAtSecond( events: RelayEvent[], second: number, ): string | null { let maxId: string | null = null; for (const event of events) { - if (event.created_at !== second) { + if ( + event.created_at !== second || + event.nonContiguous || + !isTimelineWindowContentEvent(event) + ) { continue; } if (maxId === null || event.id > maxId) { @@ -201,7 +216,14 @@ async function runPageOlderPass( // visible rows, so the user sees the loader dribble messages in 1-5 at a // time. One commit = one bounded growth step. const fetched: RelayEvent[] = []; - let oldestTimestamp = baseline[0].created_at; + // Anchor the cursor on the oldest *contiguously loaded* event, not the + // oldest event in cache: out-of-band merges (thread ancestors, thread-panel + // subtrees) plant isolated older "islands", and anchoring on an island pages + // backward from it — permanently skipping the unloaded history between the + // island and the contiguous window. Islands dedupe away (and lose their + // mark) once contiguous paging reaches them. + let oldestTimestamp = + oldestContiguousHistoryTimestamp(baseline) ?? baseline[0].created_at; while (hasOlderMessages && shouldContinue()) { const olderMessages = await relayClient.fetchChannelHistoryBefore( diff --git a/desktop/src/features/messages/useLoadMissingAncestors.ts b/desktop/src/features/messages/useLoadMissingAncestors.ts index 7928d4632..e69c38471 100644 --- a/desktop/src/features/messages/useLoadMissingAncestors.ts +++ b/desktop/src/features/messages/useLoadMissingAncestors.ts @@ -1,8 +1,10 @@ import * as React from "react"; import { useQueryClient } from "@tanstack/react-query"; -import { channelMessagesKey } from "@/features/messages/lib/messageQueryKeys"; -import { mergeMessages } from "@/features/messages/hooks"; +import { + channelMessagesKey, + mergeNonContiguousTimelineMessages, +} from "@/features/messages/lib/messageQueryKeys"; import { getChannelIdFromTags, getThreadReference, @@ -91,7 +93,12 @@ export function useLoadMissingAncestors( queryClient.setQueryData( channelMessagesKey(activeChannel.id), - (current = []) => mergeMessages(current, event), + // Non-contiguous merge: an ancestor fetched by id is typically far + // older than the loaded window, and anchoring the older-history + // pager on it would skip everything in between (see + // mergeNonContiguousTimelineMessages). + (current = []) => + mergeNonContiguousTimelineMessages(current, [event]), ); } catch (error) { console.error("Failed to load ancestor event", eventId, error); diff --git a/desktop/src/features/messages/useThreadReplies.ts b/desktop/src/features/messages/useThreadReplies.ts index ede6ef1dc..f803d327d 100644 --- a/desktop/src/features/messages/useThreadReplies.ts +++ b/desktop/src/features/messages/useThreadReplies.ts @@ -1,8 +1,10 @@ import * as React from "react"; import { useQueryClient } from "@tanstack/react-query"; -import { channelMessagesKey } from "@/features/messages/lib/messageQueryKeys"; -import { mergeMessages } from "@/features/messages/hooks"; +import { + channelMessagesKey, + mergeNonContiguousTimelineMessages, +} from "@/features/messages/lib/messageQueryKeys"; import { getThreadReplies } from "@/shared/api/tauri"; import type { Channel, RelayEvent, ThreadCursor } from "@/shared/api/types"; @@ -82,7 +84,12 @@ export function useThreadReplies( if (response.events.length > 0) { queryClient.setQueryData( channelMessagesKey(channelId), - (current = []) => response.events.reduce(mergeMessages, current), + // Non-contiguous merge: a deep/old thread's replies can predate + // the loaded window; anchoring the older-history pager on them + // would skip everything in between (see + // mergeNonContiguousTimelineMessages). + (current = []) => + mergeNonContiguousTimelineMessages(current, response.events), ); } diff --git a/desktop/src/shared/api/types.ts b/desktop/src/shared/api/types.ts index c5e4b9e80..8d59a6ed7 100644 --- a/desktop/src/shared/api/types.ts +++ b/desktop/src/shared/api/types.ts @@ -167,6 +167,14 @@ export type RelayEvent = { id: string; /** Local-only render identity for optimistic events that are later acknowledged. */ localKey?: string; + /** + * Local-only: merged out-of-band (fetched by id or thread reference, e.g. a + * missing thread ancestor), so the history between it and the contiguously + * loaded window may be unloaded. The older-history pager must not anchor its + * cursor on such an event; the flag clears when a contiguous history page + * re-fetches it (history merges keep the incoming copy on id collision). + */ + nonContiguous?: boolean; pubkey: string; created_at: number; kind: number;