Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion desktop/src/features/messages/lib/messageQueryKeys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export function sortMessages(messages: RelayEvent[]) {
});
}

function isTimelineWindowContentEvent(event: RelayEvent) {
export function isTimelineWindowContentEvent(event: RelayEvent) {
return (
event.kind === KIND_STREAM_MESSAGE ||
event.kind === KIND_STREAM_MESSAGE_V2 ||
Expand All @@ -66,6 +66,31 @@ function isTimelineWindowContentEvent(event: RelayEvent) {
);
}

/**
* The oldest timestamp the older-history pager may anchor its `until` cursor
* on: the oldest cached content event that arrived through contiguous history
* loading. Events merged out-of-band (`nonContiguous`: thread ancestors,
* thread-panel subtrees) are skipped — anchoring on such an "island" pages
* from the island backward and permanently skips the unloaded gap between the
* island and the contiguous window (the June-14 → June-9 scrollback hole).
* Auxiliary events are skipped too: they are always newer than the content
* they reference, so an aux event older than every contiguous content event
* can only reference an island — making it an island itself.
*
* Assumes `events` is sorted ascending (every cache write stores sorted).
* Returns null when no contiguously loaded content event is cached.
*/
export function oldestContiguousHistoryTimestamp(
events: RelayEvent[],
): number | null {
for (const event of events) {
if (!event.nonContiguous && isTimelineWindowContentEvent(event)) {
return event.created_at;
}
}
return null;
}

function capNewestTimelineMessages(normalized: RelayEvent[]) {
const contentEvents = normalized.filter(isTimelineWindowContentEvent);

Expand Down Expand Up @@ -114,3 +139,27 @@ export function mergeTimelineHistoryMessages(
) {
return sortMessages([...current, ...history]);
}

/**
* Merge events fetched out-of-band — by id (missing thread ancestors) or by
* thread reference (thread-panel subtrees) — into the timeline cache, marked
* `nonContiguous` so {@link oldestContiguousHistoryTimestamp} never anchors
* the older-history pager on them. Events already in the cache are left
* untouched: a copy that arrived contiguously must not be downgraded to an
* island. The mark heals itself — when contiguous paging reaches an island,
* the history merge's last-copy-wins dedupe replaces the flagged copy with
* the unflagged one.
*/
export function mergeNonContiguousTimelineMessages(
current: RelayEvent[],
events: RelayEvent[],
) {
const knownIds = new Set(current.map((event) => event.id));
const additions = events
.filter((event) => !knownIds.has(event.id))
.map((event) => ({ ...event, nonContiguous: true }));
if (additions.length === 0) {
return current;
}
return sortMessages([...current, ...additions]);
}
219 changes: 219 additions & 0 deletions desktop/src/features/messages/lib/pageOlderMessages.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import assert from "node:assert/strict";
import test from "node:test";

import {
maxEventIdAtSecond,
pageOlderMessagesUntilRowFloor,
} from "./pageOlderMessages.ts";
import {
channelMessagesKey,
mergeNonContiguousTimelineMessages,
mergeTimelineHistoryMessages,
oldestContiguousHistoryTimestamp,
sortMessages,
} from "./messageQueryKeys.ts";
import { relayClient } from "@/shared/api/relayClient";

const PUBKEY = "a".repeat(64);

function event({ id, kind = 9, createdAt, channelId, tags }) {
return {
id,
pubkey: PUBKEY,
created_at: createdAt,
kind,
tags: tags ?? [["h", channelId]],
content: "",
sig: "mocksig".repeat(20).slice(0, 128),
};
}

function id(prefix, index) {
return `${prefix}${String(index).padStart(64 - prefix.length, "0")}`;
}

function makeQueryClientStub(queryKey, initialEvents) {
const store = new Map([[JSON.stringify(queryKey), initialEvents]]);
return {
getQueryData(key) {
return store.get(JSON.stringify(key));
},
setQueryData(key, updater) {
const k = JSON.stringify(key);
const next =
typeof updater === "function" ? updater(store.get(k) ?? []) : updater;
store.set(k, next);
return next;
},
};
}

/**
* Relay double: serves WS-style `until` pages over a fixed ascending dataset,
* newest `limit` events strictly older than `before`.
*/
function serveHistoryFrom(dataset) {
return async (_channelId, before, limit) =>
dataset.filter((e) => e.created_at < before).slice(-limit);
}

test("ancestor island does not poison the older-history cursor (June 14 → June 9 skip)", async (t) => {
const channelId = "island-cursor-regression";
// Relay holds a contiguous channel history: 400 events spanning the "gap
// days" (t=5000..5399), then the newest window (t=10000..10059).
const gapDays = [];
for (let index = 0; index < 400; index += 1) {
gapDays.push(
event({ id: id("gap", index), createdAt: 5_000 + index, channelId }),
);
}
const newestWindow = [];
for (let index = 0; index < 60; index += 1) {
newestWindow.push(
event({ id: id("new", index), createdAt: 10_000 + index, channelId }),
);
}
const islandRoot = event({ id: id("old", 0), createdAt: 1_000, channelId });
const relayDataset = sortMessages([...gapDays, ...newestWindow, islandRoot]);

// Cache shape at the moment of the bug: contiguous newest window plus one
// much older thread root injected out-of-band by useLoadMissingAncestors.
const cache = mergeNonContiguousTimelineMessages(newestWindow, [islandRoot]);
const queryKey = channelMessagesKey(channelId);
const queryClient = makeQueryClientStub(queryKey, cache);

const originalFetch = relayClient.fetchChannelHistoryBefore;
relayClient.fetchChannelHistoryBefore = serveHistoryFrom(relayDataset);
t.after(() => {
relayClient.fetchChannelHistoryBefore = originalFetch;
});

await pageOlderMessagesUntilRowFloor(queryClient, channelId, () => true);

const merged = queryClient.getQueryData(queryKey);
const gapLoaded = merged.filter((e) => e.id.startsWith("gap")).length;
// The pager must page from the contiguous frontier (t=10000) into the gap
// days — not from the island (t=1000), which skips them forever.
assert.ok(
gapLoaded > 0,
"pager anchored on the out-of-band island and skipped the gap days",
);
});

test("contiguous paging heals the island: re-fetched copy loses the mark and anchors later passes", async (t) => {
const channelId = "island-heal-regression";
const older = [];
for (let index = 0; index < 10; index += 1) {
older.push(
event({ id: id("old", index), createdAt: 1_000 + index, channelId }),
);
}
const newestWindow = [];
for (let index = 0; index < 60; index += 1) {
newestWindow.push(
event({ id: id("new", index), createdAt: 10_000 + index, channelId }),
);
}
const relayDataset = sortMessages([...older, ...newestWindow]);

// older[5] was injected out-of-band; the rest of `older` is unloaded.
const cache = mergeNonContiguousTimelineMessages(newestWindow, [older[5]]);
const queryKey = channelMessagesKey(channelId);
const queryClient = makeQueryClientStub(queryKey, cache);

const originalFetch = relayClient.fetchChannelHistoryBefore;
relayClient.fetchChannelHistoryBefore = serveHistoryFrom(relayDataset);
t.after(() => {
relayClient.fetchChannelHistoryBefore = originalFetch;
});

await pageOlderMessagesUntilRowFloor(queryClient, channelId, () => true);

const merged = queryClient.getQueryData(queryKey);
assert.equal(merged.filter((e) => e.id.startsWith("old")).length, 10);
const healed = merged.find((e) => e.id === older[5].id);
assert.ok(
!healed.nonContiguous,
"contiguous re-fetch must clear the nonContiguous mark (last-copy-wins)",
);
// With the island healed, the frontier is now the true oldest event.
assert.equal(oldestContiguousHistoryTimestamp(merged), 1_000);
});

test("frontier falls back to the oldest cached event when nothing is contiguous", async () => {
const events = [
{
...event({ id: id("iso", 0), createdAt: 500, channelId: "x" }),
nonContiguous: true,
},
];
assert.equal(oldestContiguousHistoryTimestamp(events), null);
// mergeTimelineHistoryMessages keeps the incoming (unflagged) copy on collision.
const healed = mergeTimelineHistoryMessages(events, [
event({ id: id("iso", 0), createdAt: 500, channelId: "x" }),
]);
assert.equal(oldestContiguousHistoryTimestamp(healed), 500);
});

test("late island response never downgrades a contiguous copy (downgrade race)", () => {
const channelId = "island-downgrade-race";
// The event was missing when the ancestor fetch started, but a contiguous
// history page fetched it (unmarked) before the island response landed.
const contiguousCopy = event({
id: id("race", 0),
createdAt: 1_000,
channelId,
});
const cache = mergeTimelineHistoryMessages(
[],
[contiguousCopy, event({ id: id("new", 0), createdAt: 10_000, channelId })],
);

// Late ancestor/thread response for the same id must be a no-op — marking
// it here would re-poison the frontier that contiguous paging just fixed.
const merged = mergeNonContiguousTimelineMessages(cache, [
{ ...contiguousCopy },
]);

const kept = merged.find((e) => e.id === contiguousCopy.id);
assert.ok(!kept.nonContiguous, "island merge downgraded a contiguous copy");
assert.equal(oldestContiguousHistoryTimestamp(merged), 1_000);
// And the reverse direction: thread replies already contiguous stay put.
assert.equal(merged.length, cache.length);
});

test("dense-second seed ignores aux and island ids at the boundary second", () => {
const channelId = "dense-second-seed";
const second = 7_000;
// Two content events already held at the stalled second...
const contentA = event({ id: id("caa", 0), createdAt: second, channelId });
const contentB = event({ id: id("cbb", 0), createdAt: second, channelId });
// ...an unmarked reaction whose id sorts after both (aux kinds are absent
// from the bridge keyset pages, so its id proves nothing about content)...
const auxLater = event({
id: id("zaux", 0),
kind: 7,
createdAt: second,
channelId,
tags: [
["h", channelId],
["e", contentA.id],
],
});
// ...and a marked island whose id sorts after everything.
const islandLater = {
...event({ id: id("zisl", 0), createdAt: second, channelId }),
nonContiguous: true,
};

const seed = maxEventIdAtSecond(
[contentA, contentB, auxLater, islandLater],
second,
);

// Seeding from the aux/island id would tell the relay "everything up to
// this id is held" and skip unseen content rows in the same second.
assert.equal(seed, contentB.id);
// No contiguous content at the second -> no fabricated cursor.
assert.equal(maxEventIdAtSecond([auxLater, islandLater], second), null);
});
28 changes: 25 additions & 3 deletions desktop/src/features/messages/lib/pageOlderMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { countTopLevelTimelineRows } from "@/features/messages/lib/formatTimelin
import { backfillAuxForMessages } from "@/features/messages/lib/auxBackfill";
import {
channelMessagesKey,
isTimelineWindowContentEvent,
mergeTimelineHistoryMessages,
oldestContiguousHistoryTimestamp,
} from "@/features/messages/lib/messageQueryKeys";
import { relayClient } from "@/shared/api/relayClient";
import { getChannelMessagesBefore } from "@/shared/api/tauri";
Expand Down Expand Up @@ -45,14 +47,27 @@ const inFlightPasses = new Map<string, Promise<PageOlderResult>>();
* **max** event id among all cached/fetched events at `created_at === second`.
* Seeding from the min id (e.g. `baseline[0].id`) would re-request rows already
* held; seeding from the max id asks the relay for the strictly-unreached tail.
* Non-contiguous events are excluded: an out-of-band island at the boundary
* second is a *point*, not proof that everything up to its id is held —
* seeding past it would skip the unfetched rows before it. The island itself
* just dedupes away when re-fetched. Auxiliary events are excluded for the
* same reason: the bridge keyset pages timeline-content kinds only, so an aux
* id at the boundary second says nothing about which *content* rows are held
* — seeding from a later aux id would skip unseen content in that second.
*
* Exported for tests only — the pager is the sole runtime caller.
*/
function maxEventIdAtSecond(
export function maxEventIdAtSecond(
events: RelayEvent[],
second: number,
): string | null {
let maxId: string | null = null;
for (const event of events) {
if (event.created_at !== second) {
if (
event.created_at !== second ||
event.nonContiguous ||
!isTimelineWindowContentEvent(event)
) {
continue;
}
if (maxId === null || event.id > maxId) {
Expand Down Expand Up @@ -201,7 +216,14 @@ async function runPageOlderPass(
// visible rows, so the user sees the loader dribble messages in 1-5 at a
// time. One commit = one bounded growth step.
const fetched: RelayEvent[] = [];
let oldestTimestamp = baseline[0].created_at;
// Anchor the cursor on the oldest *contiguously loaded* event, not the
// oldest event in cache: out-of-band merges (thread ancestors, thread-panel
// subtrees) plant isolated older "islands", and anchoring on an island pages
// backward from it — permanently skipping the unloaded history between the
// island and the contiguous window. Islands dedupe away (and lose their
// mark) once contiguous paging reaches them.
let oldestTimestamp =
oldestContiguousHistoryTimestamp(baseline) ?? baseline[0].created_at;

while (hasOlderMessages && shouldContinue()) {
const olderMessages = await relayClient.fetchChannelHistoryBefore(
Expand Down
13 changes: 10 additions & 3 deletions desktop/src/features/messages/useLoadMissingAncestors.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import * as React from "react";
import { useQueryClient } from "@tanstack/react-query";

import { channelMessagesKey } from "@/features/messages/lib/messageQueryKeys";
import { mergeMessages } from "@/features/messages/hooks";
import {
channelMessagesKey,
mergeNonContiguousTimelineMessages,
} from "@/features/messages/lib/messageQueryKeys";
import {
getChannelIdFromTags,
getThreadReference,
Expand Down Expand Up @@ -91,7 +93,12 @@ export function useLoadMissingAncestors(

queryClient.setQueryData<RelayEvent[]>(
channelMessagesKey(activeChannel.id),
(current = []) => mergeMessages(current, event),
// Non-contiguous merge: an ancestor fetched by id is typically far
// older than the loaded window, and anchoring the older-history
// pager on it would skip everything in between (see
// mergeNonContiguousTimelineMessages).
(current = []) =>
mergeNonContiguousTimelineMessages(current, [event]),
);
} catch (error) {
console.error("Failed to load ancestor event", eventId, error);
Expand Down
13 changes: 10 additions & 3 deletions desktop/src/features/messages/useThreadReplies.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import * as React from "react";
import { useQueryClient } from "@tanstack/react-query";

import { channelMessagesKey } from "@/features/messages/lib/messageQueryKeys";
import { mergeMessages } from "@/features/messages/hooks";
import {
channelMessagesKey,
mergeNonContiguousTimelineMessages,
} from "@/features/messages/lib/messageQueryKeys";
import { getThreadReplies } from "@/shared/api/tauri";
import type { Channel, RelayEvent, ThreadCursor } from "@/shared/api/types";

Expand Down Expand Up @@ -82,7 +84,12 @@ export function useThreadReplies(
if (response.events.length > 0) {
queryClient.setQueryData<RelayEvent[]>(
channelMessagesKey(channelId),
(current = []) => response.events.reduce(mergeMessages, current),
// Non-contiguous merge: a deep/old thread's replies can predate
// the loaded window; anchoring the older-history pager on them
// would skip everything in between (see
// mergeNonContiguousTimelineMessages).
(current = []) =>
mergeNonContiguousTimelineMessages(current, response.events),
);
}

Expand Down
Loading
Loading