
[ON HOLD v0.2.1] fix(pgque.receive): prevent double-delivery on concurrent same-consumer calls#125

Open
NikolayS wants to merge 6 commits into main from fix/concurrent-receive

Conversation

@NikolayS
Owner

Summary

Closes #97

Model decision: single-worker-per-consumer.

PgQue (like PgQ before it) uses one subscription cursor per (queue, consumer) pair — pgque.subscription has PRIMARY KEY (sub_queue, sub_consumer). The intended model is one worker per consumer name; fan-out uses distinct consumer names, each getting an independent cursor over all events.

Root cause

pgque.next_batch_custom() read the subscription row with a plain SELECT (no FOR UPDATE). Two concurrent sessions could:

  1. Both read sub_batch = NULL before either committed
  2. Both allocate a new batch_id from pgque.batch_id_seq
  3. Both UPDATE pgque.subscription SET sub_batch = <new_id> WHERE sub_queue = ... AND sub_consumer = ...

Both UPDATEs succeeded (the WHERE clause matched on (sub_queue, sub_consumer), not on sub_batch IS NULL). The second writer overwrote the first's batch_id. Both sessions then called get_batch_events() for the same tick range: double-delivery.
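The interleaving can be written out as the SQL each session effectively runs (queue/consumer names, batch ids, and the exact column list are illustrative, not copied from the source):

```sql
begin;  -- session A
select sub_batch from pgque.subscription
 where sub_queue = 'q' and sub_consumer = 'c';   -- A sees NULL

begin;  -- session B (concurrently)
select sub_batch from pgque.subscription
 where sub_queue = 'q' and sub_consumer = 'c';   -- B also sees NULL: plain SELECT takes no lock

select nextval('pgque.batch_id_seq');            -- A gets 41
select nextval('pgque.batch_id_seq');            -- B gets 42

update pgque.subscription set sub_batch = 41     -- A: matches, 1 row
 where sub_queue = 'q' and sub_consumer = 'c';
commit;  -- A

update pgque.subscription set sub_batch = 42     -- B: also matches, because the WHERE
 where sub_queue = 'q' and sub_consumer = 'c';   --   never checks sub_batch IS NULL
commit;  -- B overwrote A's cursor; both sessions now fetch the same tick range
```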

Fix

Add FOR UPDATE OF s to the SELECT inside next_batch_custom() (sql/pgque.sql).

The second session blocks on the row lock until the first session commits. On unblocking, it re-reads the row, observes sub_batch IS NOT NULL, and returns via the existing early-return guard — never reaching the UPDATE or the sequence nextval(). One batch per subscription at a time.
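A sketch of the locked read inside next_batch_custom() (the parameter names i_queue and i_consumer and the single selected column are assumed for illustration; the real function selects more state):

```sql
select s.sub_batch
  into v_batch
  from pgque.subscription s
 where s.sub_queue = i_queue
   and s.sub_consumer = i_consumer
   for update of s;         -- a second same-consumer session blocks here

-- existing early-return guard: an already-open batch is simply handed back
if v_batch is not null then
    return v_batch;         -- no new nextval(), no UPDATE
end if;
```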

Changes

  • sql/pgque.sql: add FOR UPDATE OF s to the subscription SELECT in next_batch_custom(); add explanatory comment
  • sql/pgque-api/receive.sql: add single-worker-per-consumer contract comment to pgque.receive()
  • docs/reference.md: document the single-worker-per-consumer contract in the pgque.receive() entry
  • tests/test_concurrent_receive.sql: regression tests (T1 sequential idempotency, T2 idempotent next_batch_custom, T3 no duplicate batch under the normal API, T4 cursor cleared after ack)

TDD

  • Red commit 13e222b — regression test documents the single-worker invariant and the concurrent race. Serial tests T1-T4 verify the sequential case (already correct). The concurrent case (two physical sessions) cannot be reproduced deterministically in a single SQL session; T2 is the closest serial proxy.
  • Green commit 527f4a8 — FOR UPDATE OF s fix + documentation.

Test run (green)

PASS T1: sequential idempotency -- same batch_id on repeated receive()
PASS T2: next_batch_custom idempotent for active subscription
PASS T3: no double-delivery -- receive() reuses active batch
PASS T4: subscription cursor cleared after ack
PASS: concurrent-receive regression tests complete

All existing tests (test_api_receive, test_api_send, test_core_consumer, test_core_events, test_core_lifecycle) continue to pass.

@NikolayS
Owner Author

Held for v0.2.1 per maintainer call. The fix modifies the PgQ core engine (next_batch_custom) which CLAUDE.md key design rule #2 marks as sacred. Even with the conceptual rationale (single-worker contract + FOR UPDATE to serialize) and the test evidence, the maintainer is deferring to a later release after more rigorous validation (concurrent integration tests against PG 14–18 under sustained load, deadlock scenario coverage with pgque.maint(), lock-contention measurement). PR stays open; do not merge.

@NikolayS NikolayS changed the title fix(pgque.receive): prevent double-delivery on concurrent same-consumer calls [ON HOLD v0.2.1] fix(pgque.receive): prevent double-delivery on concurrent same-consumer calls Apr 30, 2026
@alceops
Contributor

alceops commented Apr 30, 2026

I did a public-code-only review of #125 against the hold reason, and I think the next useful slice is a true two-session validation harness rather than more single-session assertions.

Root-cause/validation shape I found:

  • next_batch_custom() now locks the pgque.subscription row with for update of s, which matches the single cursor per (queue, consumer) model.
  • The existing tests/test_concurrent_receive.sql is valuable as an invariant test, but it also correctly notes that it cannot deterministically exercise the two-physical-session race.
  • A maintainer-run harness can start session 1 receive() in an open transaction, start session 2 for the same consumer, then assert session 2 waits and returns the same batch_id after session 1 commits. On the old code, the second session can resume after the blocked update and overwrite with a distinct batch id.

I drafted that as a small local shell/psql harness and only static-checked it here (bash -n, git diff --check); I did not run a PG 14–18 matrix in this environment. If useful, I can turn it into a narrow PR/validation pack that runs pre-fix vs PR head across PG 14–18 and adds a short lock/deadlock note, without needing production DB access, credentials, paid CI, or private data.
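In SQL terms, the two sessions that harness drives would run roughly the following (queue/consumer names and the shape of the receive() result set are assumptions for illustration):

```sql
-- session 1 (psql #1): open a batch and keep the transaction open
begin;
select batch_id from pgque.receive('q', 'c1');   -- returns e.g. batch 1;
                                                 -- subscription row lock now held
-- ... harness pauses here instead of committing ...

-- session 2 (psql #2): same consumer, started while session 1 is open
select batch_id from pgque.receive('q', 'c1');
-- post-fix: blocks on the row lock, then idempotently returns batch 1
-- pre-fix:  proceeds, allocates batch 2, and overwrites the cursor

-- session 1: release the lock
commit;
```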

@NikolayS
Owner Author

NikolayS commented May 2, 2026

@alceops Thanks, that would be useful!

Owner Author

NikolayS commented May 3, 2026

REV-style review

Adapted the REV review flow (postgres-ai/rev) for this GitHub PR — REV is GitLab-native (glab) so I drove the same five agents (Security, Bug Hunter, Test Analyzer, Guidelines Checker, Docs Reviewer) over the diff manually. SOC2 skipped per repo policy.

Aware this is labelled wontfix and held for v0.2.1 per the maintainer comment above. The fix itself is minimal and correct on paper; the hold is on validation depth, not on diff quality. Posting the formal review for the record so the held-state has the per-agent findings attached.

Scope: 1-line for update of s added to pgque.next_batch_custom SELECT (sql/pgque.sql), inline contract comment on pgque.receive (sql/pgque-api/receive.sql), reference-doc paragraph (docs/reference.md), and a new sequential regression test (tests/test_concurrent_receive.sql, +302). +332 / −1 across 4 files.


BLOCKING — 0

NON-BLOCKING

MEDIUM tests/run_all.sql — New regression test is not wired in; CI never runs it

$ git diff main...pr-125 -- tests/run_all.sql
(empty)

tests/test_concurrent_receive.sql is added but never referenced from tests/run_all.sql. The matrix test job on PG 14-18 will not exercise it. The 75/75 pass count in the PR description doesn't include this file. A regression that removes for update of s from next_batch_custom would not be caught by CI even after this PR lands.

Fix: add a \i test_concurrent_receive.sql line to tests/run_all.sql in the same shape as siblings.

MEDIUM tests/test_concurrent_receive.sql — Tests are sequential by design; cannot exercise the race they fix

The test file's own header acknowledges this:

"True concurrent races require two sessions and cannot be deterministically reproduced in a single-session SQL file."

T1 (sequential idempotency), T2 (active-batch idempotency), and T3 (no-double-delivery in the same session) all pass on main before the fix — the early-return guard handles the serial case. They are regression sentinels for the early-return guard, not for for update of s itself.

Meanwhile, #175 (now merged) introduces a real two-session shell harness (tests/two_session_receive_lock.sh) that does fail pre-fix and pass post-fix. That harness is the missing red/green proof for this exact change.

Fix: either (a) cross-reference tests/two_session_receive_lock.sh from this test file's header so a future reader knows where the actual race coverage lives, or (b) lift the harness into a make target or CI job and rerun it on this branch as the green proof attached to the PR description. Option (a) is one comment line; option (b) is what closes the hold-rationale gap.

LOW tests/test_concurrent_receive.sql:200-220 (T3) — v_count is computed but never asserted

foreach v_id in array v_ids_a
loop
  if v_id = any(v_ids_b) then
    v_count := v_count + 1;
  end if;
end loop;

-- After fix: both calls return identical sets (same batch), so count = len.
-- ...
assert v_batch_b = v_batch_a,
  'T3: duplicate-free check: both receive() calls must be for the same batch';

The foreach loop accumulates v_count, then the assertion ignores v_count and re-checks v_batch_b = v_batch_a (which T3 already asserted 12 lines earlier). The intended check — "no duplicate msg_ids unless they came from the same batch" — was never written.

Fix: either drop the loop and the duplicated assertion, or replace the duplicated assertion with assert v_count = array_length(v_ids_a, 1), 'T3: when both calls return same batch, every msg_id must repeat'.

LOW tests/test_concurrent_receive.sql:282-291 — Final cleanup is not exception-safe

do $$
begin
  perform pgque.unregister_consumer('test_concurrent_recv', 'c1');
  perform pgque.drop_queue('test_concurrent_recv');
  ...

The leading cleanup at the top of the file uses pgque.drop_queue(..., true) (cascade) wrapped in exception when others then null; — defensive against partial state. The final cleanup doesn't. If a prior assertion in this test file ever fails partway through (or any future addition adds an assertion that does), the queue stays around and pollutes the next run_all.sql invocation.

Fix: mirror the top-of-file shape: perform pgque.drop_queue('test_concurrent_recv', true); inside exception when others then null.
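Mirroring the top-of-file shape, the final cleanup block would become (a sketch; the surrounding statements are abbreviated):

```sql
do $$
begin
  perform pgque.unregister_consumer('test_concurrent_recv', 'c1');
  perform pgque.drop_queue('test_concurrent_recv', true);  -- cascade
exception when others then
  null;  -- tolerate partial state left by an earlier failed assertion
end $$;
```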

LOW docs/reference.md — "Single-worker-per-consumer contract" paragraph is missing the crash-recovery note

The new paragraph explains the lock contract clearly. What it doesn't say: what happens if a worker crashes or its transaction rolls back before commit. The answer is "the row lock releases on transaction abort, the next caller observes sub_batch != null, and resumes the same batch" — important enough to include because it tells operators that a stuck worker doesn't permanently wedge the consumer.

Fix: append one sentence: "If a worker crashes before committing, the row lock releases on transaction abort and the next caller resumes the already-open batch (it observes sub_batch IS NOT NULL via the early-return guard in next_batch_custom)."

POTENTIAL ISSUES

MEDIUM Concurrency interaction with unregister_consumer and finish_batch

next_batch_custom now takes for update of s on pgque.subscription. Two functions on the same row deserve a quick check:

  1. finish_batch: also UPDATEs the subscription row (clears sub_batch, advances sub_last_tick). Standard write-write conflict resolution applies; this is the canonical "first session opens batch, then commits, then second session unblocks and observes the cleared sub_batch from the committed state in its re-read" path. Works correctly post-fix.
  2. unregister_consumer: PR #62's review ("Rotate subscription and tick tables to avoid held-xmin bloat (#61)") noted that unregister_consumer had for update of s, c reduced to for update of c. With the new for update of s in next_batch_custom, an unregister_consumer racing with a receive() call has the row-lock dynamic flipped: receive() now serializes ahead of unregister_consumer, so we can no longer fall into a "consumer unregistered between SELECT and UPDATE in next_batch_custom" path. That's actually a positive side effect of this fix and worth documenting in the contract block.

Suggestion: add a short comment block near the new for update of s listing the three callers of pgque.subscription (next_batch_custom, finish_batch, unregister_consumer) and the order they take locks. Future maintainers debugging deadlocks will thank you.
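That comment block could look like the following (the caller list and lock details are taken from this review, not from the source; verify the actual lock sites before committing):

```sql
-- Row locks on pgque.subscription -- three callers touch the same
-- (sub_queue, sub_consumer) row:
--   * next_batch_custom()   : SELECT ... FOR UPDATE OF s   (this fix)
--   * finish_batch()        : UPDATE (clears sub_batch, advances sub_last_tick)
--   * unregister_consumer() : FOR UPDATE OF c + row removal
-- next_batch_custom() now acquires the row lock first, so a racing
-- unregister_consumer waits behind an in-flight receive() instead of
-- interleaving between its SELECT and UPDATE.
```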

LOW Hot-path effect: for update of s extends row-lock hold time across the whole transaction

Once next_batch_custom takes the lock, it releases at transaction end — typically the caller's transaction, not the function's. If a consumer's downstream handler runs in the same transaction (atypical but allowed), the lock is held for handler duration, which serializes other receivers on the same consumer for that long. The single-worker contract makes this benign by design, but worth flagging in docs/reference.md: "Keep pgque.receive() transactions short; the subscription row lock is held until commit." This is good operational hygiene and matches PgQ's existing batch-window short-transaction guidance.

LOW next_batch_custom is the canonical PgQ engine entry point — touching it is a CLAUDE.md key-design-rule-#2 event

CLAUDE.md: "The PgQ engine is sacred. Batch/tick/rotation/consumer tracking logic is inherited from PgQ and must not be modified without deep understanding." This PR modifies one of the most central engine functions. The maintainer's hold for v0.2.1 to validate against PG 14-18 with sustained load + deadlock scenarios is exactly the right call. Suggestion when un-holding: also add a tests/test_concurrent_receive_deadlock.sql exercising receive → finish_batch → unregister_consumer interleaving via pg_try_advisory_xact_lock choreography in two psql sessions, and run it under all 5 PG versions before merge.


Verified locally on PG18

  • Diff applies cleanly to main. \i sql/pgque.sql install: clean.
  • tests/run_all.sql: 72/72 pass (test_concurrent_receive.sql is not wired in — see the first MEDIUM finding above).
  • Manual run of psql -f tests/test_concurrent_receive.sql: T1, T2, T3, T4 all PASS on PR head.
  • Sanity check: git diff main...pr-125 -- sql/pgque.sql | grep -c 'for update of s' returns 1 (exactly the change advertised; no incidental engine modifications).

Summary

| Area        | Findings (8-10) | Potential (4-7) | Filtered (0-3) |
|-------------|-----------------|-----------------|----------------|
| Security    | 0               | 0               | 0              |
| Bugs        | 0               | 1 MED           | 0              |
| Tests       | 2 MED, 2 LOW    | 0               | 0              |
| Guidelines  | 0               | 1 LOW           | 0              |
| Docs        | 1 LOW           | 1 LOW           | 0              |
| Concurrency | 0               | 1 MED           | 0              |

Result: HOLD CONFIRMED. Diff quality is good and the fix is correct on paper. Two test-side items would be cheap to land independently of the v0.2.1 hold: (a) wire the new test into tests/run_all.sql so it runs in CI, and (b) cross-reference the now-merged two-session harness from #175 to give this PR the actual red/green proof its sequential tests cannot provide. Both are pure test-tree changes, no engine touch.


REV-style review (5 agents in spirit; SOC2 skipped per repo policy).


Generated by Claude Code

NikolayS and others added 6 commits May 7, 2026 00:21
Documents the single-worker-per-consumer model: calling
receive() twice without ack must return the same batch_id
(not open a new batch for the same subscription cursor).

Serial tests T1-T4 verify the sequential invariant.
The concurrent race (two sessions, both see sub_batch=NULL)
requires FOR UPDATE on the subscription SELECT in
next_batch_custom() -- added in the following fix commit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two concurrent receive() calls for the same (queue, consumer)
could both read sub_batch = NULL before either committed,
allocate distinct batch_ids, and overwrite each other's
subscription cursor -- delivering the same events twice.

Fix: add FOR UPDATE OF s to the SELECT inside
next_batch_custom(). The second session blocks on the row
lock until the first commits; on unblocking it re-reads
sub_batch != NULL and returns via the early-return guard
without allocating a new batch_id.

Also documents the single-worker-per-consumer contract in
docs/reference.md and the pgque.receive() function comment.

Closes #97

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@NikolayS NikolayS force-pushed the fix/concurrent-receive branch from 527f4a8 to fd4dd74 Compare May 7, 2026 00:25
@NikolayS
Owner Author

NikolayS commented May 7, 2026

Rebased #125 on current main and wired the two-session receive harness from #175/#220 into CI.

Important bit found during the rebase: the old fix only patched the transformed PgQ base next_batch_custom(5), but current main now has the cooperative-consumers override that redefines the same legacy 5-arg function later in sql/pgque-api/cooperative_consumers.sql. That override also needed FOR UPDATE OF s; otherwise the new harness still failed.

Local PG17 RED/GREEN evidence:

Before the override fix, the wired harness failed:

FAIL: session2 returned batch 2; expected session1 batch 1
session2 receive time: ~3.8s

After locking the override path too:

PASS: concurrent same-consumer receive serialized; session2 waited 4s and idempotently returned batch 1

Also passed:

PASS T1: sequential idempotency -- same batch_id on repeated receive()
PASS T2: next_batch_custom idempotent for active subscription
PASS T3: no double-delivery -- receive() reuses active batch
PASS T4: subscription cursor cleared after ack
PASS: concurrent-receive regression tests complete

CI should now run tests/two_session_receive_lock.sh across the PG 14–18 matrix.


Labels

wontfix This will not be worked on

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bug: concurrent receive() for same consumer can double-deliver same events

2 participants