Introduce partitioned queues#348
Conversation
|
Caution Review failedPull request was closed or merged during review 📝 WalkthroughWalkthroughThis PR replaces the QueueFanout abstraction with PartitionedQueue, adding per-job COPY batch option overrides, domain-separated partition hashing, and TLA+ correctness models. The changes span Rust models, Python bindings, worker integration, comprehensive tests, and documentation. ChangesPartitioned Queue Refactoring
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…ttribution - Point awa-python project.readme at the SDK README.md; newer maturin rejects readme paths outside the package root, which broke the Python build + test and chaos smoke CI jobs. - Restore the 0.6.0-beta.2 changelog entry to the QueueFanout surface that release actually shipped, and document the PartitionedQueue replacement (naming, domain-separated routing, per-job COPY opts) under Unreleased instead. - Attribute enqueue_many_copy batch metrics to the routed queues: per-job opts can split one batch across queues, so recording the whole batch under the batch-level default queue mislabels both histograms. Homogeneous batches keep the store-reported total. - Reattach the prepare_insert_many_params doc comment that the PerJobInsertOpts struct insertion had captured, and document the tri-state ordering_key semantics on the struct. - Raise a ValidationError for non-string opts queue values to match the neighboring opts validation errors. - Mark ADR-031 Accepted now that this PR implements it.
Summary
QueueFanoutsurface withPartitionedQueueacross Rust and Python; no compatibility alias.email,email__p1, ...), sopartitions = 1is exactly a plain queue and1 -> Nrollouts do not strand direct enqueues.partition_for_ordering_key/partition_hash64helpers and regression coverage for the correlated-hash defect.insert_many_copy/enqueue_many_copywith per-joboptsforqueueandordering_key, so one COPY call can carry mixed partition routes.Refs #347.
Validation
cargo fmt --allgit diff --checkcargo check --workspacePYO3_PYTHON=/usr/bin/python3.12 cargo check --manifest-path awa-python/Cargo.tomlcargo test -p awa-model partitioned_queue -- --nocapturecargo test -p awa-worker partitioned_queue -- --nocapture./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla(69,905 states, no invariant errors)./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla storage/AwaPartitionedQueueRoutingBroken.cfg(expected failure:CorrelatedPartitionShardCoverageis false)DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test -p awa-worker --libDATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test --workspace --libDATABASE_URL=postgres://postgres:test@localhost:15432/awa_test uv run pytest tests/test_partitioned_queue.py tests/test_start_config.py::test_partitioned_queue_configs_expand_and_dispatch tests/test_start_config.py::test_duplicate_partitioned_queue_queue_config_is_rejected tests/test_awa.py::test_copy_batch_per_job_opts_route_mixed_queues tests/test_sync.py::test_insert_many_copy_sync_per_job_opts -qLocal Producer-Path Smoke Benchmark
Environment: local Docker Postgres 18-alpine on this machine,
awa-pythoneditable install in auvPython 3.14 venv, direct queue-storage COPY, no workers, 20,000 jobs, chunk size 2,000.20,000jobs in0.948s=21,097 jobs/s.PartitionedQueueusing per-jobopts:20,000jobs in1.185s=16,878 jobs/s.py_partitioned_copy_bench:5000,py_partitioned_copy_bench__p1:5000,py_partitioned_copy_bench__p2:5000,py_partitioned_copy_bench__p3:5000.Treat this as an API/routing overhead smoke benchmark, not an end-to-end throughput claim. End-to-end partition-width runs should live in the benchmark harness.
Notes
No generated
.sofiles are staged or committed.Summary by CodeRabbit
New Features
optsparameter to Python COPY batch APIs (insert_many_copy,enqueue_many_copy) for custom queue routing and ordering key overrides.PartitionedQueuewith improved deterministic routing to replaceQueueFanout.Breaking Changes
QueueFanoutremoved; migrate toPartitionedQueue.