feat: add experimental cooperative consumers#208
Conversation
|
Review notes after the latest SQL/style pass ( No confirmed release-blocking regression. The previous A couple of things I’d still tighten before freezing the SQL API / starting client-library work:
Checks I saw green / clean:
I’d treat #211 client-library audit as still blocked until this PR is merged/deferred and the SQL API semantics are frozen. |
- Drop byte-identical pgque.receive/ack/nack overrides; the canonical definitions in receive.sql already load before this file. - Collapse the two pgque.next_batch overloads into one with i_dead_interval defaulting to NULL; receive_coop dispatch follows. - Extract pgque._clear_member_cursor for the 4-field cursor-reset UPDATE used by finish_batch (coop_member), next_batch_custom victim-clear, and unregister_subconsumer. Net -138 lines in cooperative_consumers.sql; full regression + acceptance suites pass against PG 18. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Mirror the receive() #103 fix into receive_coop(): - Validate i_max_return >= 1 (parity with receive()'s guard). - finish_batch the active batch when zero rows are yielded. Without this, a coop_member that opens a batch over an empty tick window leaves sub_batch set; subsequent receive_coop() calls short-circuit on the stale token and the subconsumer never advances, even after fresh events arrive. Red/green TDD: tests/test_receive_coop_contracts.sql reproduces both defects (max_return=0 silently accepted; sub_batch wedged after empty window) on the unfixed code, and passes after the change. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Apply the SQL style rule from the upstream style commit to code introduced on this branch: pgque._clear_member_cursor's UPDATE and the test_receive_coop_contracts SELECT use root keywords flush at the statement indent and continuation `and` indented under `where`. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
REV Code Review Report
BLOCKING ISSUES (8)HIGH
HIGH
HIGH
HIGH
HIGH
MEDIUM
MEDIUM
MEDIUM
NON-BLOCKING (4)INFO
INFO
INFO
LOW
POTENTIAL ISSUES (12)HIGH
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
LOW
LOW
LOW
LOW
LOW
LOW No
Summary
Highlights:
Strong points: all SECURITY DEFINER functions correctly pin REV-assisted review (AI analysis by postgres-ai/rev) |
|
Re-reviewed latest head Previous findings look resolved:
Checks:
No release-blocking issues confirmed from my side now.
|
Ultrareview findingsThree findings, all in bug_003 — Cooperative
|
|
Re-reviewed latest head No release-blocking issues found from my side. Evidence:
The latest diff appears to cover the ultrareview fixes:
|
REV Code Review Report
BLOCKING ISSUES (8)Issues that should be addressed before merge (high-confidence HIGH/MEDIUM severity). HIGH
HIGH
HIGH
HIGH
HIGH
MEDIUM
MEDIUM
MEDIUM
NON-BLOCKING (3)Style violations against CLAUDE.md rules. Worth fixing but don't block correctness. INFO
INFO
INFO
POTENTIAL ISSUES (15)Medium-confidence issues (4-7/10). Review manually — may be false positives. MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
LOW
LOW
LOW
LOW
LOW
LOW
INFO
Summary
Note:
SOC2 checks skipped per repo policy (CLAUDE.md: "Ignore SOC2 findings; this repo does not need them"). The cooperative-consumer SQL core, batch allocation serialization, stale-takeover token model, and REV-assisted review (AI analysis by postgres-ai/rev) |
|
Re-reviewed latest head No new release-blocking issue found since This update appears to be SQL/test style reformatting across:
Checks:
I see no new blocker from this reformat. |
Ultrareview pass 2 — escalations + new findingsSecond pass corroborated all three findings from the previous comment. One escalation and one new style finding. merged_bug_001 — escalates bug_001 from "NULL ev_txid" to silent DLQ data loss (normal)Location: The forced-unregister DLQ branch does not just lose
Right after the loop, After commit: no Why the test suite misses it: the only forced-unregister test exercising the DLQ branch is Reproduction:
Fix options:
The bug_004 — bug_002 leak is broader than first reported (nit)Location: The first comment flagged the IF branch (single coop_member). The ELSE branch is also affected and leaks more rows per call: When Reproduction: select pgque.create_queue('q');
select pgque.register_subconsumer('q', 'billing', 'w1');
select pgque.register_subconsumer('q', 'billing', 'w2');
select pgque.unregister_consumer('q', 'billing');
select co_name from pgque.consumer where co_name like 'billing%';
-- still returns billing.w1 and billing.w2Fix: before the bulk subscription delete, collect the bug_005 — left-aligned SQL keyword rule violated in same PR that introduced it (nit)Location: The PR's The earlier Pure formatting; re-flow bug_009 — corroborates bug_003The ambiguous |
event_dead() resolved dl_consumer_id via subscription.sub_batch, which in the cooperative path matches the coop_member row. unregister_subconsumer then deleted the member subscription and the orphan member consumer row; dead_letter.dl_consumer_id has on delete cascade, so the DLQ row inserted inside the same transaction was silently wiped before commit. Route cooperative DLQ writes to the coop_main co_id (the persistent consumer-group identity); normal subscriptions are unchanged. Add a red/green regression in test_coop_ultrareview using unique consumer and subconsumer names so the member co_id is not shared with sibling tests (which is what was averting the cascade in the existing bug_001 test). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The previous test asserted distinct batch_ids and no duplicate events but did not prove the second worker actually blocked on the coop_main FOR UPDATE -- a regression that silently dropped the lock could have passed on a fast machine where worker-1 simply finished first. Worker-1 now holds for 2s after a 0.5s head start; worker-2 measures its own wait via clock_timestamp() and asserts it exceeded 1000ms. Negative test: shortening the hold to 0.1s reduces the observed wait to <10ms and fires the assertion. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…h_batch - Late-nack tests in stale-takeover and forced-unregister scenarios now capture and assert the specific "batch not found" message instead of swallowing any exception, so an unrelated error from nack() would fail the test rather than slip through. The stale-takeover case also re-reads the new owner's sub_batch after the rejection to prove no clobber. - Add a redelivery proof for cooperative nack: send + receive + nack with retry_after = 0, then commit, run maint_retry_events + tick + receive in separate top-level statements, and assert the same msg_id returns with an incremented retry_count. The existing test only checked retry_queue insertion -- necessary but not sufficient. - Add a corruption-guard test for finish_batch on a coop_main row: force sub_batch to a synthetic value via direct UPDATE and assert finish_batch raises the expected message. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Add an "empty tick windows are auto-finished" note: receive_coop()
closes empty batches internally, so callers polling a quiet queue do
not need to ack a batch_id. This is a meaningful divergence from
receive() and was previously undocumented.
- Add a throughput note explaining that cooperative allocation
serializes on a FOR UPDATE of the coop_main row, so undersized
batches and short tick periods make the main row a hotspot.
- Drop the "in PgQue 0.2" version tag from the experimental marker per
the docs rule ("no concrete version tags in README/docs").
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- next_batch_custom(5) override's docblock listed `pgque.insert_event_raw(11)` under "Calls:", carried verbatim from the upstream PgQ body. The cooperative override actually calls `pgque.find_tick_helper`. Update accordingly. - Add a round-trip smoke test for `subscribe_subconsumer` / `unsubscribe_subconsumer`. The aliases are documented public API but no test called them, so a future signature or grant drift would have been invisible. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
Status update — addressed five items from the latest reviews on top of
CI: green across the chain. Local checks (PG 18): full Deferred with rationale
|
|
Re-reviewed latest head No release-blocking issues found from my side. The latest diff appears to address prior review concerns:
Checks/evidence:
|
REV Code Review Report
This is a re-review on top of the previous REV report posted at 22:44:59Z. Focus is on items not yet resolved by commits BLOCKING ISSUES (7)Issues that should be addressed before merge (high-confidence MEDIUM/HIGH). HIGH
HIGH
HIGH
HIGH
HIGH
MEDIUM
MEDIUM
POTENTIAL ISSUES (21)Issues with moderate confidence (4–7/10). Review manually — may be false positives. HIGH
HIGH
HIGH
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
MEDIUM
LOW
LOW
LOW
LOW (missing —
LOW
LOW
INFO
INFO PR description — "Tests so far: Docs-only first step" is stale (confidence: 4/10)
Summary
SOC2 checks skipped per repo policy (CLAUDE.md: "Ignore SOC2 findings; this repo does not need them"). The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, and the REV-assisted review (AI analysis by postgres-ai/rev) |
Style guide says "Root keywords on their own line (except with single argument)". The previous a17b5ca pass caught scalar selects but missed the `if not exists (select 1 from t where ...)`, `perform 1 from t where ...`, `select * into v from t where <single>`, single-target `set`, single-condition `where`, and bare-from-with-function-call patterns inside cooperative_consumers.sql. Source edited; sql/pgque.sql and sql/pgque-tle.sql regenerated via build/transform.sh. Behavior unchanged — full regression suite passes.
REV review (PR #208 re-review) flagged five doc/comment items in the cooperative consumer surface: - Blueprint: register_subconsumer / subscribe_subconsumer signatures were missing convert_normal boolean default false. - finish_batch header: "Calls: None" was stale; the coop_member branch calls _clear_member_cursor. - Legacy next_batch_custom(5) header: still the verbatim PgQ docstring; did not mention the new coop_main rejection branch or the EXISTS read on pgque.subscription that gates it. - "this will drop subconsumers too" comment in unregister_consumer was stale: the surrounding code already rejects coop_member rows above, so the DELETE only ever removes one normal/coop_main row. - Cooperative next_batch_custom empty-window return: added a comment explaining why sub_active is intentionally not refreshed (idle members with stale sub_active are not victimizable; touch_subconsumer is the explicit heartbeat for idle workers). Reference docs already showed convert_normal at docs/reference.md:155; no change needed there. Source edited; sql/pgque.sql + sql/pgque-tle.sql regenerated via build/transform.sh. Behavior unchanged — full regression suite passes.
Add regression assertions for seven items REV (PR #208 re-review) flagged as missing — five blocking, two from the potential list: - coop_main row state after a member's ack: snapshot the main row before any member allocates, then assert sub_batch is null and sub_last_tick strictly advanced after the ack-then-finish_batch path. Proves the cooperative cursor mechanic moves on member allocation, not on ack. - unregister_subconsumer idempotency: second call after a successful unregister must return 0. - unregister_subconsumer rejects unsupported batch_handling values (anything not in {0, 1}); assert message starts with the expected 'unsupported batch_handling value' prefix. - touch_subconsumer on an active (in-batch) member: returns 1, refreshes sub_active, leaves sub_batch unchanged. Existing test only covered the idle path. - Direct legacy next_batch(2-arg) and next_batch_custom(5-arg) on a coop_main with members must raise the cooperative-form directive. The previous test only reached this rejection through receive(); a future refactor of receive() could silently break the legacy primitives. - Direct finish_batch on a coop_member returns 1 and clears the member cursor (mirrors the ack() path; finish_batch is part of the public PgQ-style primitive surface). - Fan-out invariant: a normal consumer and an active coop group on the same queue both receive the same event independently. All assertions were green against the existing implementation — these are characterization tests that lock in current behavior, not bug fixes. Full regression suite passes.
REV (PR #208 re-review, potential item #8) flagged a misleading error message: passing a coop_member's full consumer name (e.g. 'main_c.w1') to the legacy 5-arg pgque.next_batch_custom raised "PgQ corruption: Consumer ... does not see tick <NULL>" instead of telling the caller to use the cooperative form. Root cause: coop_member rows carry sub_last_tick = NULL by design (the main row owns the cursor). The select-with-LEFT-JOIN to pgque.tick yields prev_tick_id IS NULL for any member, which trips the generic prev_tick_id sanity check downstream. Fix: add an explicit coop_member rejection branch right after the existing coop_main rejection, with a directive pointing callers at receive_coop / next_batch (cooperative form). The check is cheap (the sub_role was already loaded by the preceding select) and runs before the misleading 'PgQ corruption' branch. Red/green TDD: red — added test E2 to test_cooperative_consumers.sql; verified sqlerrm = 'PgQ corruption: Consumer main_c.w1 on queue ...'. green — added the rejection branch; sqlerrm now contains 'cooperative subconsumer'. Full regression suite passes.
REV (PR #208 re-review) flagged several smaller items. Bundled here: - (#9) unregister_subconsumer hardcoded retry: document that the 60 s matches pgque.nack()'s default. Per-queue retry intervals are not configurable today; comment notes where to read one if that changes. - (#13) test_coop_concurrency: extend w1 hold from 2 s -> 3 s and the head-start from 0.5 s -> 1 s; raise the wait-time assertion from > 1000 ms to > 1500 ms. Original sizes were fragile under load — w1's dblink open + receive_coop traversal can exceed 0.5 s on a loaded PG matrix runner, which would let w2 grab the lock first and make the assertion fail spuriously. - (#20) document the ev_txid::text::xid8 round-trip cast in unregister_subconsumer's force-DLQ branch. The cast is intentional (get_batch_events returns ev_txid bigint per the legacy PgQ signature; text round-trip widens to xid8 without precision loss); pgque.nack() carries the same comment. - (#24) README role table: link 'experimental cooperative consumer functions' to the cooperative-consumers section in the reference doc so a reader can discover the API from the table alone. Source edited; sql/pgque.sql + sql/pgque-tle.sql regenerated via build/transform.sh. Concurrency test now reports ~2 s wait (previously ~1.5 s); full regression suite passes.
REV (PR #208 re-review, potential item #19) noted that 817c084's existing regression in test_coop_ultrareview only partially exercises the DLQ retention contract: it confirms the DLQ row survives one member unregister, but not the FULL teardown sequence (member -> main demote -> main unregister). Add an explicit two-stage assertion: Stage 1: unregister the only member -> main demotes to 'normal'. DLQ row MUST survive (the contract 817c084 introduced; cooperative DLQ writes are anchored to the persistent coop_main co_id, not the ephemeral member co_id). Stage 2: unregister_consumer the (now normal) main, deleting the consumer row. dl_consumer_id ON DELETE CASCADE then wipes the DLQ row. This is the documented behavior for the normal consumer path (see sql/pgque-additions/dlq.sql comment on the dl_consumer_id FK); cooperative DLQ rows must follow the same rule once they reach a normal subscription, not invent a separate retention model. Use queue-unique consumer + subconsumer names. Other blocks in this file reuse 'main_c' as a shared consumer, which would keep the consumer row alive in stage 2 and silently mask the cascade.
Functional testing —
|
REV Code Review Report
Re-review on top of the previous REV report posted at
Most prior BLOCKING items are resolved. One residual BLOCKING item, plus several smaller follow-ups in POTENTIAL. BLOCKING ISSUES (1)HIGH
Verified resolved from previous reviewConfirmed addressed by the new commits:
POTENTIAL ISSUES (10)Issues with moderate confidence (4–7/10). Review manually — may be false positives. MEDIUM
MEDIUM
MEDIUM
LOW
LOW
LOW
LOW
LOW (missing —
LOW (missing —
INFO
Summary
SOC2 checks skipped per repo policy (CLAUDE.md §"PR Lifecycle": "Ignore SOC2 findings; this repo does not need them"). The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, lock order, and the auto-create flow were re-reviewed for security and bug regressions; no high-confidence issues found. The only remaining BLOCKING item is one missing test branch — the REV-assisted review (AI analysis by postgres-ai/rev) |
- BLOCKING (REV): exercise register_subconsumer with i_convert_normal:=true on an active normal consumer (sub_batch is not null), asserting the 'cannot convert active normal consumer ...' guard. Previously only the no-opt-in branch was covered. - Assert coop_main.sub_next_tick is null post-ack (blueprint invariant was untested; a future change leaving sub_next_tick set on the main row would have passed silently). - Add E3 block: 2-arg next_batch / receive on a dotted member name must surface the 'cooperative subconsumer' directive (the entry points users actually call; E2 only tested next_batch_custom direct). - Stale-takeover-ignores-idle: capture w1's sub_active before and after the failed takeover scan and assert it is unchanged (locks in that empty tick windows do not refresh idle-member heartbeats). - Concurrency test: pre-check that w1 is past its FOR UPDATE on coop_main and currently inside pg_sleep(3) before measuring w2. Polls pg_stat_activity for wait_event='PgSleep' (MVCC visibility prevents polling coop_concurrency_results, since w1's INSERT is held in the same transaction during pg_sleep). All four coop test files green against PG 17. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- reference.md: extend the legacy 5-arg next_batch_custom entry to document coop_member rejection (not just coop_main with members); the SQL docstring already covered both, but the public reference had drifted. - reference.md: extend the 'Cooperative-aware inherited functions' paragraph with the same coop_member directive — the entry points users actually call (next_batch / receive on a dotted member) now reflect the dual-rejection contract. - reference.md: document receive_coop's implicit auto-registration of consumer + subconsumer rows, and clarify that explicit register_subconsumer is only needed when convert_normal is set. - cooperative_consumers.sql: update the unregister_consumer doc-header to include the cooperative override's full DML surface (update - pgque.subscription on last-member demotion, delete - pgque.consumer on no-subscriptions). Clarify 'Calls: None' as 'None (direct DML only)' to match maintainer expectations. Generated install scripts (sql/pgque.sql, sql/pgque-tle.sql) regenerate cleanly via build/transform.sh; only the doc-header lines in unregister_consumer change. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Address REV follow-upsAddressing #208 (comment). Two new commits on
ResolvedBLOCKING
POTENTIAL — addressed
Deferred (with rationale)
VerificationReinstalled
Generated install scripts regenerate cleanly via |
REV Code Review Report
Re-review on top of the previous REV report posted at
Result: PASSED — no new BLOCKING, NON-BLOCKING, or POTENTIAL issues found. The previous BLOCKING item and all addressable POTENTIAL items are resolved. The two LOW items the author marked as deferred (tutorial/examples copy and client READMEs for cooperative consumers) have explicit, reasonable rationale (feature is experimental; SQL surface only in 0.2; client API not yet finalized). Verified resolved from previous reviewBLOCKING (1)
POTENTIAL (10)
Summary
SOC2 checks skipped per repo policy (CLAUDE.md §"PR Lifecycle": "Ignore SOC2 findings; this repo does not need them"). The cooperative SQL core, batch allocation serialization, stale-takeover token model, DLQ rerouting, lock order, auto-create flow, and the new test branches were re-reviewed for security, bug, test-coverage, doc-accuracy, and style regressions. Nothing surfaced. Generated REV-assisted review (AI analysis by postgres-ai/rev) |
Summary
Feature PR for PgQue cooperative consumers / subconsumers for v0.2.
This PR is intentionally staged. The first commit adds the implementation blueprint so we can lock down behavior, API shape, experimental status, and worktree/subagent split before touching SQL and three client libraries.
Scope
Target feature scope:
pgque.consumerandpgque.subscriptionrows with sharedsub_idsubconsumertablereceive_coopfinish_batch()so existingack(batch_id)worksExperimental status
Cooperative consumers / subconsumers must ship as experimental everywhere in 0.2:
Clean-room note
pgq-coopwas studied as behavior reference only. It has no visible license file/header, so implementation must not copy SQL text, comments, tests, or docs from it.Current state
First step in this PR:
Implementation steps to follow in this same PR unless we decide to split:
finish_batch()Tests so far
Docs-only first step. Verified touched files have no tabs or trailing spaces.