feat: add experimental cooperative consumers #208

Merged
NikolayS merged 36 commits into main from blueprint-coop-consumers on May 6, 2026

NikolayS (Owner) commented May 5, 2026

Summary

Feature PR for PgQue cooperative consumers / subconsumers for v0.2.

This PR is intentionally staged. The first commit adds the implementation blueprint so we can lock down behavior, API shape, experimental status, and worktree/subagent split before touching SQL and three client libraries.

Scope

Target feature scope:

  • PgQue core cooperative consumer support using existing pgque.consumer and pgque.subscription rows with shared sub_id
  • no new subconsumer table
  • SQL API for subconsumer registration, cooperative batch allocation, and receive_coop
  • cooperative-aware finish_batch() so existing ack(batch_id) works
  • Go, Python, and TypeScript client support
  • docs, roadmap, and release notes marking the feature experimental in 0.2
  • SQL and client tests for no duplicate batch allocation, stale takeover, ack/nack behavior, and backward compatibility

Experimental status

Cooperative consumers / subconsumers must ship as experimental everywhere in 0.2:

  • docs
  • release notes
  • roadmap table
  • client README files
  • API/reference docs

Clean-room note

pgq-coop was studied as behavior reference only. It has no visible license file/header, so implementation must not copy SQL text, comments, tests, or docs from it.

Current state

First step in this PR:

  • Add clean-room implementation blueprint
  • Mark the planned feature experimental in blueprint, roadmap, and client release guidance

Implementation steps to follow in this same PR unless we decide to split:

  • SQL core functions and grants
  • cooperative-aware finish_batch()
  • SQL regression tests
  • docs/reference/examples updates
  • Go client API and tests
  • Python client API and tests
  • TypeScript client API and tests
  • final full test pass

Tests so far

Docs-only first step. Verified touched files have no tabs or trailing spaces.

NikolayS changed the title from "docs: add cooperative consumers blueprint" to "feat: add experimental cooperative consumers" on May 5, 2026

NikolayS (Owner, Author) commented May 5, 2026

Review notes after the latest SQL/style pass (ecab91a):

No confirmed release-blocking regression. The previous receive_coop() empty-batch issue appears fixed: a manual empty tick-window check finished the empty batch and did not strand the subconsumer.

A couple of things I’d still tighten before freezing the SQL API / starting client-library work:

  1. receive_coop(..., i_max_return => 0) should match receive() validation

    Normal pgque.receive() rejects i_max_return < 1, but receive_coop() currently does not. A practical check with one produced event and receive_coop('coop_max', 'main_c', 'w1', 0) returned 1 row.

    Suggested fix: add the same validation near the top of receive_coop():

    IF i_max_return < 1 THEN
        RAISE EXCEPTION 'i_max_return must be at least 1';
    END IF;

    And add a regression assertion for 0.

  2. Add explicit cooperative empty-batch regression test

    The implementation now appears correct, but the exact trap should be codified: create/register coop consumer, force an empty tick window, call receive_coop(), assert it returns 0 and clears sub_batch / active batch state, then send a later event and verify it can be received.

  3. Small style-doc concern

    CLAUDE.md now says lowercase SQL keywords are the repo convention, but also adds a strong “root keywords left-aligned / no decorative indentation” rule. Current SQL still has some PgQ-style aligned clauses, so the rule may be broader than what this PR actually enforces and could cause future review churn. Might be worth narrowing it to the concrete anti-pattern we’re trying to avoid.
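For item 1, the regression assertion for `0` could look roughly like the sketch below. It is self-contained and does not call PgQue: `pg_temp.receive_coop_demo` is a hypothetical stand-in that models only the proposed guard, so the same pattern would need to be pointed at the real `pgque.receive_coop()` in the test suite.

```sql
-- Hypothetical stand-in for pgque.receive_coop(): only the guard is modeled.
create function pg_temp.receive_coop_demo(i_max_return int)
returns setof int
language plpgsql as $$
begin
    if i_max_return < 1 then
        raise exception 'i_max_return must be at least 1';
    end if;
    return query select g from generate_series(1, i_max_return) as g;
end;
$$;

-- Regression assertion: 0 must be rejected, not silently accepted.
do $$
begin
    perform count(*) from pg_temp.receive_coop_demo(0);
    raise exception 'receive_coop_demo(0) should have been rejected';
exception
    when raise_exception then
        -- Re-raise anything that is not the guard's own message.
        if sqlerrm <> 'i_max_return must be at least 1' then
            raise;
        end if;
end;
$$;
```

Checking the message text inside the handler keeps the test from passing on an unrelated exception.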

Checks I saw green / clean:

  • GitHub CI green
  • bash build/transform.sh clean; generated sql/pgque.sql / sql/pgque-tle.sql match sources
  • tests/test_cooperative_consumers.sql passes on PG18
  • custom cooperative empty-batch probe passes

I’d treat #211 client-library audit as still blocked until this PR is merged/deferred and the SQL API semantics are frozen.

NikolayS and others added 3 commits May 5, 2026 12:48
- Drop byte-identical pgque.receive/ack/nack overrides; the canonical
  definitions in receive.sql already load before this file.
- Collapse the two pgque.next_batch overloads into one with
  i_dead_interval defaulting to NULL; receive_coop dispatch follows.
- Extract pgque._clear_member_cursor for the 4-field cursor-reset
  UPDATE used by finish_batch (coop_member), next_batch_custom
  victim-clear, and unregister_subconsumer.

Net -138 lines in cooperative_consumers.sql; full regression + acceptance
suites pass against PG 18.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Mirror the receive() #103 fix into receive_coop():

- Validate i_max_return >= 1 (parity with receive()'s guard).
- finish_batch the active batch when zero rows are yielded. Without
  this, a coop_member that opens a batch over an empty tick window
  leaves sub_batch set; subsequent receive_coop() calls short-circuit
  on the stale token and the subconsumer never advances, even after
  fresh events arrive.

Red/green TDD: tests/test_receive_coop_contracts.sql reproduces both
defects (max_return=0 silently accepted; sub_batch wedged after empty
window) on the unfixed code, and passes after the change.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Apply the SQL style rule from the upstream style commit to code
introduced on this branch: pgque._clear_member_cursor's UPDATE and
the test_receive_coop_contracts SELECT use root keywords flush at the
statement indent and continuation `and` indented under `where`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

NikolayS (Owner, Author) commented May 5, 2026

REV Code Review Report

  • PR: NikolayS/pgque#208 — feat: add experimental cooperative consumers
  • Author: @NikolayS
  • AI-Assisted: Yes (multiple commits co-authored with Claude)
  • CI Status: ✅ All 11 checks passed (PG 14–18, pg_tle, pg_cron, Go/Python/TS clients)

Note: sql/pgque.sql and sql/pgque-tle.sql are mechanically-generated bundles of sql/pgque-api/cooperative_consumers.sql. Findings reference the source file; fix once and the generators will pick it up.


BLOCKING ISSUES (8)

HIGH sql/pgque-api/cooperative_consumers.sql:70-83: unregister_consumer leaves wedged coop_main after removing last subconsumer

When called with a dotted-name member (e.g. unregister_consumer('q', 'main_c.w1')), the function deletes the member subscription but does NOT revert the main's sub_role from 'coop_main' to 'normal'. Compare with unregister_subconsumer (lines 769-781) which correctly reverts.
Combined with the next_batch_custom override (lines 202-209), which only rejects when coop_main AND exists coop_member, an orphan main with zero members will happily allocate a batch via the legacy 5-arg path — and then ack(batch) calls finish_batch, which raises 'cannot finish cooperative main consumer batch' (line 300-301). The consumer is permanently un-ackable.
Fix: After deleting the last member via this code path, recount remaining members and revert main's sub_role to 'normal'. Also align with unregister_subconsumer's consumer-row cleanup.

HIGH sql/pgque-api/cooperative_consumers.sql:80-83, 100-108: unregister_consumer orphans pgque.consumer rows

Both branches delete pgque.subscription rows but only the cascade branch attempts a consumer-row cleanup, and even then only for the main's co_id. Member dotted-name consumer rows (e.g. main_c.w1) are never removed. unregister_subconsumer does this correctly.
Fix: After deleting member subscriptions, iterate freed member co_ids and delete from pgque.consumer when no other subscription references them.

HIGH sql/pgque-api/cooperative_consumers.sql:413-425: register_subconsumer silently mutates a foreign consumer's sub_role

If a consumer named i_consumer already exists with sub_role='normal' and no active batch, this code unconditionally flips it to 'coop_main'. No ownership check. The function is reachable indirectly through receive_coop / next_batch, both granted to pgque_reader. Any reader who knows a victim consumer name can break that consumer: subsequent pgque.receive(queue, victim_consumer, …) calls hit the rejection at line 202-210 ("is a cooperative main consumer"). Cross-consumer DoS.
Fix: Require an explicit opt-in flag (e.g. i_create_main boolean default false), or refuse to convert a 'normal' row that the caller doesn't already own. At minimum require the role to already be 'coop_main'.
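A minimal sketch of the opt-in shape, assuming a boolean flag that defaults to false. Everything here is hypothetical and self-contained: `coop_role_demo` stands in for the subscription state, and `pg_temp.register_main_demo` with its `i_convert_normal` parameter is an illustrative name, not the PgQue API. It does not model ownership checks, only the refusal to convert a 'normal' row implicitly.

```sql
-- Hypothetical stand-in for subscription state; not the pgque.subscription table.
create temp table coop_role_demo (
    consumer text primary key,
    sub_role text not null default 'normal'
);
insert into coop_role_demo (consumer) values ('victim');

create function pg_temp.register_main_demo(
    i_consumer text,
    i_convert_normal boolean default false
)
returns text
language plpgsql as $$
declare
    v_role text;
begin
    select sub_role
    into v_role
    from coop_role_demo
    where consumer = i_consumer
    for update;

    if not found then
        insert into coop_role_demo (consumer, sub_role)
        values (i_consumer, 'coop_main');
        return 'coop_main';
    end if;

    -- Refuse to flip an existing normal consumer unless the caller opts in.
    if v_role = 'normal' and not i_convert_normal then
        raise exception 'consumer % is a normal consumer; pass i_convert_normal => true to convert', i_consumer;
    end if;

    update coop_role_demo
    set sub_role = 'coop_main'
    where consumer = i_consumer;
    return 'coop_main';
end;
$$;

-- Default call: the normal consumer is left untouched and the call raises.
do $$
begin
    perform pg_temp.register_main_demo('victim');
    raise exception 'expected the default call to be rejected';
exception
    when raise_exception then
        if sqlerrm not like 'consumer victim is a normal consumer%' then
            raise;
        end if;
end;
$$;

-- Explicit opt-in converts the row.
select pg_temp.register_main_demo('victim', i_convert_normal => true);
```

With a default of false, indirect callers such as a receive wrapper cannot convert a consumer by accident; only a caller that names the flag can.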

HIGH sql/pgque-api/cooperative_consumers.sql:70-114: unregister_consumer cascade-deletes all members under one shared sub_id

When called with the main consumer name and no active member batches, the function deletes ALL pgque.subscription rows where sub_id = x_sub_id, plus all pgque.retry_queue rows for that owner. Granted to pgque_reader. In multi-tenant or shared deployments, anyone who can guess the main's name can wipe every subconsumer plus retry state.
Fix: Require an explicit i_force boolean default false / cascade boolean parameter before deleting members, or refuse to delete the main while any members exist (mirror unregister_subconsumer's ergonomics).

HIGH tests/test_cooperative_consumers.sql — Blueprint test case 14 (concurrent batch allocation) not exercised

The blueprint explicitly requires proving "concurrent subconsumers cannot allocate the same new batch." The for update lock on the main subscription row at line 535-541 is the central correctness mechanism, yet no test runs two sessions racing on next_batch / receive_coop to verify serialization. CI is single-session; this invariant is unverified end-to-end.
Fix: Add a multi-session test (psql \!, pg_background, or dblink) that calls receive_coop from two simultaneous sessions and asserts distinct batch_ids and no duplicate event delivery.

MEDIUM sql/pgque-api/cooperative_consumers.sql placement — Blueprint mandates "experimental everywhere", file lives in default-install path

The blueprint section "Experimental status" (lines 51-72) requires the feature ship as experimental in docs, release notes, roadmap, and READMEs. The repo already has a documented opt-in tier sql/experimental/ (referenced in docs/reference.md:560-562). This file is in sql/pgque-api/ and is bundled into the default sql/pgque.sql and sql/pgque-tle.sql (build/transform.sh:809). Default installs silently get the experimental schema change (sub_role column).
Fix: Either (a) move cooperative_consumers.sql to sql/experimental/ and remove from the default bundle, or (b) update the blueprint to document an "experimental but bundled" tier, with the marker (sub_role) explained.

MEDIUM docs/reference.md:436-475 — Stale documentation for overridden inherited functions

This PR overrides three inherited PgQ functions: unregister_consumer (line 24), next_batch_custom 5-arg (line 116), finish_batch (line 269). Each gains new exception paths (lines 77, 92, 202-210, 300-301) and cooperative branches. docs/reference.md still describes only the original PgQ semantics — users will hit unexpected exceptions with no documented cause.
Fix: Update the reference entries to note cooperative-aware error paths, and add a section documenting the new public API (register_subconsumer, unregister_subconsumer, subscribe_subconsumer, unsubscribe_subconsumer, touch_subconsumer, cooperative next_batch / next_batch_custom overloads, receive_coop) marked experimental.

MEDIUM docs/reference.md:554 — Grants table missing the 8 new functions

The PR grants 8 new cooperative functions to pgque_reader (cooperative_consumers.sql:851-858). None appear in the authoritative grants table.
Fix: Append the eight cooperative functions to the pgque_reader row, or list them under a new "Cooperative consumers" experimental subsection.


NON-BLOCKING (4)

INFO sql/pgque-api/cooperative_consumers.sql — SQL style: right-aligned root keywords throughout

CLAUDE.md (lines 45-47) declares left-aligned root SQL keywords "non-negotiable for generated and source SQL." The PgQ-derived overrides (lines 24-316) use heavy decorative right-alignment (e.g. lines 52-59, 175-193, 291-294, 305-311). Worse, the new clean-room code (lines 379-457, 525-655, 699-781) — added AFTER the style commit ecab91ab that introduced the rule — also uses right-shifted style. and/or continuation lines are not aligned under where.
Suggestion: Reformat all added/modified SQL in this file to left-aligned root keywords with and/or aligned under where indentation. Tests have the same drift in update/set/from blocks (e.g. test_cooperative_consumers.sql:156, 171-177).

INFO sql/pgque-api/cooperative_consumers.sql:243-245 — UPPERCASE NULL

cur_tick_id := NULL; and surrounding lines use uppercase NULL. CLAUDE.md requires lowercase SQL keywords.
Suggestion: Change to lowercase null.

INFO sql/pgque-api/cooperative_consumers.sql:1-4 — Misleading file header

Header reads "Clean-room PgQue implementation. No code copied from pgq-coop." But lines 24-316 are clearly PgQ-derived overrides (full PgQ-style docblocks, txid_*-era lock idioms). The blueprint's clean-room constraint applies only to pgq-coop, not to PgQ — so the PgQ-derived parts are licit. The header is just inaccurate.
Suggestion: Reword to: "Cooperative-aware overrides extend PgQ-derived primitives (unregister_consumer, next_batch_custom, finish_batch). New cooperative API (register_subconsumer, etc.) is clean-room — no code copied from pgq-coop."

LOW README.md:437 — Roadmap row not updated

"Subconsumers / coop consumers" remains unchecked in the roadmap, even though the PR ships the SQL into the default install with a 23-test regression suite. Internally inconsistent with the rest of the PR.
Suggestion: After deciding the placement (see BLOCKING above), mark this row as 🔬 experimental per blueprint section "Experimental status".


POTENTIAL ISSUES (12)

HIGH sql/pgque-api/cooperative_consumers.sql:567-593 — Caller-controlled i_dead_interval allows immediate batch theft (confidence: 6/10)

No floor on i_dead_interval — passing interval '0' or negative makes sub_active < now() - i_dead_interval trivially true for any peer. Within a single cooperative group this is "by design" (peers cooperate), so attack surface depends on whether all callers of one group are trusted. Worth a hardening note.
Suggestion: greatest(i_dead_interval, '5 seconds') floor, or document threat model.
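One detail worth checking before adopting the floor: PostgreSQL's `greatest()` ignores NULL arguments, so a bare `greatest(i_dead_interval, '5 seconds')` would turn a NULL interval into 5 seconds. The sketch below assumes NULL means "no takeover requested" (an assumption, since `i_dead_interval` defaults to NULL) and keeps it NULL; the 5-second floor is the value from the suggestion above.

```sql
-- Effective dead interval for a few caller-supplied values.
select
    requested,
    case
        when requested is null then null  -- assumed to mean: takeover disabled
        else greatest(requested, interval '5 seconds')
    end as effective
from (
    values
        (interval '0'),
        (interval '-1 minute'),
        (interval '30 seconds'),
        (null::interval)
) as t(requested);
```

Zero and negative inputs are clamped to the floor, larger values pass through unchanged, and NULL stays NULL.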

MEDIUM sql/pgque-api/cooperative_consumers.sql:269-316: finish_batch has no ownership check; new coop_member branch makes cursor reset reachable (confidence: 7/10)

Pre-existing PgQ behavior, but the new coop_member branch in _clear_member_cursor resets another subconsumer's sub_last_tick/sub_next_tick/sub_batch if a caller learns the batch_id. Compare to nack (receive.sql) which re-queries event ownership.
Suggestion: Either accept additional identity args (finish_batch(batch_id, queue, consumer, subconsumer)), or document the trust model explicitly.

MEDIUM tests/test_cooperative_consumers.sql: unregister_consumer cascade with idle members not tested (confidence: 8/10)

No test asserts that a coop_main with idle members is fully torn down (subscriptions, retry_queue, member consumer rows). This is the path that would surface the orphan-consumer-row bug.

MEDIUM tests/test_receive_coop_contracts.sql:91-113: max_return limiting behavior not tested (confidence: 7/10)

Only rejection of < 1 is verified. Send 5 events into one batch with max_return=2; assert exactly 2 rows yielded with same batch_id, sub_batch still set, then ack and confirm the remaining 3 events do not redeliver to a fresh subconsumer.

MEDIUM tests/test_cooperative_consumers.sql:168-210 — Stale takeover test never re-runs the original (stolen-from) member (confidence: 7/10)

Test asserts ack(old_batch)=0 and nack(old_batch,…) rejection, but never has w1 call receive_coop again to verify its row was reset cleanly via _clear_member_cursor. Add a follow-up receive_coop('coop_stale','main_c','w1',10) and assert no batch / fresh batch behavior plus null sub_batch/sub_last_tick/sub_next_tick.

MEDIUM tests/test_cooperative_consumers.sql: register_subconsumer race not tested (confidence: 6/10)

Two concurrent register_subconsumer calls for the same name should serialize via the consumer FOR UPDATE chain. Verify with pg_background or two psql sessions.

LOW sql/pgque-api/cooperative_consumers.sql:741-754 — Positional column-to-field mapping into pgque.message (confidence: 6/10)

The for v_msg in select ev_id, v_member.sub_batch, ev_type, … loop relies on positional assignment into the named composite type. Names differ (msg_id vs ev_id, payload vs ev_data, retry_count vs ev_retry, …). If pgque.message is ever reordered, this loop silently misroutes data into event_dead/event_retry.
Suggestion: Alias columns (ev_id as msg_id, ev_data as payload, …) so PL/pgSQL matches by name, or build with explicit row(...)::pgque.message.
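A caveat on the aliasing route, offered as a sketch rather than a verified diagnosis of this file: as far as I know, PL/pgSQL fills a composite-typed target by position regardless of column aliases, so aliases alone may not protect against reordering, whereas an untyped `record` takes its field names from the query. The demo below uses a hypothetical two-field type `pg_temp.message_demo`, not `pgque.message`.

```sql
-- Hypothetical two-field stand-in for a message composite type.
create type pg_temp.message_demo as (msg_id text, payload text);

do $$
declare
    v_msg pg_temp.message_demo;  -- composite-typed target: filled by position
    v_ev record;                 -- untyped record: fields named by the query
begin
    -- Columns are deliberately listed in the "wrong" order, with matching aliases.
    select 'body'::text as payload, 'id-1'::text as msg_id
    into v_msg;
    raise notice 'composite target: msg_id=%, payload=%', v_msg.msg_id, v_msg.payload;

    select 'body'::text as payload, 'id-1'::text as msg_id
    into v_ev;
    raise notice 'record target: msg_id=%, payload=%', v_ev.msg_id, v_ev.payload;
end;
$$;
```

If the two notices differ, the aliases did not drive the composite assignment, and a `record` loop variable (or an explicit field-by-field assignment) would be the safer shape.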

LOW tests/test_cooperative_consumers.sql:354-369 — SKIP LOCKED contract not verified (confidence: 5/10)

Stale-takeover happy path is tested, but no test confirms that a held lock on a victim row does not block other workers. In one session, BEGIN; SELECT … FOR UPDATE on a stale member; in another, call receive_coop with dead_interval and assert it does not block.

LOW tests/test_cooperative_consumers.sql:248-296 — Test pollutes shared schema with a dead_letter trigger (confidence: 4/10)

If an assertion fails between CREATE TRIGGER coop_test_fail_dead_letter and the manual DROP TRIGGER, subsequent tests inherit a broken pgque.dead_letter. Wrap in a savepoint or run as its own file.

LOW sql/pgque-api/cooperative_consumers.sql:24, 116, 269 — Inherited PgQ-style docblocks not updated for cooperative branches (confidence: 7/10)

The original PgQ docblocks describe only the legacy semantics. The override at line 24 raises on cooperative subconsumers (line 77) and on cooperative main with live member batches (line 92); the override at line 269 raises on coop_main and clears member ticks for coop_member; the override at line 116 raises on coop_main with members. Headers don't mention any of this.

LOW sql/pgque-api/cooperative_consumers.sql:361-859 — New cooperative functions have no docblocks (confidence: 7/10)

In contrast to the inherited overrides at the top of the file, none of register_subconsumer, subscribe_subconsumer, touch_subconsumer, cooperative next_batch_custom/next_batch, unregister_subconsumer, unsubscribe_subconsumer, receive_coop carry header docblocks. The blueprint enumerates non-trivial WHY decisions (deterministic lock order, takeover allocates fresh batch_id, dead_interval is not a visibility timeout, forced unregister cannot drop messages) — none restated in code.

LOW No CHANGELOG.md and no per-function comment on function markers (confidence: 6/10)

Blueprint mandates the experimental wording appear "in function comments where users are likely to discover the API" (line 60). Users running \df pgque.receive_coop see no warning. Either create CHANGELOG.md for the 0.2-unreleased entry, or add comment on function pgque.receive_coop(...) is 'EXPERIMENTAL in PgQue 0.2: ...' to each new function.
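A sketch of the per-function marker, using a hypothetical stand-in function so it runs without PgQue installed. The comment text after the prefix is placeholder wording, not the blueprint's; the real statement would target each new `pgque.*` function with its actual argument types.

```sql
-- Hypothetical stand-in; the real target would be pgque.receive_coop(...).
create function pg_temp.receive_coop_doc_demo(i_queue text, i_max_return int)
returns int
language sql as $$ select 0 $$;

comment on function pg_temp.receive_coop_doc_demo(text, int)
is 'EXPERIMENTAL in PgQue 0.2: placeholder wording, see blueprint';

-- Read the marker back the way a catalog query would see it.
select obj_description(
    'pg_temp.receive_coop_doc_demo(text, int)'::regprocedure,
    'pg_proc'
) as description;
```

In psql the description column is shown by `\df+` rather than plain `\df`, which may be worth mentioning wherever the marker is documented.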


Summary

Area        Findings  Potential  Filtered
Security           3          3         0
Bugs               2          1         0
Tests              1          5         0
Guidelines         3          0         0
Docs               3          3         0
Total             12         12         0

Highlights:

  • The unregister_consumer override is the load-bearing problem: it leaves orphan main rows with sub_role='coop_main' and no members, which then wedge silently because next_batch allocates a batch that finish_batch refuses to finish. Fix this together with the related orphan-consumer-row cleanup, and add a regression test.
  • register_subconsumer mutating a foreign normal consumer's sub_role is a real cross-consumer DoS — needs an explicit opt-in.
  • Concurrent batch allocation (the central correctness invariant) has no end-to-end test. Consider pg_background or a two-session smoke test before promoting out of experimental.
  • Placement: blueprint says experimental everywhere; file is in default install. Pick one.
  • SQL style violates the recently-introduced left-align rule throughout — the new clean-room code is not exempt.

Strong points: all SECURITY DEFINER functions correctly pin search_path = pgque, pg_catalog, no SQL injection vectors, grants correctly target pgque_reader, the empty-tick-window wedge fix and contract test are well-targeted, and the blueprint is thorough.


REV-assisted review (AI analysis by postgres-ai/rev)

NikolayS (Owner, Author) commented May 5, 2026

Re-reviewed latest head d403e73 after the follow-up fixes.

Previous findings look resolved:

  • receive_coop(..., i_max_return => 0) now rejects < 1 before batch allocation.
  • Explicit cooperative empty-batch regression added in tests/test_receive_coop_contracts.sql.
  • CLAUDE.md SQL style wording is clarified.
  • Generated SQL is consistent after bash build/transform.sh.

Checks:

  • GitHub CI green across PG 14–18, pg_cron, pg_tle, client tests, verify.
  • Local PG18 install of sql/pgque.sql passed.
  • Local tests passed:
    • tests/test_receive_coop_contracts.sql
    • tests/test_cooperative_consumers.sql
    • tests/test_receive_empty_batch.sql
    • full tests/run_all.sql

No release-blocking issues confirmed from my side now.

#211 client-library audit remains procedurally blocked until this PR is merged/deferred and the SQL API surface is frozen, but the known SQL-core blockers are gone.

NikolayS (Owner, Author) commented May 5, 2026

Ultrareview findings

Three findings, all in sql/pgque-api/cooperative_consumers.sql (mirrored into sql/pgque.sql and sql/pgque-tle.sql by the build).

bug_003 — Cooperative next_batch_custom: ambiguous next_tick_id (normal)

Location: sql/pgque-api/cooperative_consumers.sql:620-624

The cooperative overload declares out next_tick_id bigint and then runs:

select next_tick_id, next_tick_time, next_tick_seq
  into next_tick_id, v_next_tick_time, v_next_tick_event_seq
  from pgque.find_tick_helper(...);

pgque.find_tick_helper also has an OUT column named next_tick_id. With the default plpgsql.variable_conflict = error (no #variable_conflict directive anywhere in the repo), this raises column reference "next_tick_id" is ambiguous at runtime — but only on the else branch, which is reached iff i_min_count or i_min_interval is non-null.

Why CI is green: the pgque.next_batch and pgque.receive_coop wrappers always pass NULL for both knobs, so every test goes through the if-branch (which calls pgque.tick directly, no helper, no collision). The documented 7-arg next_batch_custom is granted to pgque_reader and broken for any direct caller wiring up min_count / min_interval.

The legacy 5-arg PgQ-derived next_batch_custom already shows the right pattern (locals named cur_tick_*); the cooperative rewrite reused next_tick_id for the OUT parameter and lost that property.

Fix: rename the outer OUT parameter (e.g. o_next_tick_id, assigned at end) or add #variable_conflict use_column at the top of the function. Add a regression test that calls pgque.next_batch_custom(queue, consumer, subconsumer, null, 1, null, null) after send + ticker so the else branch is exercised.
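The collision and the rename fix can be reproduced in isolation. The sketch below does not touch PgQue: `pg_temp.tick_helper_demo` is a hypothetical stand-in for `pgque.find_tick_helper` that only shares the `next_tick_id` OUT column name, and the two wrapper functions are illustrative.

```sql
-- Hypothetical stand-in for pgque.find_tick_helper: exposes an OUT column
-- named next_tick_id, as the real helper is described to do.
create function pg_temp.tick_helper_demo(out next_tick_id bigint, out next_tick_seq bigint)
language sql as $$ select 42::bigint, 7::bigint $$;

-- Broken shape: the OUT parameter shares its name with the helper's column.
create function pg_temp.next_batch_broken(out next_tick_id bigint)
language plpgsql as $$
begin
    select next_tick_id
    into next_tick_id
    from pg_temp.tick_helper_demo();
end;
$$;

-- Fixed shape: the OUT parameter is renamed, so the column reference is unique.
create function pg_temp.next_batch_fixed(out o_next_tick_id bigint)
language plpgsql as $$
begin
    select next_tick_id
    into o_next_tick_id
    from pg_temp.tick_helper_demo();
end;
$$;

-- The broken function is created without complaint and fails only when called.
do $$
begin
    perform pg_temp.next_batch_broken();
    raise exception 'expected an ambiguity error';
exception
    when ambiguous_column then
        raise notice 'broken shape raised: %', sqlerrm;
end;
$$;

select pg_temp.next_batch_fixed();
```

Because the error surfaces at call time rather than at `create function`, a regression test has to execute the affected branch, which is why the else-branch call suggested above matters.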

bug_001 — Forced unregister loses original ev_txid in DLQ (normal)

Location: sql/pgque-api/cooperative_consumers.sql:741-750

Forced unregister with batch_handling = 1 writes null::xid8 to pgque.dead_letter.ev_txid for messages routed to the DLQ:

for v_msg in
    select ev_id, v_member.sub_batch, ev_type, ev_data, ev_retry, ev_time,
           ev_extra1, ev_extra2, ev_extra3, ev_extra4
      from pgque.get_batch_events(v_member.sub_batch)
loop
    if coalesce(v_msg.retry_count, 0) >= v_max_retries then
        perform pgque.event_dead(v_member.sub_batch, v_msg.msg_id,
            'subconsumer unregistered', v_msg.created_at, null::xid8,
            ...);

The canonical pgque.nack() path (sql/pgque-api/receive.sql:112-131) preserves the original via v_ev.ev_txid::text::xid8. The blueprint (blueprints/COOPERATIVE_CONSUMERS.md, "Active batch unregistration") explicitly requires forced unregister to follow the same retry/dead-letter rules as nack().

v_msg is typed pgque.message, which has no ev_txid field — so the projection cannot carry it even if asked. pgque.get_batch_events() does expose ev_txid; the forced-unregister path simply drops it.

Impact: silent provenance loss in DLQ rows produced by forced unregister vs. nack(). Not a correctness break, but a direct violation of the documented nack-parity contract.

Fix: declare a local record (or sidecar v_ev_txid) and project ev_id, ev_txid, ev_type, ev_data, ev_retry, ev_time, ev_extra1..4 from pgque.get_batch_events(), then pass v_ev.ev_txid::text::xid8 to event_dead(), mirroring nack(). Add a test asserting that forced-unregister DLQ rows carry the original ev_txid, equivalent to the nack() path.

bug_002 — unregister_consumer single-coop-member branch leaks pgque.consumer rows (nit)

Location: sql/pgque-api/cooperative_consumers.sql:70-83

The new IF branch handles the single-coop-member unsubscribe:

if _sub_id_cnt > 1 and _sub_role = 'coop_member' then
    -- ...active-batch safety check...
    delete from pgque.subscription
          where sub_id = x_sub_id
            and sub_consumer = _consumer_id;
    return 1;

The ELSE branch (L84-109) and pgque.unregister_subconsumer both follow their subscription-row delete with an orphan-consumer check:

perform 1 from pgque.subscription where sub_consumer = _consumer_id;
if not found then
    delete from pgque.consumer where co_id = _consumer_id;
end if;

The new IF branch does not. The pgque.consumer row created by register_subconsumer for the composite name <consumer>.<subconsumer> is never deleted on this path.

Reachable via: pgque.unsubscribe(queue, 'billing.worker-1') against a cooperative consumer with sibling subconsumers. The blueprint recommends per-worker-lifecycle unique names (hostname/pid), so leaked rows are non-reusable. pgque.consumer.consumer_name_uq prevents duplicates but orphans never get reaped.

Impact: metadata bloat in pgque.consumer, one row per unregister_consumer call against a cooperative subconsumer with siblings. Not correctness/safety/data-loss; strictly an asymmetry/oversight in the legacy-style API for a feature this PR introduces.

Fix: mirror the ELSE branch's three-line orphan check after L82. Same fix in the regenerated mirrors.


Per CLAUDE.md: red/green TDD — failing tests first for bug_003 and bug_001, then the fix; bug_002 is small enough that a single-test regression alongside the fix is fine.

NikolayS (Owner, Author) commented May 5, 2026

Re-reviewed latest head 1c4b993.

No release-blocking issues found from my side.

Evidence:

  • GitHub CI green across PG 14–18, pg_cron, pg_tle, Go/Python/TypeScript client tests, verify.
  • bash build/transform.sh clean; generated sql/pgque.sql / sql/pgque-tle.sql consistent with sources.
  • git diff --check d403e73..HEAD clean.
  • Local PG17 focused tests passed:
    • tests/test_coop_ultrareview.sql
    • tests/test_coop_concurrency.sql
    • tests/test_receive_coop_contracts.sql
    • tests/test_cooperative_consumers.sql
  • Local full tests/run_all.sql passed.

The latest diff appears to cover the ultrareview fixes:

  • ambiguous cooperative next_batch_custom(... min_count ...) fixed/tested
  • forced unregister DLQ path preserves ev_txid
  • legacy unregister_consumer(member) cleanup/revert behavior covered
  • main cascade unregister rejected while members exist
  • normal → coop conversion requires explicit convert_normal=true
  • concurrent allocation test added and passing

#211 client-library audit remains procedurally blocked until this PR is merged/deferred and the SQL API surface is frozen, but I see no SQL-core blocker at this head.

NikolayS (Owner, Author) commented May 5, 2026

REV Code Review Report

  • PR: NikolayS/pgque#208 — feat: add experimental cooperative consumers
  • Author: @NikolayS
  • AI-Assisted: YES (Claude Opus 4.7 co-authored multiple commits)
CI Status Coverage
✅ all checks passed (12/12) — pipeline N/A

BLOCKING ISSUES (8)

Issues that should be addressed before merge (high-confidence HIGH/MEDIUM severity).

HIGH tests/test_cooperative_consumers.sql — Late nack(old_batch) test catches any exception as pass; does not verify new owner's batch survives

The stale-takeover test asserts nack raises but does not re-check that the new owner's sub_batch is still set after the failed nack. A regression where nack() silently clears state instead of raising would pass.
Fix: After the catch block, re-assert that the new owner's sub_batch = new_batch to prove the late nack did not clobber the active owner's state.

HIGH tests/test_cooperative_consumers.sql — Blueprint item 11 says nack must enable redelivery; test only checks retry-queue insertion, never re-receive_coops

Necessary but not sufficient: the test only verifies ev_owner after nack, never proves the message comes back.
Fix: After nack, advance past retry_after, call receive_coop again, assert the retried message returns with incremented ev_retry.

HIGH tests/test_coop_concurrency.sql:3692-3715 — dblink test does not actually exercise lock contention

dblink_send_query runs in implicit autocommit; the FOR UPDATE lock is released as soon as the CTE's INSERT INTO coop_concurrency_results commits — before pg_sleep(1) runs. Worker-2 starts after pg_sleep(0.2) and finds the lock already gone. Test currently proves serial execution, not serialized concurrent access.
Fix: Wrap worker-1's query in an explicit BEGIN/COMMIT with the sleep inside the same transaction, or use pg_advisory_lock so worker-2 actually blocks on the held lock.

HIGH tests/: finish_batch coop_main corruption-guard branch is untested

finish_batch() raises if sub_role = 'coop_main' (cooperative_consumers.sql ~L1183). The path is a defensive invariant check, but is unexercised — the error message and behavior are unverified.
Fix: Manually set sub_batch on the coop_main row (direct SQL update) and call pgque.finish_batch(that_batch_id); assert the expected exception.

HIGH tests/ — Upgrade/migration path (sub_role default for pre-existing rows) not exercised

Blueprint test plan item 22 calls for verifying that pre-existing normal subscriptions get sub_role = 'normal' after the alter table add column if not exists runs. Current test only checks newly-created consumers, which exercises the column default, not the upgrade path.
Fix: Add a test that creates a subscription before the column exists (or simulates this), applies the DDL, and asserts existing rows get 'normal'. Or document why this is intentionally deferred.
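The "simulate it" option could be sketched as below, under stated assumptions: `subscription_upgrade_demo` is a hypothetical stand-in for the pre-upgrade table, and the column definition (`text not null default 'normal'`) is a guess at the shape, since the actual DDL is not quoted here.

```sql
-- Hypothetical pre-upgrade table: rows exist before sub_role does.
create temp table subscription_upgrade_demo (
    sub_id int primary key,
    sub_consumer text not null
);
insert into subscription_upgrade_demo (sub_id, sub_consumer)
values (1, 'legacy_a'), (2, 'legacy_b');

-- Apply the upgrade DDL; running it twice checks that it is idempotent.
alter table subscription_upgrade_demo
add column if not exists sub_role text not null default 'normal';

alter table subscription_upgrade_demo
add column if not exists sub_role text not null default 'normal';

-- Pre-existing rows must report 'normal', not null.
do $$
begin
    assert (
        select count(*)
        from subscription_upgrade_demo
        where sub_role = 'normal'
    ) = 2, 'pre-existing rows did not pick up the default';
end;
$$;
```

This exercises the upgrade path (rows inserted before the column existed) rather than the insert-time default that the current tests cover.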

MEDIUM tests/ — Public-API aliases subscribe_subconsumer / unsubscribe_subconsumer are never called in any test

They are granted to pgque_reader but every test uses register_subconsumer/unregister_subconsumer. A future signature/grant drift would not be caught.
Fix: Replace at least one register/unregister pair in an existing test with the alias names, or add a smoke-test.

MEDIUM sql/pgque-api/cooperative_consumers.sql (and the duplicated copies in sql/pgque.sql, sql/pgque-tle.sql) — Stale -- Calls: pgque.insert_event_raw(11) comment in next_batch_custom(5) override

The override does not call insert_event_raw — it calls pgque.find_tick_helper. Comment was copied verbatim from the upstream PgQ body when the coop-aware override was written and is now misleading.
Fix: Change to -- Calls: pgque.find_tick_helper or remove the Calls: line.

MEDIUM docs/reference.md — receive_coop() silently auto-finishes empty batches; this is not documented

When the tick window has no events, receive_coop calls finish_batch internally and returns an empty set. A caller polling a quiet queue never sees a batch_id and cannot call ack() — they don't need to, but the docs don't say so. This differs meaningfully from receive().
Fix: Add to the receive_coop() doc entry: "Empty batches are auto-finished; no ack() call is needed when the result set is empty."


NON-BLOCKING (3)

Style violations against CLAUDE.md rules. Worth fixing but don't block correctness.

INFO docs/reference.md (~L149) — Concrete version tag "PgQue 0.2" in user-facing docs

CLAUDE.md: "No PR/issue numbers, and no concrete version tags, in the README or docs/."
Suggestion: Replace "Experimental in PgQue 0.2." with "Experimental." Blueprints and SQL function comments are exempt; this rule applies to docs/ and README.md.

INFO sql/pgque-api/cooperative_consumers.sql — and/or continuation lines not aligned under where

CLAUDE.md: "Put and / or at the start of continuation lines, aligned under the statement's where clause indentation."
Examples in _clear_member_cursor, touch_subconsumer, next_batch_custom (and mirrored into pgque.sql, pgque-tle.sql).
Suggestion: Move every and/or to the same column as its parent where.

INFO sql/pgque-api/cooperative_consumers.sql — Root SQL keywords not left-aligned

CLAUDE.md: "Root SQL keywords MUST be left-aligned within the statement. Do not use decorative vertical indentation. This is non-negotiable for generated and source SQL."
Examples: update/set/from/where decoratively right-padded in register_subconsumer, touch_subconsumer, next_batch_custom. _clear_member_cursor is the correct reference style.
Suggestion: Align all root keywords at the same column. The PR's own style commit (d403e73b) attempted this; it appears not all sites were caught.


POTENTIAL ISSUES (15)

Medium-confidence issues (4-7/10). Review manually — may be false positives.

MEDIUM sql/pgque-api/cooperative_consumers.sql:~1644 — unregister_subconsumer(batch_handling=1) uses hardcoded event_retry(..., 60) (confidence: 7/10)

Blueprint promised parity with nack() retry policy. Hardcoded 60s deviates if the queue has a configured retry interval.
Suggestion: Read the queue's configured retry interval, or document this as an intentional fixed-60s divergence.

MEDIUM docs/reference.md — Claim "Normal receive() for a coop_main raises" relies on receive() happening to call next_batch_custom (confidence: 7/10)

receive() is not overridden in this PR. The guard fires only via the inner next_batch_custom call. A future refactor of receive() could silently break the rejection.
Suggestion: Add an explicit coop_main guard inside receive(), or note in docs that the rejection is inherited via next_batch_custom.

MEDIUM clients/{go,python,typescript}/README.md (not in diff) — Blueprint mandates "client README files" mark feature experimental (confidence: 7/10)

None of the client directories appear in this PR.
Suggestion: Add experimental notices in this PR or open a tracking issue and acknowledge the gap in the PR description.

MEDIUM tests/test_cooperative_consumers.sql:~4108 — After ack on coop_member, the coop_main row is not asserted (confidence: 7/10)

A regression that zeroes the main cursor on member ack would not fail the test.
Suggestion: After ack, also select the coop_main row and assert sub_last_tick advanced and sub_batch is null.

MEDIUM tests/ — unregister_subconsumer with invalid batch_handling value (e.g. 2): error path untested (confidence: 7/10)

The function raises 'unsupported batch_handling value: %'; no test exercises this.
Suggestion: Call pgque.unregister_subconsumer('q','c','s', 2) and assert the expected exception.

MEDIUM tests/ — Double-unregister idempotency untested (confidence: 7/10)

Blueprint says second unregister returns 0. No test verifies this.
Suggestion: After successful unregister returning 1, call again and assert return is 0.

MEDIUM tests/ — 2-arg next_batch(queue, consumer) direct rejection on coop_main not tested (confidence: 6/10)

Test currently exercises receive() only, not next_batch directly.
Suggestion: Call pgque.next_batch('queue', 'main_c') directly with main_c having subconsumers; assert the cooperative-main error.

MEDIUM tests/ — touch_subconsumer on an active (in-batch) member untested (confidence: 6/10)

Blueprint item 21 explicitly requires this. Test only covers idle members.
Suggestion: After receive_coop and before ack, call touch_subconsumer, assert it returns 1, sub_active refreshed, sub_batch unchanged.

LOW tests/test_cooperative_consumers.sql:~4092 — Split-batch payload assertions reference m1.payload / m2.payload (confidence: 5/10)

If the message composite type's column is data (not payload), the assertion silently evaluates null = 'one' (= null, not failure).
Suggestion: Verify the field name; add assert m1.payload is not null guard.

LOW tests/test_coop_concurrency.sql:3708 — pg_sleep(0.2) timing on slow CI runners may be insufficient (confidence: 4/10)

CLAUDE.md mandates passing on PG 14–18 across all runners.
Suggestion: Use a coordination flag/poll loop instead of fixed sleep.

LOW docs/reference.md:~764 — Locking-bottleneck warning required by blueprint is missing (confidence: 6/10)

Blueprint says: "throughput can bottleneck on the main subscription row lock if many workers poll tiny batches."
Suggestion: Add a note under receive_coop() recommending tuning batch size and tick cadence.

LOW README.md — No "fan-out vs cooperative consumers" section as the blueprint mandates (confidence: 6/10)

Only two table-cell edits; no discoverability section, no usage example, no link to the reference.
Suggestion: Add a brief section pointing to docs/reference.md, or open a follow-up.

LOW blueprints/COOPERATIVE_CONSUMERS.md:~217 — Blueprint stale relative to implementation (confidence: 5/10)

Implementation adds convert_normal boolean default false to register_subconsumer; blueprint signature does not list it.
Suggestion: Add convert_normal boolean default false to blueprint signatures.

LOW (release notes) — Blueprint requires release notes mark experimental; no CHANGELOG / RELEASES.md change (confidence: 5/10)

Suggestion: Add release-note entry, or document that release notes are handled separately.

INFO sql/pgque-api/cooperative_consumers.sql — PgQ-derived decorative alignment carried verbatim into unregister_consumer and next_batch_custom(5) overrides (confidence: 5/10)

These overrides are new code, not inherited verbatim, so should adopt the current left-aligned style rather than the PgQ ancestor's indentation.
Suggestion: Re-indent the overrides to match _clear_member_cursor's left-aligned style.


Summary

| Area | Findings | Potential | Filtered |
|---|---|---|---|
| CI/Pipeline | 0 | 0 | 0 |
| Security | 0 | 0 | 0 |
| Bugs | 0 | 0 | 0 |
| Tests | 6 | 8 | 0 |
| Guidelines | 3 | 1 | 0 |
| Docs | 2 | 6 | 0 |

Note:

  • Findings: high-confidence (8–10/10) — blocking or non-blocking per severity
  • Potential: medium-confidence (4–7/10) — review manually, may be false positives
  • Filtered: low-confidence (0–3/10) — excluded as likely false positives

SOC2 checks skipped per repo policy (CLAUDE.md: "Ignore SOC2 findings; this repo does not need them").

The cooperative-consumer SQL core, batch allocation serialization, stale-takeover token model, and _clear_member_cursor refactor were reviewed for security and bug regressions; no high-confidence findings. The blocking concerns are concentrated in test coverage gaps and one misleading SQL comment carried over from the PgQ source.


REV-assisted review (AI analysis by postgres-ai/rev)

@NikolayS
Owner Author

NikolayS commented May 5, 2026

Re-reviewed latest head 84a4613.

No new release-blocking issue found since 1c4b993.

This update appears to be SQL/test style reformatting across:

  • sql/pgque-api/cooperative_consumers.sql
  • generated sql/pgque.sql
  • generated sql/pgque-tle.sql
  • cooperative SQL test files

Checks:

  • GitHub CI green: all 11 checks passing on current head.
  • bash build/transform.sh passes and leaves generated SQL clean.
  • git diff --check 1c4b993..84a4613 clean.
  • Focused PG17 tests passed locally:
    • tests/test_coop_ultrareview.sql
    • tests/test_coop_concurrency.sql
    • tests/test_receive_coop_contracts.sql
    • tests/test_cooperative_consumers.sql

I see no new blocker from this reformat. #211 client-library audit remains procedurally blocked until this PR is merged/deferred and the SQL API surface is frozen.

@NikolayS
Owner Author

NikolayS commented May 5, 2026

Ultrareview pass 2 — escalations + new findings

Second pass corroborated all three findings from the previous comment. One escalation and one new style finding.

merged_bug_001 — escalates bug_001 from "NULL ev_txid" to silent DLQ data loss (normal)

Location: sql/pgque-api/cooperative_consumers.sql:741-757 (DLQ branch of forced unregister)

The forced-unregister DLQ branch does not just lose ev_txid provenance — it loses the entire DLQ row.

event_dead() (sql/pgque-additions/dlq.sql:69-86) resolves dl_consumer_id by querying subscription where sub_batch = i_batch_id. In the cooperative path the row with sub_batch = batch_id is the coop_member row (the main row sets sub_batch = null), so dl_consumer_id = the member co_id, not the main.

Right after the loop, unregister_subconsumer deletes the member subscription and (typical single-queue case) runs delete from pgque.consumer where co_id = v_member_consumer_id. dead_letter.dl_consumer_id is declared with on delete cascade (sql/pgque-additions/dlq.sql:24), so the cascade fires inside the same transaction and removes the DLQ row that event_dead() just inserted.

After commit: no retry_queue row, no dead_letter row, no subscription owns the message. Message is unrecoverable. This directly violates the blueprint's "must never drop messages" promise.
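
The cascade mechanics described above can be reproduced on plain PostgreSQL without pgque. The sketch below uses hypothetical stand-in tables (demo_consumer, demo_dead_letter), not the real pgque schema; it only illustrates how an on delete cascade foreign key removes a row inserted earlier in the same transaction.

```sql
begin;

-- Stand-ins for pgque.consumer and pgque.dead_letter (names are illustrative).
create temp table demo_consumer (
  co_id int primary key,
  co_name text not null
) on commit drop;

create temp table demo_dead_letter (
  dl_id int primary key,
  dl_consumer_id int not null
    references demo_consumer (co_id) on delete cascade,
  ev_id bigint not null
) on commit drop;

insert into demo_consumer (co_id, co_name)
values (1, 'main'), (2, 'main.w1');

-- Analogue of event_dead(): the DLQ row is keyed on the member consumer.
insert into demo_dead_letter (dl_id, dl_consumer_id, ev_id)
values (1, 2, 42);

-- Analogue of the forced unregister: the member consumer row is deleted.
delete from demo_consumer
where co_id = 2;

-- The DLQ row is already gone, before the transaction ends.
select count(*) as surviving_dlq_rows
from demo_dead_letter
where ev_id = 42;

rollback;
```

Keying the demo_dead_letter row on co_id = 1 (the main) instead leaves it in place after the delete, which is the idea behind the first fix option.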

Why the test suite misses it: the only forced-unregister test exercising the DLQ branch is force_fail, which uses a trigger to make event_dead fail — it tests rollback, not the cascade-after-success path. The retry-branch test uses retry_count = 0, so it never reaches the DLQ branch.

Reproduction:

  1. create_queue('q'), set queue_max_retries = 1.
  2. register_subconsumer('q','main','w1'); send + tick a message.
  3. receive_coop → batch B1; nack(B1, M) → message in retry_queue with ev_retry = 1.
  4. maint_retry_events(); ticker; receive_coop → batch B2 with retry_count = 1.
  5. unregister_subconsumer('q','main','w1', 1). Inside DLQ branch: event_dead inserts dead_letter row keyed on co_id('main.w1'). Function then deletes that consumer row. Cascade removes the DLQ row.
  6. select * from pgque.dead_letter where ev_id = M.id → 0 rows. select * from pgque.retry_queue where ev_id = M.id → 0 rows.

Fix options:

  • Resolve dl_consumer_id to coop_main.co_id for cooperative DLQ writes (cooperative state is logically owned by the main; the main row is not deleted on subconsumer unregister).
  • Or skip the delete from pgque.consumer when the consumer has any dead_letter rows.
  • Or drop on delete cascade on dead_letter.dl_consumer_id and tolerate orphan DLQ rows.

The ev_txid parity fix from bug_001 is orthogonal and still needed: declare v_msg record (or sidecar var), select ev_txid from get_batch_events(), and forward v_ev.ev_txid::text::xid8 to event_dead().

bug_004 — bug_002 leak is broader than first reported (nit)

Location: sql/pgque-api/cooperative_consumers.sql:84-110 (the ELSE branch, not just the IF branch)

The first comment flagged the IF branch (single coop_member). The ELSE branch is also affected and leaks more rows per call:

When unregister_consumer targets a coop_main consumer, L100-101 bulk-deletes the main subscription plus every coop_member subscription sharing that sub_id. L103-108 then probe and delete only the main co_id. Every member co_id (billing.w1, billing.w2, …) leaks.

Reproduction:

select pgque.create_queue('q');
select pgque.register_subconsumer('q', 'billing', 'w1');
select pgque.register_subconsumer('q', 'billing', 'w2');
select pgque.unregister_consumer('q', 'billing');
select co_name from pgque.consumer where co_name like 'billing%';
-- still returns billing.w1 and billing.w2

Fix: before the bulk subscription delete, collect the sub_consumer values of coop_member rows sharing that sub_id. After the delete, run the same orphan check for each collected co_id.

bug_005 — left-aligned SQL keyword rule violated in same PR that introduced it (nit)

Location: sql/pgque-api/cooperative_consumers.sql:525-551 and many sibling blocks

The PR's CLAUDE.md (L45-49) declares left-aligned root SQL keywords "non-negotiable for generated and source SQL," but cooperative_consumers.sql still uses inherited PgQ right-aligned vertical style at L379-381, 395-399, 525-529, 531-536, 545-551, 602-606, 612-618, 699-703, 708-713, 718-720, 725-731, 759-761.

The earlier style: align new coop code to left-aligned SQL keywords commit (d403e73) only partially completed the cleanup. _clear_member_cursor at L351-357 is the in-file compliant model.

Pure formatting; re-flow select/from/where/and/or to the same column to satisfy the rule.

bug_009 — corroborates bug_003

The ambiguous next_tick_id finding was confirmed independently. Recommended fix in this pass: alias the helper (from pgque.find_tick_helper(...) fth and select fth.next_tick_id) — slightly less invasive than renaming the OUT param.

NikolayS and others added 5 commits May 5, 2026 16:19
event_dead() resolved dl_consumer_id via subscription.sub_batch, which
in the cooperative path matches the coop_member row. unregister_subconsumer
then deleted the member subscription and the orphan member consumer row;
dead_letter.dl_consumer_id has on delete cascade, so the DLQ row inserted
inside the same transaction was silently wiped before commit.

Route cooperative DLQ writes to the coop_main co_id (the persistent
consumer-group identity); normal subscriptions are unchanged. Add a
red/green regression in test_coop_ultrareview using unique consumer and
subconsumer names so the member co_id is not shared with sibling tests
(which is what was averting the cascade in the existing bug_001 test).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

The previous test asserted distinct batch_ids and no duplicate events but
did not prove the second worker actually blocked on the coop_main FOR
UPDATE -- a regression that silently dropped the lock could have passed
on a fast machine where worker-1 simply finished first.

Worker-1 now holds for 2s after a 0.5s head start; worker-2 measures its
own wait via clock_timestamp() and asserts it exceeded 1000ms. Negative
test: shortening the hold to 0.1s reduces the observed wait to <10ms and
fires the assertion.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

…h_batch

- Late-nack tests in stale-takeover and forced-unregister scenarios now
  capture and assert the specific "batch not found" message instead of
  swallowing any exception, so an unrelated error from nack() would fail
  the test rather than slip through. The stale-takeover case also re-reads
  the new owner's sub_batch after the rejection to prove no clobber.

- Add a redelivery proof for cooperative nack: send + receive + nack with
  retry_after = 0, then commit, run maint_retry_events + tick + receive in
  separate top-level statements, and assert the same msg_id returns with
  an incremented retry_count. The existing test only checked retry_queue
  insertion -- necessary but not sufficient.

- Add a corruption-guard test for finish_batch on a coop_main row: force
  sub_batch to a synthetic value via direct UPDATE and assert finish_batch
  raises the expected message.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

- Add an "empty tick windows are auto-finished" note: receive_coop()
  closes empty batches internally, so callers polling a quiet queue do
  not need to ack a batch_id. This is a meaningful divergence from
  receive() and was previously undocumented.
- Add a throughput note explaining that cooperative allocation
  serializes on a FOR UPDATE of the coop_main row, so undersized
  batches and short tick periods make the main row a hotspot.
- Drop the "in PgQue 0.2" version tag from the experimental marker per
  the docs rule ("no concrete version tags in README/docs").

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

- next_batch_custom(5) override's docblock listed `pgque.insert_event_raw(11)`
  under "Calls:", carried verbatim from the upstream PgQ body. The cooperative
  override actually calls `pgque.find_tick_helper`. Update accordingly.
- Add a round-trip smoke test for `subscribe_subconsumer` /
  `unsubscribe_subconsumer`. The aliases are documented public API but no
  test called them, so a future signature or grant drift would have been
  invisible.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@NikolayS
Owner Author

NikolayS commented May 5, 2026

Status update — addressed five items from the latest reviews on top of 84a4613:

| Commit | What |
|---|---|
| 817c084 | fix: DLQ data loss in forced unregister. event_dead() resolved dl_consumer_id to the coop_member's co_id; unregister_subconsumer then deleted the orphan member consumer row, and dead_letter.dl_consumer_id's on delete cascade wiped the DLQ row in the same transaction. Now routes cooperative writes to the coop_main.co_id. Red/green test in test_coop_ultrareview.sql uses unique consumer/subconsumer names — the existing bug_001 test passed only because bug_003 ran first and accidentally shared main_c.w1, averting the cascade. |
| eef16b7 | test: concurrency test now asserts worker-2 actually blocked >1000ms on the coop_main FOR UPDATE. (REV pass #2's claim that the lock released before pg_sleep was not reproducible — empirical probe shows 807ms wait — but the new assertion makes the property load-bearing so a future regression cannot quietly pass.) |
| 27eb849 | test: late-nack handlers in stale-takeover and forced-unregister tests now match the specific batch not found:% message and re-read sub_batch to prove no clobber. New nack→redelivery proof for cooperative consumers (separate top-level statements so maint_retry_events + tick + receive cross transaction boundaries per the snapshot visibility contract). New finish_batch coop_main corruption-guard test. |
| 62828b2 | docs: receive_coop() empty-batch auto-finish documented; throughput/locking note added; "in PgQue 0.2" stripped per the no-version-tags-in-docs rule. |
| 88f0b4f | chore: stale pgque.insert_event_raw(11) "Calls:" comment in next_batch_custom(5) corrected to pgque.find_tick_helper; round-trip smoke test for subscribe_subconsumer / unsubscribe_subconsumer aliases. |

CI: green across the chain.

Local checks (PG 18): full tests/run_all.sql green; bundles regenerated cleanly; git diff --check clean.

Deferred with rationale

  • bug_004 (ELSE-branch member co_id leak) — verified non-reproducible. The literal repro (unregister_consumer('q','billing') against a coop_main with members) hits the rejection guard added in the previous round (cannot unregister cooperative main consumer with registered subconsumers). The bulk-delete path is no longer reachable for cooperative state.
  • bug_005 (left-align SQL keyword style) — checked the cited blocks with an indent-equality probe across cooperative_consumers.sql: every select/from/where within a statement sits at the same column. The cited line ranges describe vertically-decomposed statements (one column per line) where root keywords are still left-aligned to the surrounding indent. and/or continuations are indented one level past where, matching dlq.sql and the rest of the file; CLAUDE.md's "aligned under the where clause indentation" reads either way and codebase usage is mixed. Happy to do the column-shift pass if the intent is the stricter reading.
  • Upgrade-path test for sub_role default — alter table ... add column if not exists ... default 'normal' is a PG primitive, not pgque code; new subscriptions inheriting 'normal' is exercised implicitly by every existing test that calls register_consumer.

@NikolayS
Owner Author

NikolayS commented May 5, 2026

Re-reviewed latest head a17b5caf against previous clean head 84a4613.

No release-blocking issues found from my side.

The latest diff appears to address prior review concerns:

  • cooperative DLQ rows now anchor to coop_main, avoiding member-row cascade loss
  • forced-unregister DLQ survival + ev_txid preservation tested
  • concurrency test now measures real FOR UPDATE contention
  • late nack tests assert batch not found:% and no clobber
  • nack → redelivery, finish_batch coop-main guard, alias smoke tests added
  • docs updated for receive_coop() empty-batch auto-finish + lock bottleneck
  • stale Calls: comment corrected

Checks/evidence:

  • GitHub CI green, all 11 checks passing.
  • bash build/transform.sh clean.
  • generated sql/pgque.sql / sql/pgque-tle.sql clean.
  • git diff --check 84a4613..HEAD clean.
  • focused PG17 tests passed:
    • tests/test_coop_ultrareview.sql
    • tests/test_coop_concurrency.sql
    • tests/test_receive_coop_contracts.sql
    • tests/test_cooperative_consumers.sql
  • full PG17 tests/run_all.sql passed.

#211 client-library audit remains procedurally blocked until this PR is merged/deferred and the SQL API surface is frozen.

@NikolayS
Owner Author

NikolayS commented May 6, 2026

REV Code Review Report

  • PR: NikolayS/pgque#208 — feat: add experimental cooperative consumers
  • Author: @NikolayS
  • Branch: blueprint-coop-consumers → main  •  14 files, +5479 / -15
  • AI-Assisted: yes
| CI | Coverage |
|---|---|
| ✅ all 11 checks passed (Actions run) | n/a |

This is a re-review on top of the previous REV report posted at 22:44:59Z. Focus is on items not yet resolved by commits 27eb8491, 62828b28, 88f0b4f8, 817c084e, eef16b78, a17b5caf, and on issues newly visible in the current head.


BLOCKING ISSUES (7)

Issues that should be addressed before merge (high-confidence MEDIUM/HIGH).

HIGH tests/test_cooperative_consumers.sql (missing) — No assertion on coop_main row state after ack on a coop_member

The test reads only the member row after ack and asserts sub_batch is null / sub_last_tick is null on the member. The blueprint invariant "coop_main must never have sub_batch is not null, and its sub_last_tick advances correctly" is not asserted on the actual main row.
Fix: After perform pgque.ack(m1.batch_id), query the coop_main subscription row and assert sub_batch is null and sub_last_tick = <expected next tick id>.

HIGH tests/test_cooperative_consumers.sql (missing) — unregister_subconsumer idempotency not tested

Implementation returns 0 early when the member is already gone (if not found then return 0; end if;); blueprint requires this. No test exercises the second-call path.
Fix: After assert pgque.unregister_subconsumer('q','c','w1') = 1, add assert pgque.unregister_subconsumer('q','c','w1') = 0, 'second unregister must be idempotent'.

HIGH tests/test_cooperative_consumers.sql (missing) — Invalid batch_handling value not exercised

The function raises 'unsupported batch_handling value: %' for any value not in (0,1). No test triggers this branch.
Fix: Wrap pgque.unregister_subconsumer('q','c','w1', 2) in begin/exception when others and assert sqlerrm like 'unsupported batch_handling value%'.
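
A minimal sketch of that error-path test, assuming the member has to exist for the batch_handling check to be reached (hence the registration first). The queue, consumer, and subconsumer names are hypothetical. The flag matters because a bare begin/exception block would also pass if the call raised nothing at all.

```sql
begin;

do $$
declare
  v_raised boolean := false;
begin
  perform pgque.create_queue('bh_q');
  perform pgque.register_subconsumer('bh_q', 'bh_main', 'w1');

  begin
    perform pgque.unregister_subconsumer('bh_q', 'bh_main', 'w1', 2);
  exception when others then
    v_raised := true;
    -- "when others" does not trap assert_failure, so a wrong message
    -- still fails the test instead of being swallowed by this handler.
    assert sqlerrm like 'unsupported batch_handling value%',
      'unexpected error: ' || sqlerrm;
  end;

  assert v_raised, 'batch_handling = 2 must raise';
end $$;

rollback;
```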

HIGH tests/test_cooperative_consumers.sql (missing) — touch_subconsumer on an active (in-batch) member not covered

Only the idle path is asserted (touch_subconsumer on idle member = 1). The blueprint explicitly requires the active-member case: returns 1, refreshes sub_active, leaves sub_batch unchanged.
Fix: After receive_coop allocates a batch, call touch_subconsumer, assert it returns 1, then re-read the member row and assert sub_batch is not null (unchanged) and sub_active was refreshed.

HIGH tests/test_cooperative_consumers.sql (missing) — Direct next_batch / next_batch_custom rejection on coop_main not tested

The current test reaches the rejection only via receive('q','main_c',10). The 2-arg next_batch('q','main_c') and the 5-arg legacy next_batch_custom('q','main_c', null, null, null) paths are not exercised. A future refactor of receive() could silently break the rejection.
Fix: Call both forms inside begin/exception blocks and assert the message matches 'consumer % on queue % is a cooperative main consumer%'.

MEDIUM blueprints/COOPERATIVE_CONSUMERS.md:~217 — Blueprint signature is stale: register_subconsumer shown as 3-arg, implementation has 4 args

Implementation: register_subconsumer(queue, consumer, subconsumer, convert_normal boolean default false). Blueprint shows only the first three. Same for the subscribe_subconsumer alias.
Fix: Add convert_normal boolean default false to the blueprint signatures for both register_subconsumer and subscribe_subconsumer, and document the conversion semantics.

MEDIUM sql/pgque-api/cooperative_consumers.sql (mirrored in sql/pgque.sql, sql/pgque-tle.sql) — finish_batch header still says Calls: None

The body now calls pgque._clear_member_cursor(...) on the coop_member branch. The previous chore(coop): fix stale Calls comment commit (88f0b4f8) updated other headers but missed this one.
Fix: Change -- Calls:\n-- None to -- Calls:\n-- pgque._clear_member_cursor (coop_member branch) in all three copies.


POTENTIAL ISSUES (21)

Issues with moderate confidence (4–7/10). Review manually — may be false positives.

HIGH sql/pgque-api/cooperative_consumers.sql:~1182-1198 (mirrored) — Misleading "PgQ corruption" exception when a coop_member is passed to legacy next_batch_custom(5) (confidence: 7/10)

The override only rejects sub_role='coop_main'. A coop_member falls through; with sub_last_tick NULL by design, the LEFT JOIN pgque.tick yields prev_tick_id is null and the generic "PgQ corruption" exception fires.
Suggestion: Add an explicit if sub_role = 'coop_member' then raise exception 'consumer % on queue % is a cooperative subconsumer; use receive_coop / next_batch_custom (cooperative form)', ...; end if; before the prev_tick_id sanity check.

HIGH tests/test_cooperative_consumers.sql (missing) — Legacy non-coop receive flow with an active coop group on the same queue not exercised (confidence: 7/10)

The current test only checks receive(...,'normal_c',10) = 0 against an empty queue. A regression that suppresses fan-out for 'normal' consumers when a coop_main exists on the same queue would not fail.
Suggestion: Send an event, tick, and assert that BOTH the normal consumer AND the cooperative group receive it independently.

HIGH sql/pgque-api/cooperative_consumers.sql:~1963-1978 (mirrored) — touch_subconsumer does not lock the parent main during the role-flip window (confidence: 6/10)

register_subconsumer and unregister_subconsumer correctly serialize on the main row via for update. touch_subconsumer only locks the member, so a freshly-touched member could refer to a parent that just flipped from coop_main back to 'normal'.
Suggestion: Replace the select count(*) … if = 0 then update two-step in unregister_subconsumer with a single update … where … and not exists (select 1 from pgque.subscription where sub_id = v_main.sub_id and sub_role='coop_member'), so the demotion check is atomic with the held main lock; or have touch_subconsumer re-validate the parent row.

MEDIUM sql/pgque-api/cooperative_consumers.sql:~1939 (mirrored at sql/pgque-tle.sql:~3179 and sql/pgque.sql) — Hardcoded 60 passed to pgque.event_retry (confidence: 7/10)

Two concerns: (1) literal type is integer; if pgque.event_retry's third arg is interval the call fails at runtime ("function does not exist"). (2) Even with an int4-seconds overload, the value is undocumented and bypasses the queue's configured retry policy. Blueprint says force-unregister with batch_handling=1 should follow the same retry rules as nack().
Suggestion: Use interval '60 seconds' if the signature requires interval, or read the queue's configured retry interval (alongside queue_max_retries in the same SELECT at ~line 1849). Document the choice in docs/reference.md under unregister_subconsumer.

MEDIUM sql/pgque-api/cooperative_consumers.sql:~1090-1127 (mirrored) — Stale doc-header on the legacy next_batch_custom(5) override (confidence: 6/10)

Header is the verbatim PgQ docstring. It does not mention the new coop_main rejection branch, and Calls: pgque.find_tick_helper / Tables directly manipulated: update - pgque.subscription understate the now-added EXISTS read on pgque.subscription.
Suggestion: Update the header to describe the coop_main rejection branch and add the EXISTS read.

MEDIUM sql/pgque-api/cooperative_consumers.sql:~1053 (mirrored) — Stale comment "this will drop subconsumers too" inside unregister_consumer (confidence: 6/10)

The surrounding branch raises earlier (~line 1031, 1045) when any coop_member rows exist or have active batches. The DELETE only ever removes a single normal/coop_main row.
Suggestion: Replace with -- delete the single normal/coop_main subscription; member rows were rejected above.

MEDIUM tests/test_coop_concurrency.sql:~4663 — pg_sleep(0.5) fixed head-start may be insufficient on slow CI runners (confidence: 7/10)

The assertion requires w2_wait_ms > 1000. On a loaded PG 14–18 matrix, the dblink open + receive_coop traversal for w1 may take longer than 500 ms, allowing w2 to grab the lock first; the measurement then captures the wrong direction and the test fails spuriously.
Suggestion: Replace fixed sleep with a poll loop on pg_stat_activity.wait_event_type='Lock' to confirm w2 is actually blocked before measuring; or extend the head-start to 1 s and the hold to 3 s.
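
A rough sketch of such a poll loop, run from the coordinating session. The query-text filter on receive_coop and the 10-second deadline are assumptions, not taken from the test file. pg_stat_activity is cached for the duration of a transaction, so the loop calls pg_stat_clear_snapshot() before each probe; without it every iteration would see the same snapshot.

```sql
do $$
declare
  v_deadline timestamptz := clock_timestamp() + interval '10 seconds';
  v_blocked boolean := false;
begin
  loop
    -- Drop the cached statistics snapshot so the next read is fresh.
    perform pg_stat_clear_snapshot();

    select exists (
      select 1
      from pg_stat_activity
      where datname = current_database()
        and pid <> pg_backend_pid()
        and wait_event_type = 'Lock'
        and query ilike '%receive_coop%'  -- assumed marker for worker-2
    )
    into v_blocked;

    exit when v_blocked or clock_timestamp() > v_deadline;
    perform pg_sleep(0.05);
  end loop;

  assert v_blocked, 'worker-2 never reached a lock wait before the deadline';
end $$;
```

Run on its own, with no second session waiting on a lock, the block times out and the final assert fails; that is the intended failure mode for a lost lock.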

MEDIUM tests/test_cooperative_consumers.sql (missing) — Direct finish_batch(batch_id) on a coop_member not tested (confidence: 7/10)

finish_batch is part of the public PgQ-style primitive surface. The cooperative branch is reached via ack only.
Suggestion: After next_batch('q','c','w1'), call pgque.finish_batch(batch_id) directly and assert it returns 1, with member's sub_batch and sub_last_tick cleared.

MEDIUM docs/reference.md:~778 — register_subconsumer reference omits the convert_normal parameter (confidence: 7/10)

Implementation has convert_normal boolean default false; without it, conversion of an existing normal consumer is silently refused. Users reading only the reference cannot discover this.
Suggestion: Show the full signature and document the conversion semantics.

MEDIUM docs/reference.md:~778 — subscribe_subconsumer alias entry inherits the same convert_normal omission (confidence: 7/10)

The alias is described as identical to the parent, which is itself missing convert_normal.
Suggestion: Either show both full signatures explicitly, or fix the parent entry and clarify the alias relationship.

MEDIUM tests/test_cooperative_consumers.sql (missing) — Full DLQ teardown sequence not exercised (confidence: 6/10)

dlq_survive covers DLQ persistence after unregister_subconsumer(..., 1). Full teardown — all members unregistered → main demoted to 'normal' → main unregistered — is not asserted to preserve DLQ rows.
Suggestion: Build a coop DLQ row, then unregister all members, then unregister_consumer the main; assert the DLQ row still exists with the original queue reference.

MEDIUM tests/test_coop_concurrency.sql:~4610 — dblink extension may not be installable on managed-Postgres targets (confidence: 6/10)

CLAUDE.md and the project positioning emphasize compatibility with Cloud SQL / Aurora / Neon / Supabase. CI passes today on hosted runners, but this test will fail on managed targets that disable contrib.
Suggestion: Wrap the test in a pg_available_extensions check that issues RAISE NOTICE 'SKIP …' and exits cleanly if dblink is unavailable.
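
A minimal sketch of that skip guard, assuming the test body can live inside a single DO block; the real test file may be structured differently, and the placeholder comment marks where the existing test would go.

```sql
do $$
begin
  if not exists (
    select 1
    from pg_available_extensions
    where name = 'dblink'
  ) then
    raise notice 'SKIP: dblink is not available on this server';
    return;
  end if;

  create extension if not exists dblink;

  -- ... existing concurrency test body would run here ...
end $$;
```

Note that pg_available_extensions only reports what is installed on the server's filesystem; a managed service may still refuse create extension, so a stricter guard could also trap that error.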

MEDIUM sql/pgque-api/cooperative_consumers.sql:~1761-1770 — Empty-window return path does not refresh the member's sub_active (confidence: 5/10)

Active-batch path and the empty-window path behave asymmetrically with respect to heartbeat. The takeover query also requires sub_batch is not null, so an idle member can't be victimized today, but the asymmetry is brittle and easy to break in a future refactor.
Suggestion: Either refresh sub_active = clock_timestamp() on the empty-window return, or add an explicit comment documenting why it is intentionally skipped.

LOW sql/pgque-api/cooperative_consumers.sql:~1929 — Wasteful v_ev.ev_txid::text::xid8 round-trip cast (confidence: 4/10)

If ev_txid is already xid8, a direct ::xid8 (or no cast) suffices. Same pattern repeated in tests/test_coop_ultrareview.sql:~4833.
Suggestion: Replace with a direct cast and apply the same cleanup in the test.

LOW tests/test_cooperative_consumers.sql:~5209 — m1.payload/m2.payload field-name guard absent (confidence: 5/10)

If a future refactor renames the pgque.message field, the assertion would actually raise (PL/pgSQL ASSERT raises on NULL too), so this is not a current correctness issue — but adding assert m1.payload is not null makes the failure mode self-explanatory.
Suggestion: Add assert m1.payload is not null, 'verify pgque.message field name' before the equality check.

LOW tests/test_receive_coop_contracts.sql:~5623 — Same field-name guard for v_msg.type (confidence: 5/10)

Same reasoning as above; CI passing implies the field exists.
Suggestion: Optional safeguard assert v_msg.type is not null before the equality assertion.

LOW (missing — docs/tutorial.md, docs/examples.md) — Blueprint's documentation plan calls for these updates; PR ships none (confidence: 6/10)

Blueprint mandates a "Fan-out vs cooperative consumers" section under docs/.
Suggestion: Add a short comparison + minimal receive_coop example to docs/examples.md, or open a tracking issue and acknowledge the deferral in the PR description.

LOW README.md:~25 — Role table mentions "experimental cooperative consumer functions" without linking to the reference (confidence: 5/10)

A reader cannot discover the cooperative API from the table alone.
Suggestion: Link the phrase to docs/reference.md (anchor on the cooperative-consumers section).

LOW blueprints/COOPERATIVE_CONSUMERS.md:~603-635 — Documentation plan still lists "client README files" as required, but no client files are touched (confidence: 5/10)

Worth either adding a "deferred to follow-up" note in the blueprint or filing tracking issues.
Suggestion: Add an explicit deferral note or link to follow-up issues.

INFO sql/pgque-api/cooperative_consumers.sql:~1966 (mirrored) — unregister_subconsumer's count(*) where block violates the and/or continuation indent rule introduced by this same PR (confidence: 7/10)

Inline where … and sub_id = … (8-space and) does not align under the where (4-space). Other sites in the same file use the expanded form.
Suggestion: Either expand to where\n sub_queue = …\n and sub_id = …, or move and to 4-space indent — consistently across all three files (source + generated).

INFO PR description — "Tests so far: Docs-only first step" is stale (confidence: 4/10)

The diff includes the full SQL implementation, three regression test files, the cooperative-aware finish_batch override, and reference doc updates. The "Current state" checkboxes are not updated.
Suggestion: Update the PR description so reviewers scanning the summary aren't misled.


Summary

Area          Findings  Potential  Filtered
CI/Pipeline   0         0          0
Security      0         0          0
Bugs          0         7          0
Tests         5         7          0
Guidelines    0         1          0
Docs          2         5          0
Metadata      0         1          0
  • Findings: high-confidence (8–10/10) — blocking or non-blocking per severity
  • Potential: medium-confidence (4–7/10) — review manually, may be false positives
  • Filtered: low-confidence (0–3/10) — excluded as likely false positives

SOC2 checks skipped per repo policy (CLAUDE.md: "Ignore SOC2 findings; this repo does not need them").

The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, and the _clear_member_cursor refactor were re-reviewed for security and bug regressions; no high-confidence security findings. Blocking concerns are concentrated in test-coverage gaps for blueprint-required behaviors and two documentation/code mismatches that the prior round of fixes nearly — but not fully — caught.


REV-assisted review (AI analysis by postgres-ai/rev)

NikolayS added 6 commits May 5, 2026 17:12
Style guide says "Root keywords on their own line (except with single
argument)". The previous a17b5ca pass caught scalar selects but missed
the `if not exists (select 1 from t where ...)`, `perform 1 from t
where ...`, `select * into v from t where <single>`, single-target
`set`, single-condition `where`, and bare-from-with-function-call
patterns inside cooperative_consumers.sql.

Source edited; sql/pgque.sql and sql/pgque-tle.sql regenerated via
build/transform.sh. Behavior unchanged — full regression suite passes.

REV review (PR #208 re-review) flagged five doc/comment items in the
cooperative consumer surface:

- Blueprint: register_subconsumer / subscribe_subconsumer signatures
  were missing convert_normal boolean default false.
- finish_batch header: "Calls: None" was stale; the coop_member branch
  calls _clear_member_cursor.
- Legacy next_batch_custom(5) header: still the verbatim PgQ docstring;
  did not mention the new coop_main rejection branch or the EXISTS read
  on pgque.subscription that gates it.
- "this will drop subconsumers too" comment in unregister_consumer was
  stale: the surrounding code already rejects coop_member rows above,
  so the DELETE only ever removes one normal/coop_main row.
- Cooperative next_batch_custom empty-window return: added a comment
  explaining why sub_active is intentionally not refreshed (idle members
  with stale sub_active are not victimizable; touch_subconsumer is the
  explicit heartbeat for idle workers).

Reference docs already showed convert_normal at docs/reference.md:155;
no change needed there.

Source edited; sql/pgque.sql + sql/pgque-tle.sql regenerated via
build/transform.sh. Behavior unchanged — full regression suite passes.

Add regression assertions for seven items REV (PR #208 re-review)
flagged as missing — five blocking, two from the potential list:

- coop_main row state after a member's ack: snapshot the main row before
  any member allocates, then assert sub_batch is null and sub_last_tick
  strictly advanced after the ack-then-finish_batch path. Proves the
  cooperative cursor mechanic moves on member allocation, not on ack.
- unregister_subconsumer idempotency: second call after a successful
  unregister must return 0.
- unregister_subconsumer rejects unsupported batch_handling values
  (anything not in {0, 1}); assert message starts with the expected
  'unsupported batch_handling value' prefix.
- touch_subconsumer on an active (in-batch) member: returns 1, refreshes
  sub_active, leaves sub_batch unchanged. Existing test only covered the
  idle path.
- Direct legacy next_batch(2-arg) and next_batch_custom(5-arg) on a
  coop_main with members must raise the cooperative-form directive. The
  previous test only reached this rejection through receive(); a future
  refactor of receive() could silently break the legacy primitives.
- Direct finish_batch on a coop_member returns 1 and clears the member
  cursor (mirrors the ack() path; finish_batch is part of the public
  PgQ-style primitive surface).
- Fan-out invariant: a normal consumer and an active coop group on the
  same queue both receive the same event independently.

All assertions were green against the existing implementation — these
are characterization tests that lock in current behavior, not bug
fixes. Full regression suite passes.

REV (PR #208 re-review, potential item #8) flagged a misleading error
message: passing a coop_member's full consumer name (e.g. 'main_c.w1')
to the legacy 5-arg pgque.next_batch_custom raised "PgQ corruption:
Consumer ... does not see tick <NULL>" instead of telling the caller to
use the cooperative form.

Root cause: coop_member rows carry sub_last_tick = NULL by design (the
main row owns the cursor). The select-with-LEFT-JOIN to pgque.tick
yields prev_tick_id IS NULL for any member, which trips the generic
prev_tick_id sanity check downstream.

Fix: add an explicit coop_member rejection branch right after the
existing coop_main rejection, with a directive pointing callers at
receive_coop / next_batch (cooperative form). The check is cheap (the
sub_role was already loaded by the preceding select) and runs before
the misleading 'PgQ corruption' branch.

Red/green TDD:
  red — added test E2 to test_cooperative_consumers.sql; verified
        sqlerrm = 'PgQ corruption: Consumer main_c.w1 on queue ...'.
  green — added the rejection branch; sqlerrm now contains
        'cooperative subconsumer'. Full regression suite passes.

REV (PR #208 re-review) flagged several smaller items. Bundled here:

- (#9) unregister_subconsumer hardcoded retry: document that the 60 s
  matches pgque.nack()'s default. Per-queue retry intervals are not
  configurable today; comment notes where to read one if that changes.
- (#13) test_coop_concurrency: extend w1 hold from 2 s -> 3 s and the
  head-start from 0.5 s -> 1 s; raise the wait-time assertion from
  > 1000 ms to > 1500 ms. Original sizes were fragile under load —
  w1's dblink open + receive_coop traversal can exceed 0.5 s on a
  loaded PG matrix runner, which would let w2 grab the lock first
  and make the assertion fail spuriously.
- (#20) document the ev_txid::text::xid8 round-trip cast in
  unregister_subconsumer's force-DLQ branch. The cast is intentional
  (get_batch_events returns ev_txid bigint per the legacy PgQ
  signature; text round-trip widens to xid8 without precision loss);
  pgque.nack() carries the same comment.
- (#24) README role table: link 'experimental cooperative consumer
  functions' to the cooperative-consumers section in the reference
  doc so a reader can discover the API from the table alone.

Source edited; sql/pgque.sql + sql/pgque-tle.sql regenerated via
build/transform.sh. Concurrency test now reports ~2 s wait
(previously ~1.5 s); full regression suite passes.

REV (PR #208 re-review, potential item #19) noted that 817c084's
existing regression in test_coop_ultrareview only partially exercises
the DLQ retention contract: it confirms the DLQ row survives one
member unregister, but not the FULL teardown sequence
(member -> main demote -> main unregister).

Add an explicit two-stage assertion:

  Stage 1: unregister the only member -> main demotes to 'normal'.
           DLQ row MUST survive (the contract 817c084 introduced;
           cooperative DLQ writes are anchored to the persistent
           coop_main co_id, not the ephemeral member co_id).

  Stage 2: unregister_consumer the (now normal) main, deleting the
           consumer row. dl_consumer_id ON DELETE CASCADE then wipes
           the DLQ row. This is the documented behavior for the
           normal consumer path (see sql/pgque-additions/dlq.sql
           comment on the dl_consumer_id FK); cooperative DLQ rows
           must follow the same rule once they reach a normal
           subscription, not invent a separate retention model.

Use queue-unique consumer + subconsumer names. Other blocks in this
file reuse 'main_c' as a shared consumer, which would keep the
consumer row alive in stage 2 and silently mask the cascade.
@NikolayS
Owner Author

NikolayS commented May 6, 2026

Functional testing — blueprint-coop-consumers @ d245f0d

Walking the cooperative-consumers feature as a new user against a fresh
Postgres, per the project's PR Lifecycle (CI -> REV -> functional test ->
merge). Tracking PR: #208.

Environment

  • Host: macOS (Darwin 25.1.0)
  • Postgres: postgres:17 Docker image (PostgreSQL 17.9 (Debian 17.9-1.pgdg13+1) on aarch64-unknown-linux-gnu), launched with docker run --rm -d --name pgque-pr208-test -e POSTGRES_PASSWORD=pgque_test -e POSTGRES_DB=pgque_test -p 55444:5432 postgres:17
  • pgque install: bash build/transform.sh then psql ... -f sql/pgque.sql (the same path CI uses)
  • Branch SHA: d245f0d

What was tested

  1. Full regression suite via tests/run_all.sql (includes the four new coop test files)
  2. Install idempotency on a populated DB
  3. Manual end-to-end coop flow against a fresh DB:
    • A. Golden path: register two subconsumers, send + tick events, each member receives a disjoint batch
    • B. finish_batch on a coop_member clears the member token, leaves the main cursor alone
    • C. Legacy 5-arg next_batch_custom rejects a coop_main with active members
    • D. Legacy 5-arg next_batch_custom rejects a coop_member (the rejection added in the recent fix commit)
    • E. receive_coop rejects max_return < 1
    • F. register_subconsumer rejects a subconsumer name containing .
    • H. Stale-worker takeover: backdate w2's sub_active, w1 calls next_batch(..., '1 minute'), w1 inherits w2's tick window
    • I. unregister_subconsumer removes a member with no active batch
    • J. Unregistering the last member converts coop_main back to normal
    • K. Legacy unregister_consumer works after the coop teardown
    • L. drop_queue succeeds once consumers are gone
  4. pgque_reader grants and experimental comments on all eight coop functions

Evidence

Regression suite

$ PGPASSWORD=pgque_test psql -h localhost -p 55444 -U postgres -d pgque_test \
    -v ON_ERROR_STOP=1 -f tests/run_all.sql 2>&1 | tail -6
DO
psql:tests/test_coop_concurrency.sql:165: NOTICE:  w2 receive_coop blocked 2006.9ms on coop_main FOR UPDATE
DO
psql:tests/test_coop_concurrency.sql:170: NOTICE:  PASS: cooperative concurrent allocation serialization
DO
DROP TABLE

=== ALL TESTS PASSED ===

All sections of run_all.sql pass, including test_cooperative_consumers,
test_receive_coop_contracts, test_coop_ultrareview, and
test_coop_concurrency.

Install idempotency

Re-running sql/pgque.sql on the populated DB is a no-op; subsequent
test_install_idempotency.sql reports:

NOTICE:  PASS: queue state verified
NOTICE:  PASS: config singleton verified

Case A — golden path, two members consume disjoint batches

select pgque.create_queue('orders');
select pgque.force_tick('orders'); select pgque.ticker();
select pgque.register_subconsumer('orders','workers','w1');
select pgque.register_subconsumer('orders','workers','w2');

select pgque.send('orders','msg-' || g) from generate_series(1,5) g;
select pgque.force_tick('orders'); select pgque.ticker();
select pgque.send('orders','msg-late-' || g) from generate_series(6,9) g;
select pgque.force_tick('orders'); select pgque.ticker();

select msg_id, payload from pgque.receive_coop('orders','workers','w1');
select msg_id, payload from pgque.receive_coop('orders','workers','w2');
--- w1 receive_coop ---
 msg_id | payload
--------+---------
   2002 | msg-1
   2003 | msg-2
   2004 | msg-3
   2005 | msg-4
   2006 | msg-5

--- w2 receive_coop (next window, disjoint) ---
 msg_id |  payload
--------+------------
   4008 | msg-late-6
   4009 | msg-late-7
   4010 | msg-late-8
   4011 | msg-late-9

--- subscription state ---
  co_name   |  sub_role   | sub_batch | sub_last_tick | sub_next_tick
------------+-------------+-----------+---------------+---------------
 workers.w1 | coop_member |         1 |             2 |             3
 workers.w2 | coop_member |         2 |             3 |             4
 workers    | coop_main   |           |             4 |

w1 and w2 saw fully disjoint event ranges and the main cursor has advanced to
the latest tick (4) — exactly the cooperative contract.

Case B — finish_batch on a coop_member

NOTICE:  finishing batch 1
  co_name   |  sub_role   | sub_batch | sub_last_tick | sub_next_tick
------------+-------------+-----------+---------------+---------------
 workers    | coop_main   |           |             4 |
 workers.w1 | coop_member |           |               |

Member token cleared; main cursor unchanged. Matches the comment block in
receive_coop about the empty-batch release path.

Case C — legacy 5-arg next_batch_custom rejects coop_main

ERROR:  consumer workers on queue orders is a cooperative main consumer; use cooperative receive/next_batch with a subconsumer
CONTEXT:  PL/pgSQL function next_batch_custom(text,text,interval,integer,interval) line 113 at RAISE

Case D — legacy 5-arg next_batch_custom rejects coop_member

ERROR:  consumer workers.w2 on queue orders is a cooperative subconsumer; use receive_coop / next_batch (cooperative form) instead of the legacy 5-arg next_batch_custom
CONTEXT:  PL/pgSQL function next_batch_custom(text,text,interval,integer,interval) line 124 at RAISE

This is the rejection added in commit 676e5a9 (fix(coop): reject coop_member in legacy 5-arg next_batch_custom). It fires before the misleading "PgQ
corruption" path, as the new comment in the source notes.

Case E — receive_coop rejects max_return < 1

ERROR:  pgque.receive_coop: max_return must be >= 1, got 0
CONTEXT:  PL/pgSQL function receive_coop(text,text,text,integer,interval) line 8 at RAISE

Case F — register_subconsumer rejects dotted subconsumer name

ERROR:  cooperative subconsumer name must not contain dot: bad.name
CONTEXT:  PL/pgSQL function _validate_coop_names(text,text,text) line 16 at RAISE
SQL statement "SELECT pgque._validate_coop_names(i_queue, i_consumer, i_subconsumer)"
PL/pgSQL function register_subconsumer(text,text,text,boolean) line 12 at PERFORM

Case H — stale-worker takeover via dead_interval

state BEFORE takeover (w2 owns batch 2, last/next 3/4):
  co_name   |  sub_role   | sub_batch | sub_last_tick | sub_next_tick |          sub_active
------------+-------------+-----------+---------------+---------------+-------------------------------
 workers.w2 | coop_member |         2 |             3 |             4 | 2026-05-06 14:09:32.090223+00

w1 calls next_batch with dead_interval=1m:
 w1_takeover_batch
-------------------
                 3

state AFTER takeover:
  co_name   |  sub_role   | sub_batch | sub_last_tick | sub_next_tick
------------+-------------+-----------+---------------+---------------
 workers.w1 | coop_member |         3 |             3 |             4
 workers.w2 | coop_member |           |               |

w1 takes over w2's tick window (3->4) under a fresh batch token; w2's slot
is cleared so it can be re-acquired safely. This is the dead-worker path the
PR description calls out as a key cooperative invariant.

Cases I, J, K, L — orderly teardown

unregister_subconsumer('orders','workers','w2') -> 1
unregister_subconsumer('orders','workers','w1') -> 1   -- last member
sub_role of 'workers' becomes 'normal'
unregister_consumer('orders','workers')         -> 1   -- normal removal works
drop_queue('orders')                            -> 1

The "last member triggers coop_main -> normal" branch is the one called out
in the implementation; functionally confirmed.

Reader grants + experimental status

         proname         | has_experimental_comment | reader_can_execute
-------------------------+--------------------------+--------------------
 receive_coop            | t                        | t
 register_subconsumer    | t                        | t
 subscribe_subconsumer   | t                        | t
 touch_subconsumer       | t                        | t
 unregister_subconsumer  | t                        | t
 unsubscribe_subconsumer | t                        | t

All coop entry points listed above carry the experimental comment and are
executable by pgque_reader, matching the PR's "experimental in 0.2"
requirement.
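
The query behind this table is not shown in the comment. A catalog query along the following lines would produce the same three columns. It is a sketch that assumes the functions live in schema pgque, that the role is named pgque_reader, and that the experimental marker is the word "experimental" somewhere in the function comment (that last point is a guess).

```sql
-- Sketch only: check comment text and EXECUTE privilege per function.
-- obj_description() and has_function_privilege() are standard PostgreSQL
-- functions; matching on the substring 'experimental' is an assumption.
select
  p.proname,
  coalesce(obj_description(p.oid, 'pg_proc') ilike '%experimental%', false)
    as has_experimental_comment,
  has_function_privilege('pgque_reader', p.oid, 'execute')
    as reader_can_execute
from pg_proc as p
join pg_namespace as n
  on n.oid = p.pronamespace
where
  n.nspname = 'pgque'
  and p.proname in (
    'receive_coop',
    'register_subconsumer',
    'subscribe_subconsumer',
    'touch_subconsumer',
    'unregister_subconsumer',
    'unsubscribe_subconsumer'
  )
order by p.proname;
```

Overloaded functions share a proname and would appear as one row per overload, so the row count can exceed the number of distinct names.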

Verdict

PASS. Full regression suite is green, install is idempotent, and a
manually-driven end-to-end walk through register / receive / finish /
takeover / unregister behaves exactly as the PR description and code
comments promise. Legacy entry points reject coop roles with clear
directives instead of falling through to confusing "PgQ corruption"
errors. Reader grants and experimental comments are all in place.

No regressions or smells observed. From a functional-test standpoint the PR
is good to merge once CI and REV are also clean.

@NikolayS
Owner Author

NikolayS commented May 6, 2026

REV Code Review Report

  • PR: NikolayS/pgque#208 — feat: add experimental cooperative consumers
  • Author: @NikolayS
  • Branch: blueprint-coop-consumers → main  •  head d245f0d
  • AI-Assisted: yes
CI Status
✅ all 11 checks pass (PG 14–18, pg_cron, pg_tle, Go/Python/TS clients, verify) green

Re-review on top of the previous REV report posted at 2026-05-06T00:02:56Z. Five follow-up commits since that review:

  • 1841de3 docs(coop): address REV doc/comment gaps
  • ea8dbc2 test(coop): cover blueprint-required branches flagged by REV
  • 676e5a9 fix(coop): reject coop_member in legacy 5-arg next_batch_custom
  • f96e76f chore(coop): address misc REV findings
  • d245f0d test(coop): full DLQ teardown contract (POTENTIAL #19)

Most prior BLOCKING items are resolved. One residual BLOCKING item, plus several smaller follow-ups in POTENTIAL.


BLOCKING ISSUES (1)

HIGH tests/test_cooperative_consumers.sql:~366-373 — register_subconsumer(..., convert_normal => true) on an active normal consumer is not tested

The current test at this block calls register_subconsumer('coop_misc', 'normal_active', 'w1') without convert_normal := true — it triggers the "explicit conversion is required" guard at impl line 567–568. The sibling guard at impl line 570–571 — 'cannot convert active normal consumer % on queue % to cooperative main' — fires when convert_normal := true is passed but sub_batch is not null. That branch is not exercised by any test.
Fix: Add a case that (1) registers a normal consumer, (2) sends + ticks so it holds an active batch via receive(), (3) calls register_subconsumer(..., convert_normal := true) while the batch is held, and (4) asserts sqlerrm like 'cannot convert active normal consumer%'. This is a blueprint-required invariant ("fail if the normal consumer has an active batch") and the only impl branch in register_subconsumer without coverage.
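
The four steps in the fix could be sketched as below. This is an illustration, not the test that was committed. The queue name coop_conv is hypothetical, pgque.register_consumer is assumed to exist with the PgQ-style (queue, consumer) signature, and the fourth positional argument of register_subconsumer is assumed to be the convert_normal flag.

```sql
-- Sketch only: exercise the "cannot convert active normal consumer" guard.
do $$
declare
  v_err text;
begin
  perform pgque.create_queue('coop_conv');
  perform pgque.force_tick('coop_conv');
  perform pgque.ticker();

  -- (1) normal consumer (register_consumer signature is an assumption)
  perform pgque.register_consumer('coop_conv', 'normal_active');

  -- (2) send + tick, then receive so the consumer holds an active batch
  perform pgque.send('coop_conv', 'payload-1'::text);
  perform pgque.force_tick('coop_conv');
  perform pgque.ticker();
  perform count(*) from pgque.receive('coop_conv', 'normal_active', 10);

  -- (3) attempt conversion while the batch is still held
  begin
    perform pgque.register_subconsumer('coop_conv', 'normal_active', 'w1', true);
  exception when others then
    v_err := sqlerrm;
  end;

  -- (4) a null v_err (no error raised) also fails the assertion
  assert v_err like 'cannot convert active normal consumer%',
    format('expected conversion guard, got: %s', coalesce(v_err, '<no error>'));
end $$;
```

Capturing sqlerrm into a variable and asserting outside the inner block keeps the "no error was raised" case from being swallowed by the exception handler.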


Verified resolved from previous review

Confirmed addressed by the new commits:

  • HIGH coop_main row state asserted after ack on coop_member → tests/test_cooperative_consumers.sql:160-177
  • HIGH unregister_subconsumer idempotency → block B, lines 509–518
  • HIGH invalid batch_handling value rejection → block C, lines 520–534
  • HIGH touch_subconsumer on active member → block D, lines 536–577 (asserts sub_batch unchanged + sub_active refreshed)
  • HIGH Direct next_batch / next_batch_custom(5) rejection on coop_main → blocks E and E2, lines 579–629
  • HIGH coop_member rejection in legacy 5-arg next_batch_custom (commit 676e5a9) → block E2, lines 608–629
  • MEDIUM Blueprint signature 4-arg register_subconsumer / subscribe_subconsumer → blueprint lines 173–178, 217–222
  • MEDIUM finish_batch header Calls: None → corrected to pgque._clear_member_cursor (coop_member branch) at cooperative_consumers.sql:409
  • POTENTIAL register_subconsumer / subscribe_subconsumer reference omits convert_normal → fixed at docs/reference.md:155, 165
  • POTENTIAL stale "this will drop subconsumers too" comment → replaced
  • POTENTIAL late-nack tests (batch not found:% + no clobber) → covered
  • POTENTIAL legacy non-coop receive with active coop group on same queue → block G, lines 665–687
  • POTENTIAL direct finish_batch(batch_id) on coop_member → block F, lines 631–663
  • POTENTIAL full DLQ teardown sequence (members unregistered → main demoted to 'normal', DLQ row survives → main unregistered, DLQ row removed by the dl_consumer_id ON DELETE CASCADE) → block H, lines 689–782 (commit d245f0d)
  • LOW README role-table link to reference → fixed at README.md:227
  • Generated files sql/pgque.sql and sql/pgque-tle.sql regenerate cleanly from sources via build/transform.sh (verified byte-for-byte).
  • All 13 SECURITY DEFINER functions in cooperative_consumers.sql pin SET search_path = pgque, pg_catalog.
  • No dynamic SQL / format() injection surface in the new code.
  • event_dead LEFT JOIN routing in dlq.sql:75-85 correctly anchors coop DLQ rows to the persistent coop_main.co_id, preventing FK cascade loss on member unregister.

POTENTIAL ISSUES (10)

Issues with moderate confidence (4–7/10). Review manually — may be false positives.

MEDIUM docs/reference.md:~500 — Legacy next_batch_custom(5) reference entry omits coop_member rejection added in 676e5a9 (confidence: 7/10)

The entry says "raises for coop_main consumers while member rows exist" and the "Cooperative-aware inherited functions" block at line 194 likewise mentions only coop_main. The SQL docstring at cooperative_consumers.sql:224-231 now describes both cases; the public reference does not.
Suggestion: Extend to: "Cooperative-aware: raises if the named consumer is a coop_main with registered members, or if it is a coop_member row — in both cases a directive to use the cooperative form is included in the error message. Normal consumers and coop_main rows with no members pass through."

MEDIUM tests/test_cooperative_consumers.sql:~160-178 — coop_main.sub_next_tick invariant not asserted (confidence: 6/10)

The post-ack assertions on coop_main cover sub_batch is null and sub_last_tick advanced, but not sub_next_tick is null. Blueprint data-model invariant (line 107 of COOPERATIVE_CONSUMERS.md) states subscription.sub_next_tick: null when idle for the main row, and impl line 882 explicitly clears it. A future refactor that accidentally leaves sub_next_tick populated on the main row would not fail any test.
Suggestion: Add assert v_main_after.sub_next_tick is null, 'coop_main sub_next_tick must always be null' inside the existing v_main_after assertion block (~line 173).

MEDIUM tests/test_cooperative_consumers.sql:~579-629 — 2-arg next_batch / receive on a dotted member name not tested (confidence: 6/10)

Block E covers next_batch('queue', 'main_c') (2-arg, coop_main). Block E2 covers next_batch_custom(5-arg, 'main_c.w1') (coop_member). The 2-arg next_batch('queue', 'main_c.w1') and receive('queue', 'main_c.w1', 10) paths chain through next_batch_info → next_batch_custom(5) (which now has the rejection guard), but those entry points are never directly exercised with a dotted name. The rejection chain is implicitly trusted rather than asserted.
Suggestion: Add a small E3 block with two negative cases — pgque.next_batch('queue', 'main_c.w1') and pgque.receive('queue', 'main_c.w1', 10) — each expecting an error matching '%cooperative subconsumer%'.
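
A possible shape for the E3 block is sketched below. It assumes the queue 'queue' and the member main_c.w1 were set up by earlier blocks of the test file, as the calls in the finding imply. The assertion messages are illustrative.

```sql
-- Sketch only: both user-facing entry points must reject a dotted member name.
do $$
declare
  v_err text;
begin
  -- 2-arg next_batch on a coop_member name
  begin
    perform pgque.next_batch('queue', 'main_c.w1');
  exception when others then
    v_err := sqlerrm;
  end;
  assert v_err like '%cooperative subconsumer%',
    format('next_batch: unexpected result: %s', coalesce(v_err, '<no error>'));

  -- receive on the same coop_member name
  v_err := null;
  begin
    perform count(*) from pgque.receive('queue', 'main_c.w1', 10);
  exception when others then
    v_err := sqlerrm;
  end;
  assert v_err like '%cooperative subconsumer%',
    format('receive: unexpected result: %s', coalesce(v_err, '<no error>'));
end $$;
```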

LOW sql/pgque-api/cooperative_consumers.sql:~44 — unregister_consumer "Tables directly manipulated" header is stale (confidence: 6/10)

The cooperative override added by this PR also performs update - pgque.subscription (sets sub_role = 'normal' when the last coop_member is removed) and delete - pgque.consumer (was present in original but absent from the cooperative override's header). The header is now more out-of-date than the original it replaced.
Suggestion: Add update - pgque.subscription and delete - pgque.consumer to the "Tables directly manipulated" list at line 44–46.

LOW tests/test_cooperative_consumers.sql:~395-425 — Stale-takeover-ignores-idle test does not verify w1's sub_active is not refreshed (confidence: 5/10)

The test asserts only that next_batch returns null for an idle member with stale sub_active. Impl intentionally skips refreshing sub_active on an empty tick window (impl comment ~line 863-870). If that behavior regresses (e.g., by adding an unconditional sub_active = clock_timestamp() update), the test would still pass.
Suggestion: After assert no_batch is null, re-read w1's sub_active and assert it has not changed — locks in "idle members with stale sub_active are not refreshed by failed stale-takeover scans".

LOW docs/reference.md:~167 — receive_coop() does not document the implicit auto-registration (confidence: 5/10)

receive_coop() (via next_batch_custom(7) → register_subconsumer) implicitly registers the consumer/subconsumer if absent. Tested at test_cooperative_consumers.sql:100-101 ("receive_coop should auto-create coop_member") and specified in blueprint line 272. New users reading only the reference will not know register_subconsumer is optional before receive_coop.
Suggestion: Add: "If the logical consumer or subconsumer is not yet registered, receive_coop() registers them automatically. Use register_subconsumer explicitly if you need to set the convert_normal flag."

LOW tests/test_coop_concurrency.sql:~48-67 — Wall-clock timing test is inherently non-deterministic (confidence: 4/10)

Uses pg_sleep(1) head-start, pg_sleep(3) lock-hold, and asserts v_w2_wait_ms > 1500. Acknowledged in test comments. There is a narrow window where w2's receive_coop could execute before w1 acquires its lock. The dblink_is_busy polling already mitigates this, but a post-loop check that w1's row exists in coop_concurrency_results before asserting w2's wait would close the gap.
Suggestion: Add an explicit pre-check that the w1 result row was inserted before measuring w2's wait time.

LOW (missing — docs/tutorial.md, docs/examples.md) — Blueprint plan calls for a "Fan-out vs cooperative consumers" section; PR ships none (confidence: 4/10)

Blueprint section "Documentation plan" (~lines 563–566) lists docs/tutorial.md and docs/examples.md as required updates. Carried over from the prior review and still open.
Suggestion: Either add a minimal cooperative-consumers section (one paragraph + pointer to reference is enough) or note the deferral explicitly in the blueprint change-log.

LOW (missing — clients/{go,python,typescript}/README.md) — Blueprint plan lists client READMEs as required updates (confidence: 4/10)

None of the three client READMEs mention cooperative consumers or experimental status. Blueprint requires "experimental everywhere".
Suggestion: Add a one-paragraph "Experimental: cooperative consumers" note linking to docs/reference.md in each client README, or open follow-up issues and acknowledge deferral in the PR description.

INFO sql/pgque-api/cooperative_consumers.sql:~43 — unregister_consumer doc-header Calls: None is technically correct but understates DML (confidence: 6/10)

The body has multiple perform 1 from pgque.subscription where ... reads. perform of a scalar expression is not a function call, so "Calls: None" is correct under a strict reading, but the header could mislead a maintainer scanning for dependencies.
Suggestion: Clarify to Calls: None (direct DML only), or document the project convention that perform <expr> is excluded from the Calls list.


Summary

Area          Findings  Potential  Filtered
CI/Pipeline   0         0          0
Security      0         0          0
Bugs          0         0          0
Tests         1         4          0
Guidelines    0         1          0
Docs          0         5          0
Metadata      0         0          0
  • Findings: high-confidence (8–10/10) — blocking or non-blocking per severity
  • Potential: medium-confidence (4–7/10) — review manually, may be false positives
  • Filtered: low-confidence (0–3/10) — excluded as likely false positives

SOC2 checks skipped per repo policy (CLAUDE.md §"PR Lifecycle": "Ignore SOC2 findings; this repo does not need them").

The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, lock order, and the auto-create flow were re-reviewed for security and bug regressions; no high-confidence issues found. The only remaining BLOCKING item is one missing test branch — the convert_normal := true + active-batch failure path in register_subconsumer. Everything else is doc polish and test depth that can be merged or deferred at the author's discretion.


REV-assisted review (AI analysis by postgres-ai/rev)

NikolayS and others added 2 commits May 6, 2026 07:53
- BLOCKING (REV): exercise register_subconsumer with i_convert_normal:=true
  on an active normal consumer (sub_batch is not null), asserting the
  'cannot convert active normal consumer ...' guard. Previously only the
  no-opt-in branch was covered.
- Assert coop_main.sub_next_tick is null post-ack (blueprint invariant
  was untested; a future change leaving sub_next_tick set on the main
  row would have passed silently).
- Add E3 block: 2-arg next_batch / receive on a dotted member name
  must surface the 'cooperative subconsumer' directive (the entry
  points users actually call; E2 only tested next_batch_custom direct).
- Stale-takeover-ignores-idle: capture w1's sub_active before and after
  the failed takeover scan and assert it is unchanged (locks in that
  empty tick windows do not refresh idle-member heartbeats).
- Concurrency test: pre-check that w1 is past its FOR UPDATE on
  coop_main and currently inside pg_sleep(3) before measuring w2.
  Polls pg_stat_activity for wait_event='PgSleep' (MVCC visibility
  prevents polling coop_concurrency_results, since w1's INSERT is
  held in the same transaction during pg_sleep).

All four coop test files green against PG 17.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

07ff39b docs(coop): refine reference and unregister_consumer header

- reference.md: extend the legacy 5-arg next_batch_custom entry to
  document coop_member rejection (not just coop_main with members);
  the SQL docstring already covered both, but the public reference
  had drifted.
- reference.md: extend the 'Cooperative-aware inherited functions'
  paragraph with the same coop_member directive — the entry points
  users actually call (next_batch / receive on a dotted member) now
  reflect the dual-rejection contract.
- reference.md: document receive_coop's implicit auto-registration
  of consumer + subconsumer rows, and clarify that explicit
  register_subconsumer is only needed when convert_normal is set.
- cooperative_consumers.sql: update the unregister_consumer doc-header
  to include the cooperative override's full DML surface (update -
  pgque.subscription on last-member demotion, delete - pgque.consumer
  on no-subscriptions). Clarify 'Calls: None' as 'None (direct DML
  only)' to match maintainer expectations.

Generated install scripts (sql/pgque.sql, sql/pgque-tle.sql)
regenerate cleanly via build/transform.sh; only the doc-header lines
in unregister_consumer change.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@NikolayS
Owner Author

NikolayS commented May 6, 2026

Address REV follow-ups

Addressing #208 (comment).

Two new commits on blueprint-coop-consumers (head 07ff39b):

  • 2b2ecb9 test(coop): cover remaining REV-flagged branches
  • 07ff39b docs(coop): refine reference and unregister_consumer header

Resolved

BLOCKING

  • register_subconsumer(i_convert_normal := true) on active normal consumer — added at tests/test_cooperative_consumers.sql block "10/11/13/18/21/23". The do-block now exercises both guards in cooperative_consumers.sql:

    1. 'consumer % on queue % is already a normal consumer; explicit conversion is required' (impl line 567–568, no opt-in) — already covered, now explicitly asserted on sqlerrm.
    2. 'cannot convert active normal consumer % on queue % to cooperative main' (impl line 570–571, opt-in given but sub_batch is not null) — new coverage, this was the missing branch.

    The named-arg call uses i_convert_normal => true since the impl declares i_convert_normal boolean default false.
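
    A minimal sketch of how a do-block can assert both guards on sqlerrm. This is not the test from the PR. The positional arguments to register_subconsumer, the queue name 'q', and the consumer names 'normal_c' and 'w1' are assumptions for illustration; only the i_convert_normal parameter and the two error messages come from the implementation quoted above. It presumes 'normal_c' is already a normal consumer on 'q' holding an active batch.

    ```sql
    -- Sketch only. Assumes pgque is installed, 'normal_c' is a normal consumer
    -- on queue 'q' with sub_batch not null, and that register_subconsumer takes
    -- (queue, consumer, subconsumer) positionally (hypothetical argument order).
    do $$
    declare
      v_raised boolean;
    begin
      -- Guard 1: no opt-in given.
      v_raised := false;
      begin
        perform pgque.register_subconsumer('q', 'normal_c', 'w1');
      exception when others then
        v_raised := true;
        assert sqlerrm like '%is already a normal consumer; explicit conversion is required%',
          'unexpected error: ' || sqlerrm;
      end;
      assert v_raised, 'no-opt-in guard did not fire';

      -- Guard 2: opt-in given, but the normal consumer still holds a batch.
      v_raised := false;
      begin
        perform pgque.register_subconsumer('q', 'normal_c', 'w1',
                                           i_convert_normal => true);
      exception when others then
        v_raised := true;
        assert sqlerrm like '%cannot convert active normal consumer%to cooperative main%',
          'unexpected error: ' || sqlerrm;
      end;
      assert v_raised, 'active-batch guard did not fire';
    end $$;
    ```

    The v_raised flag is there because a call that unexpectedly succeeds would otherwise pass silently. Assertions inside the handler still propagate, since `when others` does not trap assert_failure.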

POTENTIAL — addressed

  • MEDIUM next_batch_custom(5) reference doc omits coop_member rejection — extended docs/reference.md line 500 to mention both rejection cases (coop_main with members, coop_member row) and the directive in the error message. Also updated the "Cooperative-aware inherited functions" paragraph (line 194) so the entry-point story matches the SQL docstring at cooperative_consumers.sql:224-231.
  • MEDIUM coop_main.sub_next_tick invariant not asserted — added assert v_main_after.sub_next_tick is null to the existing post-ack invariant block at tests/test_cooperative_consumers.sql:~178. Locks in the blueprint's "main row carries only the group cursor" invariant.
  • MEDIUM 2-arg next_batch / receive on dotted member name not directly tested — added block E3 at tests/test_cooperative_consumers.sql:~632. Asserts pgque.next_batch('q', 'main_c.w1') and pgque.receive('q', 'main_c.w1', 10) both surface %cooperative subconsumer%.
  • LOW unregister_consumer "Tables directly manipulated" header is stale — updated sql/pgque-api/cooperative_consumers.sql doc-header to list update - pgque.subscription (last-member demotion) and delete - pgque.consumer (when no subscriptions remain). Also clarified Calls: None to Calls: None (direct DML only) per the INFO finding.
  • LOW Stale-takeover-ignores-idle does not verify w1's sub_active is unchanged — captured v_w1_sub_active_before via update ... returning and re-read after the failed next_batch to assert equality. Catches a regression that would unconditionally refresh idle members.
  • LOW receive_coop() does not document auto-registration — added an "Auto-registration" paragraph to docs/reference.md receive_coop entry, calling out that explicit register_subconsumer is only needed when convert_normal is required.
  • LOW Wall-clock timing test is non-deterministic — added a pg_stat_activity poll that confirms w1 is in wait_event = 'PgSleep' (i.e., past the FOR UPDATE and inside pg_sleep(3)) before measuring w2's wait. Polling coop_concurrency_results directly does not work — w1's INSERT is held inside the same dblink transaction during the sleep, so it is invisible via MVCC until commit; the test now documents that and uses pg_stat_activity instead.
  • INFO unregister_consumer "Calls: None" wording — clarified to Calls: None (direct DML only) to flag the perform 1 from ... reads to maintainers scanning for dependencies.
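
The pg_stat_activity poll from the wall-clock timing item above could look roughly like the sketch below. It is an illustration, not the code in tests/test_coop_concurrency.sql: the 50 ms poll interval and the variable names are assumptions, while the pid filter, the query predicate, the PgSleep wait event, and the assertion message follow the descriptions in this thread.

```sql
-- Sketch only: wait (up to 3s) until another backend running the w1
-- statement is inside pg_sleep, then let the caller start measuring w2.
do $$
declare
  v_deadline timestamptz := clock_timestamp() + interval '3 seconds';
  v_sleeping boolean := false;
begin
  loop
    -- Backend status is cached for the duration of a transaction; discard
    -- the cache so each iteration reads fresh pg_stat_activity rows.
    perform pg_stat_clear_snapshot();

    select exists (
      select 1
      from pg_stat_activity
      where pid <> pg_backend_pid()                 -- skip this session
        and wait_event = 'PgSleep'                  -- w1 is inside pg_sleep()
        and query like '%coop_concurrent_alloc%'    -- w1's statement text
    ) into v_sleeping;

    exit when v_sleeping or clock_timestamp() >= v_deadline;
    perform pg_sleep(0.05);  -- assumed poll interval
  end loop;

  assert v_sleeping,
    'w1 did not enter pg_sleep within 3s; serialization test cannot proceed';
end $$;
```

Polling the wait event avoids depending on rows that w1 has written but not yet committed, which is why coop_concurrency_results cannot serve as the signal here.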

Deferred (with rationale)

  • LOW Blueprint plan calls for "Fan-out vs cooperative consumers" sections in docs/tutorial.md / docs/examples.md — deferred. The feature is experimental, called out as such at docs/reference.md:151. Adding tutorial/examples copy ahead of API stabilization risks promoting a surface that can still shift. Will land alongside the move to stable in a separate PR.
  • LOW Client READMEs (clients/{go,python,typescript}/README.md) do not mention cooperative consumers — deferred for the same reason: cooperative is currently a SQL-only experimental surface; client API shape is not finalized and will be drafted alongside the stabilization PR.

Verification

Reinstalled sql/pgque.sql into a fresh PG 17 container and ran tests/run_all.sql:

=== ALL TESTS PASSED ===

tests/test_cooperative_consumers.sql standalone:

NOTICE:  PASS: cooperative consumer SQL-core semantics

tests/test_coop_concurrency.sql standalone (the FOR UPDATE serialization proof):

NOTICE:  w2 receive_coop blocked 1899.6ms on coop_main FOR UPDATE
NOTICE:  PASS: cooperative concurrent allocation serialization

Generated install scripts regenerate cleanly via bash build/transform.sh; the only diff from sources is the matching doc-header change in unregister_consumer.

@NikolayS
Owner Author

NikolayS commented May 6, 2026

REV Code Review Report

  • PR: NikolayS/pgque#208 — feat: add experimental cooperative consumers
  • Author: @NikolayS
  • Branch: blueprint-coop-consumers → main  •  head 07ff39b
  • AI-Assisted: yes
CI Status
✅ all 11 checks pass (PG 14–18, pg_cron, pg_tle, Go/Python/TS clients, verify) green

Re-review on top of the previous REV report posted at 2026-05-06T14:27:19Z. Two follow-up commits since that review:

  • 2b2ecb9 test(coop): cover remaining REV-flagged branches
  • 07ff39b docs(coop): refine reference and unregister_consumer header

Result: PASSED — no new BLOCKING, NON-BLOCKING, or POTENTIAL issues found. The previous BLOCKING item and all addressable POTENTIAL items are resolved. The two LOW items the author marked as deferred (tutorial/examples copy and client READMEs for cooperative consumers) have explicit, reasonable rationale (feature is experimental; SQL surface only in 0.2; client API not yet finalized).


Verified resolved from previous review

BLOCKING (1)

  • HIGH register_subconsumer(i_convert_normal := true) on active normal consumer — new test at tests/test_cooperative_consumers.sql:392-399 exercises the 'cannot convert active normal consumer ...' guard at impl cooperative_consumers.sql:572-573. Setup correctly leaves sub_batch is not null (register_consumer → send → force_tick → ticker → receive) before the failing call. The earlier no-opt-in branch is now also explicitly asserted on sqlerrm.

POTENTIAL (10)

  • MEDIUM next_batch_custom(5) reference doc dual-rejection — docs/reference.md:502 extended to cover both coop_main (with members) and coop_member rejection cases plus directive in error message; "Cooperative-aware inherited functions" paragraph at docs/reference.md:196 updated to match.
  • MEDIUM coop_main.sub_next_tick invariant assertion — added at tests/test_cooperative_consumers.sql:177-178. Locks in the blueprint "main row carries only the group cursor" invariant that was previously untested.
  • MEDIUM 2-arg next_batch / receive on dotted member name — new E3 block at tests/test_cooperative_consumers.sql:678-705 asserts pgque.next_batch('q', 'main_c.w1') and pgque.receive('q', 'main_c.w1', 10) both surface the %cooperative subconsumer% directive (the entry points users actually call).
  • LOW unregister_consumer "Tables directly manipulated" header — updated at cooperative_consumers.sql:43-46 to list update - pgque.subscription (last-member demotion) and delete - pgque.consumer (no subscriptions remain). Calls: None clarified to Calls: None (direct DML only) per the INFO finding.
  • LOW Stale-takeover-ignores-idle does not verify w1's sub_active is unchanged — captured via update ... returning at tests/test_cooperative_consumers.sql:445 and re-read after the failed next_batch at lines 456-468 with exact-equality assertion. Catches a regression that would unconditionally refresh idle members.
  • LOW receive_coop() does not document auto-registration — new "Auto-registration" paragraph at docs/reference.md:172 clarifies that explicit register_subconsumer is only needed when convert_normal is required.
  • LOW Wall-clock timing test was non-deterministic — tests/test_coop_concurrency.sql:80-109 adds a pg_stat_activity poll for wait_event = 'PgSleep' (replacing an unworkable coop_concurrency_results poll, since w1's INSERT is held inside the same dblink transaction during pg_sleep and is invisible via MVCC until commit). The pid <> pg_backend_pid() filter and the query like '%coop_concurrent_alloc%' predicate together correctly identify the dblink backend; assertion 'w1 did not enter pg_sleep within 3s; serialization test cannot proceed' fails loudly on regression rather than silently passing.
  • LOW Tutorial / examples copy for cooperative consumers — deferred with rationale: feature is experimental in 0.2 and stabilization may shift the surface; tutorial/examples will land alongside the move to stable.
  • LOW Client READMEs do not mention cooperative consumers — deferred with rationale: cooperative is a SQL-only experimental surface in 0.2; client API shape not finalized.
  • INFO unregister_consumer "Calls: None" wording — clarified to Calls: None (direct DML only) to flag the perform 1 from ... reads to maintainers scanning for dependencies.
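
For illustration, the E3 check verified above can be pictured as the loop below. This is a hedged sketch rather than the block in tests/test_cooperative_consumers.sql: it assumes 'main_c.w1' is already registered as a cooperative member on queue 'q', and it reuses the two calls and the %cooperative subconsumer% pattern quoted in this report.

```sql
-- Sketch only. Assumes pgque is installed and 'main_c.w1' is an existing
-- cooperative member on queue 'q'.
do $$
declare
  v_stmt   text;
  v_raised boolean;
begin
  foreach v_stmt in array array[
    $q$select pgque.next_batch('q', 'main_c.w1')$q$,
    $q$select pgque.receive('q', 'main_c.w1', 10)$q$
  ] loop
    v_raised := false;
    begin
      execute v_stmt;
    exception when others then
      v_raised := true;
      -- Both entry points should point the caller at the cooperative API.
      assert sqlerrm like '%cooperative subconsumer%',
        format('%s raised an unexpected error: %s', v_stmt, sqlerrm);
    end;
    assert v_raised, format('%s did not raise', v_stmt);
  end loop;
end $$;
```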

Summary

| Area | Findings | Potential | Filtered |
|---|---|---|---|
| CI/Pipeline | 0 | 0 | 0 |
| Security | 0 | 0 | 0 |
| Bugs | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 |
| Guidelines | 0 | 0 | 0 |
| Docs | 0 | 0 | 0 |
| Metadata | 0 | 0 | 0 |

  • Findings: high-confidence (8–10/10) — blocking or non-blocking per severity
  • Potential: medium-confidence (4–7/10) — review manually, may be false positives
  • Filtered: low-confidence (0–3/10) — excluded as likely false positives

SOC2 checks skipped per repo policy (CLAUDE.md §"PR Lifecycle": "Ignore SOC2 findings; this repo does not need them").

The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, lock order, auto-create flow, and the new test branches were re-reviewed for security, bug, test-coverage, doc-accuracy, and style regressions. Nothing surfaced. Generated sql/pgque.sql and sql/pgque-tle.sql deltas mirror the source change in cooperative_consumers.sql, consistent with build/transform.sh. From REV's standpoint this PR is clear to merge.


REV-assisted review (AI analysis by postgres-ai/rev)

NikolayS merged commit 1cb6ff7 into main May 6, 2026
21 of 22 checks passed
NikolayS deleted the blueprint-coop-consumers branch May 6, 2026 18:21