[POSTPONED post-v0.2.0] fix(clients/typescript): wake consumer on LISTEN/NOTIFY (#147)#194
[POSTPONED post-v0.2.0] fix(clients/typescript): wake consumer on LISTEN/NOTIFY (#147)#194
Conversation
REV review — PR #194VerdictHas potential issues — no hard blockers, but two correctness concerns and two test coverage gaps warrant fixes before merge. SummaryThe LISTEN/NOTIFY design is solid: a dedicated CI statusAll 9 check runs completed successfully: FindingsSecurity
Bugs / correctness
Tests
Guidelines (CLAUDE.md)
Docs
Recommended next stephas-potential — fix the REV-style review. SOC2 checks intentionally omitted per project policy. Generated by Claude Code |
Lifecycle status — ready for your final look
Not merging per your "I'll look myself" instruction. Branch ready when you are. Generated by Claude Code |
REV reviewThe fix correctly races a notification Promise against the existing poll-interval Blocking (must fix before merge)None. Non-blockingFINDING (bug-hunter, confidence 6, MEDIUM)
FINDING (bug-hunter, confidence 5, LOW)
FINDING (test-analyzer, confidence 6, MEDIUM)
FINDING (test-analyzer, confidence 5, LOW)
Skipped (SOC2)Skipped per repo policy. Verdict: APPROVE Generated by Claude Code |
Step 3 — real testingWalked the user-facing claim: consumer with Setup: PG 16, pgque from this branch's Script: import { connect } from '/home/user/pgque/.claude/worktrees/agent-aa65362714010e20b/clients/typescript/src/index.js';
const dsn = 'postgresql://postgres:pgque_test@localhost/pgque_test';
const queue = 'walkthrough_194';
const consumer = 'c194';
const producerClient = await connect(dsn);
const consumerClient = await connect(dsn);
const ac = new AbortController();
const received: { type: string; receivedAt: number }[] = [];
const c = consumerClient.newConsumer(queue, consumer, {
pollInterval: 60_000, // 60 SECONDS — polling alone would never fire in time.
});
c.handle('latency.test', async (msg) => {
received.push({ type: msg.type, receivedAt: Date.now() });
});
const startedAt = Date.now();
const consumerStartPromise = c.start(ac.signal);
// Give the LISTEN connection time to attach BEFORE we tick.
await new Promise(r => setTimeout(r, 500));
// Send + tick. On a vanilla 30s poll-only consumer, this would wait 30+ seconds before delivery.
const sendStart = Date.now();
await producerClient.send(queue, { type: 'latency.test', payload: { i: 1 } });
await producerClient.forceTick(queue);
await producerClient.ticker(queue);
const tickAt = Date.now();
// Wait up to 5 seconds for delivery — well under pollInterval=60s.
const deadline = Date.now() + 5_000;
while (received.length === 0 && Date.now() < deadline) {
await new Promise(r => setTimeout(r, 50));
}
const elapsed = received.length > 0 ? received[0].receivedAt - tickAt : -1;
console.log(`tick → receive elapsed: ${elapsed}ms (received ${received.length} msg)`);
console.log(`pollInterval: 60000ms (sleeping consumer would not have fired)`);
ac.abort();
try { await consumerStartPromise; } catch {}
await producerClient.close();
await consumerClient.close();
if (received.length === 0) {
console.error('FAIL: no message received within 5s — LISTEN/NOTIFY did NOT wake consumer');
process.exit(1);
}
if (elapsed > 5_000) {
console.error(`FAIL: delivery latency ${elapsed}ms exceeds the 5s ceiling (would have to be polling)`);
process.exit(1);
}
console.log(`PASS: delivery in ${elapsed}ms — well under pollInterval=60000ms, proves LISTEN/NOTIFY wakeup`);Output: Verdict: ✓ Notify-driven wakeup works end-to-end. Latency was 7 ms with a 60_000 ms poll interval — polling alone would not have fired. The README's low-latency claim now holds for the TS driver. Generated by Claude Code |
c02bfe8 to
774e88b
Compare
|
Rebased onto What conflicted: Only
Local test results: 55/55 passed ( Generated by Claude Code |
|
Moved out of scope of v0.2.0 and back to draft. Per issue #201 the LISTEN/NOTIFY tick-wakeup semantics still need:
Both are appropriate for v0.2.x rather than blocking the v0.2.0 stamp. The SQL change (tick-level Will resume on this branch (or a fresh one) once #201's verification work is staged. Generated by Claude Code |
Summary
Consumeronly usedsetTimeout-based polling (default 30 s).pgque.ticker()already emitspg_notify('pgque_<queue>', tick_id::text)but the TS driver never listened, causing up to 30 s of unnecessary latency.pg.Client(separate from the shared pool) is created byClient.newConsumervia a factory closure and issuesLISTEN pgque_<queue>. Anotificationevent handler resolves aPromisethat races againstsleep(pollInterval), waking the poll loop immediately on tick. Polling is fully retained as a safety-net fallback (missed notifications, reconnect windows). The LISTEN connection auto-reconnects on drop with 1 s → 2 s → 5 s backoff capped atpollInterval. OnAbortSignalabort,UNLISTENandclient.end()are called beforestart()resolves.Red/green proof
Red commit
6ef4279: unit testNOTIFY wakes the poll loop before the long sleep elapses (stub pg.Client)fails immediately —listenCalledisfalsebecause the old Consumer never opened a LISTEN connection.Green commit
c02bfe8: same test passes in <50 ms. WithpollInterval: 60_000, the notification fires and the poll loop callsreceive()within ~10 ms of the event — well under the 500 ms assertion window. A live-PG integration test (gated onPGQUE_TEST_DSN) additionally asserts delivery within 2 s with a 60 s poll interval.Test plan
bun run check— TypeScript types cleanbun run build— compiles without errorsbun run test— unit/in-memory mock tests pass (no DB required)PGQUE_TEST_DSN=... bun run test— integration tests including NOTIFY wakeup test pass in <2 s withpollInterval: 60_000Closes #147