[ON HOLD v0.2.1] fix(pgque.receive): prevent double-delivery on concurrent same-consumer calls#125
[ON HOLD v0.2.1] fix(pgque.receive): prevent double-delivery on concurrent same-consumer calls#125
Conversation
|
Held for v0.2.1 per maintainer call. The fix modifies the PgQ core engine ( |
|
I did a public-code-only review of #125 against the hold reason, and I think the next useful slice is a true two-session validation harness rather than more single-session assertions. Root-cause/validation shape I found:
I drafted that as a small local shell/psql harness and only static-checked it here ( |
|
@alceops Thanks, that would be useful! |
REV-style reviewAdapted the REV review flow (postgres-ai/rev) for this GitHub PR — REV is GitLab-native ( Aware this is labelled Scope: 1-line BLOCKING — 0NON-BLOCKINGMEDIUM
MEDIUM
LOW
LOW
LOW
POTENTIAL ISSUESMEDIUM Concurrency interaction with
LOW Hot-path effect:
LOW
Verified locally on PG18
Summary
Result: HOLD CONFIRMED. Diff quality is good and the fix is correct on paper. Two test-side items would be cheap to land independently of the v0.2.1 hold: (a) wire the new test into REV-style review (5 agents in spirit; SOC2 skipped per repo policy). Generated by Claude Code |
Documents the single-worker-per-consumer model: calling receive() twice without ack must return the same batch_id (not open a new batch for the same subscription cursor). Serial tests T1-T4 verify the sequential invariant. The concurrent race (two sessions, both see sub_batch=NULL) requires FOR UPDATE on the subscription SELECT in next_batch_custom() -- added in the following fix commit. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two concurrent receive() calls for the same (queue, consumer) could both read sub_batch = NULL before either committed, allocate distinct batch_ids, and overwrite each other's subscription cursor -- delivering the same events twice. Fix: add FOR UPDATE OF s to the SELECT inside next_batch_custom(). The second session blocks on the row lock until the first commits; on unblocking it re-reads sub_batch != NULL and returns via the early-return guard without allocating a new batch_id. Also documents the single-worker-per-consumer contract in docs/reference.md and the pgque.receive() function comment. Closes #97 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
527f4a8 to
fd4dd74
Compare
|
Rebased #125 on current Important bit found during the rebase: the old fix only patched the transformed PgQ base Local PG17 RED/GREEN evidence: Before the override fix, the wired harness failed: After locking the override path too: Also passed: CI should now run |
Summary
Closes #97
Model decision: single-worker-per-consumer.
PgQue (like PgQ before it) uses one subscription cursor per
(queue, consumer)pair —pgque.subscriptionhasPRIMARY KEY (sub_queue, sub_consumer). The intended model is one worker per consumer name; fan-out uses distinct consumer names, each getting an independent cursor over all events.Root cause
pgque.next_batch_custom()read the subscription row with a plainSELECT(noFOR UPDATE). Two concurrent sessions could:sub_batch = NULLbefore either committedbatch_idfrompgque.batch_id_seqUPDATE pgque.subscription SET sub_batch = <new_id> WHERE sub_queue = ... AND sub_consumer = ...Both UPDATEs succeeded (the WHERE clause matched on
(sub_queue, sub_consumer), not onsub_batch IS NULL). The second writer overwrote the first'sbatch_id. Both sessions then calledget_batch_events()for the same tick range: double-delivery.Fix
Add
FOR UPDATE OF sto theSELECTinsidenext_batch_custom()(sql/pgque.sql).The second session blocks on the row lock until the first session commits. On unblocking, it re-reads
sub_batch != NULLand returns via the existing early-return guard — never reaching theUPDATEor the sequencenextval(). One batch per subscription at a time.Changes
sql/pgque.sql: addFOR UPDATE OF sto the subscriptionSELECTinnext_batch_custom(); add explanatory commentsql/pgque-api/receive.sql: add single-worker-per-consumer contract comment topgque.receive()docs/reference.md: document the single-worker-per-consumer contract in thepgque.receive()entrytests/test_concurrent_receive.sql: regression tests (T1 sequential idempotency, T2 idempotentnext_batch_custom, T3 no duplicate batch under the normal API, T4 cursor cleared after ack)TDD
13e222b— regression test documents the single-worker invariant and the concurrent race. Serial tests T1-T4 verify the sequential case (already correct). The concurrent case (two physical sessions) cannot be reproduced deterministically in a single SQL session; T2 is the closest serial proxy.527f4a8—FOR UPDATE OF sfix + documentation.Test run (green)
All existing tests (
test_api_receive,test_api_send,test_core_consumer,test_core_events,test_core_lifecycle) continue to pass.