
Bug: concurrent receive() for same consumer can double-deliver same events #97

@NikolayS

Summary

Deep raw SQL testing found that two concurrent pgque.receive(queue, same_consumer) calls can both receive the same messages, with different batch_ids.

This appears to be a race in batch acquisition: the subscription row is read/updated without a strong enough lock or conditional update, so two sessions can both observe no active batch and allocate separate batches over the same tick range.
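The suspected check-then-act race can be illustrated with a toy in-memory model. This is only a sketch of the general pattern, not pgque's actual code: the `Subscription` class, the `racy_receive` function, and the starting batch id are hypothetical, and a `threading.Barrier` stands in for the artificial sleep so that both sessions finish reading before either one writes.

```python
import threading


class Subscription:
    """Toy stand-in for a subscription row (hypothetical model, not pgque code)."""

    def __init__(self):
        self.sub_batch = None          # None means "no active batch"
        self._next_id = 6              # arbitrary starting batch id
        self._id_lock = threading.Lock()

    def allocate_batch_id(self):
        # Id allocation itself is atomic, like a database sequence.
        with self._id_lock:
            batch_id = self._next_id
            self._next_id += 1
            return batch_id


def racy_receive(sub, barrier, results, name):
    active = sub.sub_batch             # unlocked read of the subscription state
    barrier.wait()                     # both sessions have read before any write
    if active is None:
        batch_id = sub.allocate_batch_id()
        sub.sub_batch = batch_id       # last writer wins, the other id is lost
        results[name] = (batch_id, [1, 2])   # same event range for both sessions


def simulate_race():
    sub = Subscription()
    barrier = threading.Barrier(2)
    results = {}
    threads = [
        threading.Thread(target=racy_receive, args=(sub, barrier, results, n))
        for n in ("T1", "T2")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, sub.sub_batch


results, final_batch = simulate_race()
```

Both simulated sessions end up with the same message ids under different batch ids, and the subscription keeps only the id written last, which matches the shape of the evidence in this report.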

Impact

For the same queue + consumer, concurrent workers can process the same event twice.

That violates the expected single-consumer cursor semantics for a registered consumer. Fan-out across different consumers is expected; duplicate delivery to the same consumer due to concurrent receive() is not.

Evidence

Using a local two-session repro with an artificial sleep to widen the race, both sessions received the same msg_ids with different batch_ids:

('T1', 1.03, [(1, 7, 'p1'), (2, 7, 'p2')])
('T2', 1.02, [(1, 6, 'p1'), (2, 6, 'p2')])
subscription [(7, 2, 1)]

Both sessions got msg_id 1 and 2: T1 under batch_id 7, T2 under batch_id 6. The final subscription state retained only one of the two batch ids (7).

Expected

For a given (queue, consumer):

  • at most one concurrent receiver should acquire the next batch
  • a second concurrent receiver should either see the same active batch in a controlled/idempotent way, block, or return no new batch depending on intended semantics
  • it should not allocate a distinct batch id for the same event range

Likely fix direction

In the lower-level batch acquisition path (next_batch_custom / related subscription update):

  • lock the subscription row (FOR UPDATE) before checking/updating sub_batch, or
  • use a conditional update like WHERE sub_batch IS NULL and verify exactly one row updated, or
  • use advisory lock keyed by subscription id

The invariant is: only one session can transition a subscription from no active batch to active batch.
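As a rough illustration of the conditional-update option, the sketch below uses Python's stdlib `sqlite3` as a stand-in for PostgreSQL. The `subscription` table layout, the `sub_id` column, and the `try_acquire` helper are assumptions made for the example; only `sub_batch` comes from this issue. It shows the row-count check sequentially. In PostgreSQL under READ COMMITTED, a second concurrent UPDATE on the same row should wait for the first to commit and then re-check the WHERE clause, so the same row-count test would apply there.

```python
import sqlite3


def try_acquire(conn, sub_id, batch_id):
    """Return True only if this caller moved the row from no batch to batch_id."""
    cur = conn.execute(
        "UPDATE subscription SET sub_batch = ? "
        "WHERE sub_id = ? AND sub_batch IS NULL",
        (batch_id, sub_id),
    )
    conn.commit()
    # Exactly one updated row means this caller won the transition.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE subscription (sub_id INTEGER PRIMARY KEY, sub_batch INTEGER)"
)
conn.execute("INSERT INTO subscription (sub_id, sub_batch) VALUES (1, NULL)")
conn.commit()

first = try_acquire(conn, 1, 6)    # wins: sub_batch was NULL
second = try_acquire(conn, 1, 7)   # loses: sub_batch is already set
active = conn.execute(
    "SELECT sub_batch FROM subscription WHERE sub_id = 1"
).fetchone()[0]
```

A caller that gets `False` would then follow whichever semantics are chosen for the losing session (return the active batch, block, or return no batch); that choice is left open here, as it is in the issue.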

Test to add

Add a concurrency regression test with two sessions calling receive() for the same consumer at the same time. It should assert that the same msg_id is not delivered under two different batch ids.
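The assertion for such a test could look roughly like the helper below. It is a sketch under the assumption that each session collects its rows as `(msg_id, batch_id, payload)` tuples, as in the evidence output above; the function name `find_double_deliveries` is hypothetical, and the two-session driver that actually calls receive() is not shown.

```python
from collections import defaultdict


def find_double_deliveries(deliveries):
    """Map each msg_id seen under more than one batch_id to its sorted batch_ids.

    `deliveries` is an iterable of (msg_id, batch_id, payload) tuples gathered
    from all sessions that called receive() for the same consumer.
    """
    batches_by_msg = defaultdict(set)
    for msg_id, batch_id, _payload in deliveries:
        batches_by_msg[msg_id].add(batch_id)
    return {
        msg_id: sorted(batch_ids)
        for msg_id, batch_ids in batches_by_msg.items()
        if len(batch_ids) > 1
    }


# Rows copied from the evidence section of this report.
t1_rows = [(1, 7, "p1"), (2, 7, "p2")]
t2_rows = [(1, 6, "p1"), (2, 6, "p2")]

dupes = find_double_deliveries(t1_rows + t2_rows)
single_session = find_double_deliveries(t1_rows)
```

The regression test would assert that the result is empty after both sessions finish. Fed the rows from this report, the helper flags msg_id 1 and 2, each under batch ids 6 and 7.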

Environment

Tested on main at 9b3f89f.


Labels: bug, engineer, sprint-v0.2.0
