
Introduce partitioned queues#348

Merged
hardbyte merged 3 commits into main from codex/partitioned-queue-api
Jun 12, 2026

Conversation

@hardbyte hardbyte commented Jun 12, 2026

Owner

Summary

  • Replace the pre-release QueueFanout surface with PartitionedQueue across Rust and Python; no compatibility alias.
  • Make partition 0 the logical queue name (email, email__p1, ...), so partitions = 1 is exactly a plain queue and 1 -> N rollouts do not strand direct enqueues.
  • Domain-separate partition routing from ADR-025 enqueue-shard routing, with public partition_for_ordering_key / partition_hash64 helpers and regression coverage for the correlated-hash defect.
  • Extend Python insert_many_copy / enqueue_many_copy with per-job opts for queue and ordering_key, so one COPY call can carry mixed partition routes.
  • Add ADR-031, docs updates, and a focused hash-based TLA+ routing companion spec. The storage lifecycle model is unchanged because each partition is still an ordinary Awa queue.
  • Tighten worker completion/client test database setup to avoid concurrent test schema/database races seen during local validation.
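
The naming and routing rules in the summary can be illustrated with a small Python sketch. It is not the Awa implementation: the helper names `physical_queue_name`, `partition_for_key`, `shard_for_key`, and `route` are hypothetical, the `__p<N>` suffix is inferred from the `email, email__p1, ...` example, and personalised BLAKE2b is a stand-in for whatever hash the real `partition_hash64` uses. The sketch also assumes one plausible reading of the correlated-hash defect: if partition and shard routing reuse the same hash, every key in a given partition lands on the same shard.

```python
import hashlib


def physical_queue_name(logical: str, index: int) -> str:
    """Partition 0 keeps the logical name; later partitions get a __p<N> suffix."""
    if index < 0:
        raise ValueError("partition index must be non-negative")
    return logical if index == 0 else f"{logical}__p{index}"


def _hash64(key: str, domain: bytes) -> int:
    # Stand-in hash (assumption): BLAKE2b personalised per routing domain,
    # truncated to 64 bits. Different domains give independent hash values.
    digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8, person=domain).digest()
    return int.from_bytes(digest, "big")


def partition_for_key(key: str, partitions: int) -> int:
    """Pick a partition index from the 'partition' hash domain."""
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    return _hash64(key, b"partition") % partitions


def shard_for_key(key: str, shards: int) -> int:
    """Pick an enqueue shard from a separate 'shard' hash domain."""
    if shards < 1:
        raise ValueError("shards must be at least 1")
    return _hash64(key, b"shard") % shards


def route(logical: str, key: str, partitions: int) -> str:
    """Map an ordering key to the physical queue that should receive the job."""
    return physical_queue_name(logical, partition_for_key(key, partitions))
```

With `partitions=1`, `route("email", key, 1)` always returns `"email"`, which is why a single-partition queue behaves like a plain queue and jobs enqueued directly to `email` remain reachable after a 1 -> N rollout.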

Refs #347.

Validation

  • cargo fmt --all
  • git diff --check
  • cargo check --workspace
  • PYO3_PYTHON=/usr/bin/python3.12 cargo check --manifest-path awa-python/Cargo.toml
  • cargo test -p awa-model partitioned_queue -- --nocapture
  • cargo test -p awa-worker partitioned_queue -- --nocapture
  • ./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla (69,905 states, no invariant errors)
  • ./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla storage/AwaPartitionedQueueRoutingBroken.cfg (expected failure: CorrelatedPartitionShardCoverage is false)
  • DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test -p awa-worker --lib
  • DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test --workspace --lib
  • DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test uv run pytest tests/test_partitioned_queue.py tests/test_start_config.py::test_partitioned_queue_configs_expand_and_dispatch tests/test_start_config.py::test_duplicate_partitioned_queue_queue_config_is_rejected tests/test_awa.py::test_copy_batch_per_job_opts_route_mixed_queues tests/test_sync.py::test_insert_many_copy_sync_per_job_opts -q

Local Producer-Path Smoke Benchmark

Environment: local Docker Postgres 18-alpine on this machine, awa-python editable install in a uv Python 3.14 venv, direct queue-storage COPY, no workers, 20,000 jobs, chunk size 2,000.

  • Single physical queue direct COPY: 20,000 jobs in 0.948s = 21,097 jobs/s.
  • Four-partition PartitionedQueue using per-job opts: 20,000 jobs in 1.185s = 16,878 jobs/s.
  • Distribution was exact: py_partitioned_copy_bench:5000, py_partitioned_copy_bench__p1:5000, py_partitioned_copy_bench__p2:5000, py_partitioned_copy_bench__p3:5000.

Treat this as an API/routing overhead smoke benchmark, not an end-to-end throughput claim. End-to-end partition-width runs should live in the benchmark harness.
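
For reference, the reported rates follow directly from the job count and wall-clock times above. The short calculation below only reproduces that arithmetic; the variable names are illustrative.

```python
JOBS = 20_000
single_queue_seconds = 0.948
partitioned_seconds = 1.185

# Throughput is jobs divided by elapsed wall-clock time.
single_rate = JOBS / single_queue_seconds
partitioned_rate = JOBS / partitioned_seconds

# Relative throughput drop of the four-partition run versus the single queue.
relative_drop = 1 - partitioned_rate / single_rate

print(f"single queue: {single_rate:,.0f} jobs/s")
print(f"partitioned:  {partitioned_rate:,.0f} jobs/s")
print(f"relative throughput drop: {relative_drop:.0%}")
```

On these numbers the per-job opts path is about 20% slower than the single-queue path in this producer-only smoke run.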

Notes

No generated .so files are staged or committed.

Summary by CodeRabbit

  • New Features

    • Added per-job opts parameter to Python COPY batch APIs (insert_many_copy, enqueue_many_copy) for custom queue routing and ordering key overrides.
    • Introduced PartitionedQueue with improved deterministic routing to replace QueueFanout.
  • Breaking Changes

    • QueueFanout removed; migrate to PartitionedQueue.
    • Queue config parameters renamed for per-partition semantics.

coderabbitai Bot commented Jun 12, 2026


Caution

Review failed

Pull request was closed or merged during review

📝 Walkthrough

This PR replaces the QueueFanout abstraction with PartitionedQueue, adding per-job COPY batch option overrides, domain-separated partition hashing, and TLA+ correctness models. The changes span Rust models, Python bindings, worker integration, comprehensive tests, and documentation.

Changes

Partitioned Queue Refactoring

| Layer | File(s) | Summary |
|---|---|---|
| Core PartitionedQueue model and hashing | awa-model/src/partitioned_queue.rs, awa-model/src/queue_storage.rs | New PartitionedQueue struct routes jobs across physical queues via deterministic ordering-key hashing. Error types validate partition counts, queue names, and logical queue names. Domain-separated partition_hash64 and partition_for_ordering_key fix correlated-hashing defect. Shared ordering_key_hash64 extracted from queue_storage for partitioning logic reuse. |
| Module re-exports across crates | awa-model/src/lib.rs, awa-worker/src/lib.rs, awa/src/lib.rs, awa-python/python/awa/__init__.py | Public API surface updated: PartitionedQueue and PartitionedQueueError replace QueueFanout equivalents across all crate roots and Python module __all__. |
| Worker ClientBuilder and test infrastructure | awa-worker/src/client.rs, awa-worker/src/completion.rs | New ClientBuilder::partitioned_queue() method registers each physical queue from a PartitionedQueue. Test database helpers now create isolated worker-specific Postgres databases and handle creation errors gracefully. |
| Python type stubs and PyO3 binding | awa-python/python/awa/_awa.pyi, awa-python/src/lib.rs, awa-python/src/partitioned_queue.rs | Type stubs define PartitionedQueue class with partitions() getter and per-partition queue_configs() parameters. PyPartitionedQueue PyO3 class wraps the Rust model and validates partitions > 0. |
| Python client COPY batch API with opts parameter | awa-python/python/awa/client.py | insert_many_copy() and enqueue_many_copy() methods on both AsyncClient and Client now accept optional opts list for per-job queue and ordering_key overrides. Docstrings document fallback and clearing semantics. |
| Rust COPY per-job opts parsing and per-queue metrics | awa-python/src/client.rs | PyClient COPY methods parse per-job opts including validation and tri-state ordering_key handling (absent, explicit None, or value). Enqueue metrics grouped by routed queue instead of batch-level, enabling per-queue telemetry. |
| Python test coverage for PartitionedQueue | awa-python/tests/test_partitioned_queue.py, awa-python/tests/test_awa.py, awa-python/tests/test_start_config.py, awa-python/tests/test_sync.py | New test suite validates PartitionedQueue initialization, key-based/index-based routing, queue_configs generation, and validation errors. Integration tests verify COPY batch per-job opts routing across multiple queues. Updated start_config and sync tests use PartitionedQueue. |
| TLA+ partitioned queue routing correctness models | correctness/storage/AwaPartitionedQueueRouting.tla, correctness/storage/AwaPartitionedQueueRouting.cfg, correctness/storage/AwaPartitionedQueueRoutingBroken.cfg | Formal specification models client-side routing with correct domain-separated hashing (via folded PartitionHash) and broken correlated variant. Verifies job enqueue, partition/shard assignment, sequence uniqueness, and shard coverage per partition. |
| Changelog, ADR, and user-facing documentation | CHANGELOG.md, docs/adr/031-partitioned-queues.md, README.md, awa-model/README.md, awa-python/README.md, awa/README.md, docs/configuration.md, docs/architecture.md, docs/benchmarking.md, docs/getting-started-python.md, correctness/storage/MAPPING.md, correctness/storage/README.md, correctness/README.md | ADR-031 formalizes partitioned queues as public concept with terminology, routing contract, Python API, capacity semantics, and repartitioning guidance. All README and configuration docs updated to use PartitionedQueue terminology, examples, and per-partition configuration patterns. |
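
The tri-state ordering_key handling described in the table (absent, explicit None, or value) can be sketched in plain Python with a sentinel. This is an illustration only, not the code in awa-python/src/client.rs: `resolve_job_route` and its parameters are hypothetical, and the semantics shown (absent falls back to the batch-level value, explicit None clears it, a string overrides it) are an assumption based on the "fallback and clearing semantics" wording. A plain `ValueError` stands in for the library's ValidationError.

```python
from typing import Any, Dict, Optional, Tuple

# Sentinel that distinguishes "key not supplied" from "key supplied as None".
_ABSENT = object()


def resolve_job_route(
    default_queue: str,
    default_ordering_key: Optional[str],
    opts: Optional[Dict[str, Any]],
) -> Tuple[str, Optional[str]]:
    """Combine batch-level defaults with one job's opts (hypothetical helper)."""
    opts = opts or {}

    queue = opts.get("queue", default_queue)
    if not isinstance(queue, str):
        # Stand-in for the library's validation error type.
        raise ValueError("opts['queue'] must be a string")

    ordering_key = opts.get("ordering_key", _ABSENT)
    if ordering_key is _ABSENT:
        # Absent: fall back to the batch-level ordering key.
        return queue, default_ordering_key
    if ordering_key is not None and not isinstance(ordering_key, str):
        raise ValueError("opts['ordering_key'] must be a string or None")
    # Explicit None clears the key; a string overrides it.
    return queue, ordering_key
```

For example, `resolve_job_route("email", "tenant-1", {"ordering_key": None})` yields `("email", None)`, while passing no opts keeps `"tenant-1"`.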

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • hardbyte/awa#327: Introduced the QueueFanout implementation that this PR replaces with PartitionedQueue, along with related worker builder and API wiring updates.

Poem

A rabbit rebuilt the queue lanes today, 🐰
From fanout's one width to partitions' fine way,
With hashes domain-split and jobs routed per-opt,
The TLA+ specs prove no chaos is afoot!
Throughput preserved, and correctness tight—
Hop along, dear partitions, dance through the night! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title 'Introduce partitioned queues' clearly and concisely summarizes the main change: replacing QueueFanout with PartitionedQueue across Rust and Python. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


hardbyte added 2 commits June 12, 2026 19:57
…ttribution

- Point awa-python project.readme at the SDK README.md; newer maturin
  rejects readme paths outside the package root, which broke the
  Python build + test and chaos smoke CI jobs.
- Restore the 0.6.0-beta.2 changelog entry to the QueueFanout surface
  that release actually shipped, and document the PartitionedQueue
  replacement (naming, domain-separated routing, per-job COPY opts)
  under Unreleased instead.
- Attribute enqueue_many_copy batch metrics to the routed queues:
  per-job opts can split one batch across queues, so recording the
  whole batch under the batch-level default queue mislabels both
  histograms. Homogeneous batches keep the store-reported total.
- Reattach the prepare_insert_many_params doc comment that the
  PerJobInsertOpts struct insertion had captured, and document the
  tri-state ordering_key semantics on the struct.
- Raise a ValidationError for non-string opts queue values to match
  the neighboring opts validation errors.
- Mark ADR-031 Accepted now that this PR implements it.
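
The metrics attribution described in the commit message can be sketched as a per-queue count over the routed queue of each job. This is a hedged illustration, not the PyClient code: `enqueue_counts_by_queue` and its arguments are hypothetical names, and the homogeneous-batch branch reflects the statement that such batches keep the store-reported total.

```python
from collections import Counter
from typing import Dict, Sequence


def enqueue_counts_by_queue(
    routed_queues: Sequence[str],
    store_reported_total: int,
) -> Dict[str, int]:
    """Attribute one COPY batch to the queues its jobs were routed to."""
    counts = Counter(routed_queues)
    if len(counts) == 1:
        # Homogeneous batch: keep the total reported by the store.
        (queue,) = counts
        return {queue: store_reported_total}
    # Mixed batch: record each routed queue separately rather than
    # labelling the whole batch with the batch-level default queue.
    return dict(counts)
```

Each entry in the returned mapping would then be recorded against its own queue label.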
@hardbyte hardbyte marked this pull request as ready for review June 12, 2026 08:49
@hardbyte hardbyte merged commit 7be5bd8 into main Jun 12, 2026
21 of 23 checks passed
@hardbyte hardbyte deleted the codex/partitioned-queue-api branch June 12, 2026 08:55