From 88d7051d34e6fdd3f980c9545d8644931fbf7b88 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 12 Jun 2026 19:02:19 +1200 Subject: [PATCH 1/3] Introduce partitioned queue API --- CHANGELOG.md | 2 +- README.md | 4 +- awa-model/README.md | 2 +- awa-model/src/lib.rs | 6 +- awa-model/src/partitioned_queue.rs | 329 ++++++++++++++++++ awa-model/src/queue_fanout.rs | 269 -------------- awa-model/src/queue_storage.rs | 29 +- awa-python/README.md | 27 +- awa-python/python/awa/__init__.py | 4 +- awa-python/python/awa/_awa.pyi | 22 +- awa-python/python/awa/client.py | 32 +- awa-python/src/client.rs | 114 +++++- awa-python/src/lib.rs | 4 +- .../{queue_fanout.rs => partitioned_queue.rs} | 78 +++-- awa-python/tests/test_awa.py | 57 +++ awa-python/tests/test_partitioned_queue.py | 121 +++++++ awa-python/tests/test_queue_fanout.py | 116 ------ awa-python/tests/test_start_config.py | 24 +- awa-python/tests/test_sync.py | 17 + awa-worker/src/client.rs | 46 ++- awa-worker/src/completion.rs | 56 ++- awa-worker/src/lib.rs | 4 +- awa/README.md | 12 +- awa/src/lib.rs | 10 +- correctness/README.md | 4 + .../storage/AwaPartitionedQueueRouting.cfg | 15 + .../storage/AwaPartitionedQueueRouting.tla | 99 ++++++ correctness/storage/MAPPING.md | 8 + correctness/storage/README.md | 17 + docs/adr/031-partitioned-queues.md | 125 +++++++ docs/adr/README.md | 1 + docs/architecture.md | 2 +- docs/benchmarking.md | 2 +- docs/configuration.md | 36 +- docs/getting-started-python.md | 2 +- 35 files changed, 1155 insertions(+), 541 deletions(-) create mode 100644 awa-model/src/partitioned_queue.rs delete mode 100644 awa-model/src/queue_fanout.rs rename awa-python/src/{queue_fanout.rs => partitioned_queue.rs} (64%) create mode 100644 awa-python/tests/test_partitioned_queue.py delete mode 100644 awa-python/tests/test_queue_fanout.py create mode 100644 correctness/storage/AwaPartitionedQueueRouting.cfg create mode 100644 correctness/storage/AwaPartitionedQueueRouting.tla create mode 100644 
docs/adr/031-partitioned-queues.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 80aeae72..4d0357c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ Migrations v022–v031 apply via `awa migrate` (or the SQL-only path in [`docs/m - **Transactional follow-up jobs** ([ADR-029](docs/adr/029-transactional-followup-jobs.md), [#285](https://github.com/hardbyte/awa/pull/285)/[#288](https://github.com/hardbyte/awa/pull/288)). `ClientBuilder::on_completed_enqueue` (and friends) registers a follow-up job inserted in the same transaction as the lifecycle transition for worker-driven outcomes; callback resolution through the worker `Client` commits transition + follow-up atomically too. Maintenance rescue stays best-effort by design. - **Callback-only router and user-owned callback layers** ([#291](https://github.com/hardbyte/awa/pull/291)/[#293](https://github.com/hardbyte/awa/pull/293), closes [#281](https://github.com/hardbyte/awa/issues/281)). The callback ingress contract is shared and the URL prefix configurable; embed Awa's router or implement the contract in your own API layer — axum and FastAPI examples documented. - **`WaitingForCallback` lifecycle event + client-side callback resolution** ([#276](https://github.com/hardbyte/awa/pull/276)). Jobs parking on `WaitForCallback` are now visible to lifecycle hooks, and `Client` gains `resolve_callback` / `complete_external` / `fail_external` / `retry_external` that dispatch the matching terminal event in-process. -- **Queue fanout helper** ([#327](https://github.com/hardbyte/awa/pull/327)). Rust `QueueFanout` + `ClientBuilder::queue_fanout` and Python `awa.QueueFanout` for deterministic routing from one hot logical queue to multiple physical queues; duplicate physical declarations are rejected. +- **Partitioned queue helper** ([#327](https://github.com/hardbyte/awa/pull/327)). 
Rust `PartitionedQueue` + `ClientBuilder::partitioned_queue` and Python `awa.PartitionedQueue` for deterministic routing from one hot logical queue to multiple physical queues; duplicate physical declarations are rejected. - **Cron schedule pause / resume** ([#320](https://github.com/hardbyte/awa/pull/320), migration v026). `POST /api/cron/{name}/pause` / `/resume`; the evaluator skips paused rows and the `atomic_enqueue` CTE re-checks `paused_at IS NULL` so a pause asserted mid-evaluation still takes effect. `last_enqueued_at` is untouched while paused, so `missed_fire_policy` decides catch-up on resume. Manual `trigger_cron_job` bypasses pause. The `/cron` UI gains Pause/Resume controls. - **`awa storage finalize --wait` / `--check`** ([#298](https://github.com/hardbyte/awa/pull/298)). `--wait` polls and finalizes once readiness gates stay clear for two consecutive observations; `--check` is a dry-run that exits 2 when blocked. - **Storage-transition readiness UI** ([#299](https://github.com/hardbyte/awa/pull/299)). Time-in-state, epoch-anchored backlog history, a prominent `prepared_schema_ready=false` warning with the remediation command, and a rollback-boundaries panel. diff --git a/README.md b/README.md index 561bff8f..d3dea46d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Awa (Māori: river) fills the gap between Postgres event queues that are too nar - A worker **claims** a job before running it. The claim increments `run_lease`, which guards completion so stale workers cannot finish a newer attempt. - A **lease** is the durable execution record for a live attempt. Short jobs usually stay on the receipt path; jobs that need heartbeat, progress, callbacks, or mutable attempt state materialize a lease row. - A **lane** is the ordered stream for one `(queue, priority, enqueue_shard)`. FIFO is strict inside a lane; raising shard count creates partitioned FIFO. 
-- A **queue fanout** maps one hot logical workload to several physical queues so workers can drain independent claim/completion streams while preserving normal Awa durability and rescue semantics. +- A **partitioned queue** maps one hot logical workload to several physical queues so workers can drain independent claim/completion streams while preserving normal Awa durability and rescue semantics. - A **segment** is a ring partition Awa can rotate and later truncate. Segments keep high-churn queue history off long-lived row-vacuum paths. - A **ready tombstone** is a small marker saying an immutable ready row should no longer be claimed, for example after cancelling or reprioritizing an unclaimed job. The ready row itself stays append-only until segment prune. @@ -74,7 +74,7 @@ A phase-driven portable benchmark harness comparing Awa against pgque, procrasti ## Hot Queues -Queues default to one physical queue, one enqueue shard, one claimer, and one queue-storage completion shard. Those defaults are intentionally conservative. For workloads that need more end-to-end throughput and can accept partitioned ordering, Rust and Python both expose `QueueFanout` to route one logical queue over several physical queues. `enqueue_shards` and `claimers` address different bottlenecks; see [Choosing a throughput lever](docs/configuration.md#choosing-a-throughput-lever) before changing them. +Queues default to one physical queue, one enqueue shard, one claimer, and one queue-storage completion shard. Those defaults are intentionally conservative. For workloads that need more end-to-end throughput and can accept partitioned ordering, Rust and Python both expose `PartitionedQueue` to route one logical queue over several physical queues. `enqueue_shards` and `claimers` address different bottlenecks; see [Choosing a throughput lever](docs/configuration.md#choosing-a-throughput-lever) before changing them. Methodology and caveats live in [benchmarking notes](docs/benchmarking.md). 
Validation artifacts: [ADR-019 (queue storage)](docs/adr/bench/019-queue-storage-validation-2026-04-19.md) and [ADR-023 (receipt-plane ring partitioning)](docs/adr/bench/023-receipt-ring-validation-2026-04-26.md). diff --git a/awa-model/README.md b/awa-model/README.md index bbac1cc4..59425f19 100644 --- a/awa-model/README.md +++ b/awa-model/README.md @@ -8,7 +8,7 @@ Most Rust applications should depend on the [`awa`](https://crates.io/crates/awa - **Job model** — `JobRow`, `JobState`, `InsertOpts`, `UniqueOpts`, `InsertParams`. - **Insertion** — `insert`, `insert_with`, `insert_many`, `insert_many_copy` for the compatibility insert surface, and `QueueStorage::enqueue_params_copy` for direct queue-storage COPY ingestion with an explicitly configured queue-storage engine. -- **Queue fanout** — `QueueFanout` gives enqueue-only producers the same deterministic physical queue routing workers use for very hot logical queues. +- **Partitioned queues** — `PartitionedQueue` gives enqueue-only producers the same deterministic physical queue routing workers use for very hot logical queues. - **Migrations** — `migrations::run` applies the schema; `migrations`, `migration_sql`, `migration_sql_range`, and `current_version` expose the catalog for tooling. - **Admin** (`admin`) — retry, cancel (single, by unique key, bulk), pause/resume/drain queues, queue and job-kind overviews, runtime instance snapshots, dirty-key recompute, and descriptor sync (`sync_queue_descriptors`, `sync_job_kind_descriptors`, `cleanup_stale_descriptors`). - **Dead Letter Queue** (`dlq`) — `DlqRow`, `DlqMetadata`, `ListDlqFilter`, `RetryFromDlqOpts`, list / retry / move / purge helpers backing the `awa dlq` CLI and the DLQ admin UI tab. 
diff --git a/awa-model/src/lib.rs b/awa-model/src/lib.rs index 868bd40a..16de29be 100644 --- a/awa-model/src/lib.rs +++ b/awa-model/src/lib.rs @@ -10,7 +10,7 @@ pub mod insert; pub mod job; pub mod kind; pub mod migrations; -pub mod queue_fanout; +pub mod partitioned_queue; pub mod queue_storage; pub mod storage; pub mod unique; @@ -39,7 +39,9 @@ pub use dlq::{DlqMetadata, DlqRow, ListDlqFilter, RetryFromDlqOpts}; pub use error::{map_sqlx_error, AwaError}; pub use insert::{insert, insert_many, insert_many_copy, insert_many_copy_from_pool, insert_with}; pub use job::{InsertOpts, InsertParams, JobRow, JobState, UniqueOpts}; -pub use queue_fanout::{QueueFanout, QueueFanoutError}; +pub use partitioned_queue::{ + partition_for_ordering_key, partition_hash64, PartitionedQueue, PartitionedQueueError, +}; pub use queue_storage::{ ClaimedEntry, ClaimedRuntimeJob, PruneOutcome, QueueCounts, QueueStorage, QueueStorageConfig, RotateOutcome, SkipReason, TerminalDeltaRollupOutcome, diff --git a/awa-model/src/partitioned_queue.rs b/awa-model/src/partitioned_queue.rs new file mode 100644 index 00000000..c6ee55ff --- /dev/null +++ b/awa-model/src/partitioned_queue.rs @@ -0,0 +1,329 @@ +use crate::job::InsertOpts; +use crate::queue_storage::ordering_key_hash64; +use std::collections::HashSet; + +const DEFAULT_PHYSICAL_QUEUE_SUFFIX: &str = "__p"; +const PARTITION_HASH_DOMAIN: u64 = 0x9d4d_1b2f_53a7_0c91; + +/// Errors returned when constructing a [`PartitionedQueue`]. 
+#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum PartitionedQueueError { + #[error("partitioned queue logical queue must not be empty")] + EmptyLogicalQueue, + #[error("partitioned queue partitions must be > 0")] + ZeroPartitions, + #[error("partitioned queue supports at most {max} physical queues; got {got}")] + TooManyPhysicalQueues { got: usize, max: usize }, + #[error("partitioned queue physical queue must not be empty")] + EmptyPhysicalQueue, + #[error("partitioned queue physical queue '{queue}' is duplicated")] + DuplicatePhysicalQueue { queue: String }, +} + +/// A deterministic set of physical queues for one hot logical queue. +/// +/// Awa still stores and executes jobs from ordinary queue names. This helper +/// gives producers and workers the same stable list of physical queues, so an +/// application can partition one logical workload over several queues without +/// hand-rolling naming and routing in every process. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionedQueue { + logical_queue: String, + physical_queues: Vec<String>, +} + +impl PartitionedQueue { + /// Build a partitioned queue using Awa's default physical queue naming. + /// + /// One partition maps to the logical queue name itself. With more than one + /// partition, partition 0 is still the logical queue name and later + /// partitions are `{logical_queue}__p1`, `{logical_queue}__p2`, and so on. + /// That keeps direct enqueues to the logical name consumable during a + /// `1 -> N` rollout. 
+ pub fn new( + logical_queue: impl Into<String>, + partitions: usize, + ) -> Result<Self, PartitionedQueueError> { + let logical_queue = logical_queue.into(); + validate_logical_queue(&logical_queue)?; + validate_physical_count(partitions)?; + + let mut physical_queues = Vec::with_capacity(partitions); + physical_queues.push(logical_queue.clone()); + physical_queues.extend( + (1..partitions) + .map(|idx| format!("{logical_queue}{DEFAULT_PHYSICAL_QUEUE_SUFFIX}{idx}")), + ); + + Ok(Self { + logical_queue, + physical_queues, + }) + } + + /// Build a partitioned queue from explicit physical queue names. + /// + /// Use this when an application already has queue names it wants to keep, + /// or when migrating an existing manually-fanned-out deployment to the + /// shared helper. + pub fn from_physical_queues<I, S>( + logical_queue: impl Into<String>, + physical_queues: I, + ) -> Result<Self, PartitionedQueueError> + where + I: IntoIterator<Item = S>, + S: Into<String>, + { + let logical_queue = logical_queue.into(); + validate_logical_queue(&logical_queue)?; + + let physical_queues: Vec<String> = physical_queues.into_iter().map(Into::into).collect(); + validate_physical_count(physical_queues.len())?; + validate_physical_queues(&physical_queues)?; + + Ok(Self { + logical_queue, + physical_queues, + }) + } + + /// Logical queue name used by the application. + pub fn logical_queue(&self) -> &str { + &self.logical_queue + } + + /// Physical queues that must be declared on worker runtimes. + pub fn physical_queues(&self) -> &[String] { + &self.physical_queues + } + + /// Number of physical queues in the partitioned queue. + pub fn partitions(&self) -> usize { + self.physical_queues.len() + } + + /// Select a physical queue by stable routing key. + /// + /// The same key always maps to the same physical queue. Partition routing + /// domain-separates the hash used for queue-storage enqueue shards so + /// `partitions == enqueue_shards` does not collapse keyed traffic onto one + /// shard inside each partition. 
+ pub fn queue_for_key(&self, key: impl AsRef<[u8]>) -> &str { + let partition = partition_for_ordering_key(key.as_ref(), self.partitions()); + &self.physical_queues[partition] + } + + /// Select a physical queue by caller-supplied sequence number. + /// + /// This is useful for bulk producers that want round-robin partitioning and + /// do not need per-key ordering. + pub fn queue_for_index(&self, index: usize) -> &str { + &self.physical_queues[index % self.partitions()] + } + + /// Return insert options routed by key. + /// + /// This sets both the physical queue and `ordering_key`, so per-key FIFO is + /// preserved even if the selected physical queue later uses multiple + /// queue-storage enqueue shards. + pub fn route_opts_by_key(&self, mut opts: InsertOpts, key: impl AsRef<[u8]>) -> InsertOpts { + let key = key.as_ref(); + opts.queue = self.queue_for_key(key).to_string(); + opts.ordering_key = Some(key.to_vec()); + opts + } + + /// Return insert options routed by round-robin index. 
+ pub fn route_opts_by_index(&self, mut opts: InsertOpts, index: usize) -> InsertOpts { + opts.queue = self.queue_for_index(index).to_string(); + opts + } +} + +impl AsRef<[String]> for PartitionedQueue { + fn as_ref(&self) -> &[String] { + self.physical_queues() + } +} + +impl<'a> IntoIterator for &'a PartitionedQueue { + type Item = &'a String; + type IntoIter = std::slice::Iter<'a, String>; + + fn into_iter(self) -> Self::IntoIter { + self.physical_queues.iter() + } +} + +fn validate_logical_queue(logical_queue: &str) -> Result<(), PartitionedQueueError> { + if logical_queue.is_empty() { + return Err(PartitionedQueueError::EmptyLogicalQueue); + } + Ok(()) +} + +fn validate_physical_count(count: usize) -> Result<(), PartitionedQueueError> { + if count == 0 { + return Err(PartitionedQueueError::ZeroPartitions); + } + + let max = i16::MAX as usize; + if count > max { + return Err(PartitionedQueueError::TooManyPhysicalQueues { got: count, max }); + } + + Ok(()) +} + +/// Deterministically map an ordering key to a partition in `[0, partitions)`. +/// +/// This uses the same portable base hash as `shard_for_ordering_key`, then +/// applies a domain-separated SplitMix64 finalizer before taking the modulo. +/// That keeps partition selection stable and portable without correlating it +/// with the enqueue-shard modulo for the same key bytes. +pub fn partition_for_ordering_key(ordering_key: &[u8], partitions: usize) -> usize { + if partitions <= 1 { + return 0; + } + (partition_hash64(ordering_key) % partitions as u64) as usize +} + +/// Portable partition-routing hash over raw ordering-key bytes. +/// +/// This is domain-separated from [`crate::queue_storage::ordering_key_hash64`] +/// so keyed partition routing composes with queue-storage enqueue sharding. 
+pub fn partition_hash64(ordering_key: &[u8]) -> u64 { + let mut value = ordering_key_hash64(ordering_key) ^ PARTITION_HASH_DOMAIN; + value ^= value >> 30; + value = value.wrapping_mul(0xbf58_476d_1ce4_e5b9); + value ^= value >> 27; + value = value.wrapping_mul(0x94d0_49bb_1331_11eb); + value ^ (value >> 31) +} + +fn validate_physical_queues(queues: &[String]) -> Result<(), PartitionedQueueError> { + let mut seen = HashSet::with_capacity(queues.len()); + for queue in queues { + if queue.is_empty() { + return Err(PartitionedQueueError::EmptyPhysicalQueue); + } + if !seen.insert(queue.as_str()) { + return Err(PartitionedQueueError::DuplicatePhysicalQueue { + queue: queue.clone(), + }); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_names_preserve_single_queue_shape() { + let queue = PartitionedQueue::new("email", 1).expect("partitioned queue should build"); + + assert_eq!(queue.logical_queue(), "email"); + assert_eq!(queue.physical_queues(), &["email".to_string()]); + assert_eq!(queue.partitions(), 1); + assert_eq!(queue.queue_for_index(42), "email"); + } + + #[test] + fn default_names_are_stable_for_multiple_queues() { + let queue = PartitionedQueue::new("email", 4).expect("partitioned queue should build"); + + assert_eq!( + queue.physical_queues(), + &[ + "email".to_string(), + "email__p1".to_string(), + "email__p2".to_string(), + "email__p3".to_string(), + ] + ); + assert_eq!(queue.partitions(), 4); + assert_eq!(queue.queue_for_index(0), "email"); + assert_eq!(queue.queue_for_index(5), "email__p1"); + } + + #[test] + fn key_routing_sets_queue_and_ordering_key() { + let queue = + PartitionedQueue::new("customer-updates", 4).expect("partitioned queue should build"); + + let opts = queue.route_opts_by_key(InsertOpts::default(), b"customer-42"); + + assert_eq!(opts.queue, queue.queue_for_key(b"customer-42")); + assert_eq!(opts.ordering_key.as_deref(), Some(&b"customer-42"[..])); + } + + #[test] + fn 
partition_hash_is_domain_separated_from_enqueue_shard_hash() { + use crate::queue_storage::shard_for_ordering_key; + + let partitions = 4; + let shards = 4; + let mut partition_shards = vec![HashSet::new(); partitions]; + for idx in 0..20_000 { + let key = format!("customer-{idx}"); + let partition = partition_for_ordering_key(key.as_bytes(), partitions); + let shard = shard_for_ordering_key(key.as_bytes(), shards); + partition_shards[partition].insert(shard); + } + + for hits in partition_shards { + assert_eq!(hits.len(), shards as usize); + } + } + + #[test] + fn explicit_queues_reject_empty_and_duplicate_names() { + let empty = PartitionedQueue::from_physical_queues("email", ["email-a", ""]); + assert!(matches!( + empty, + Err(PartitionedQueueError::EmptyPhysicalQueue) + )); + + let duplicate = PartitionedQueue::from_physical_queues("email", ["email-a", "email-a"]); + assert!(matches!( + duplicate, + Err(PartitionedQueueError::DuplicatePhysicalQueue { queue }) if queue == "email-a" + )); + } + + #[test] + fn explicit_queues_preserve_caller_order() { + let queue = PartitionedQueue::from_physical_queues( + "email", + ["email-fast", "email-bulk", "email-slow"], + ) + .expect("partitioned queue should build"); + + assert_eq!( + queue.physical_queues(), + &[ + "email-fast".to_string(), + "email-bulk".to_string(), + "email-slow".to_string(), + ] + ); + assert_eq!(queue.queue_for_index(0), "email-fast"); + assert_eq!(queue.queue_for_index(4), "email-bulk"); + } + + #[test] + fn partition_count_is_bounded_by_queue_storage_shard_type() { + let too_wide = PartitionedQueue::from_physical_queues( + "email", + (0..=(i16::MAX as usize)).map(|idx| format!("email-{idx}")), + ); + + assert!(matches!( + too_wide, + Err(PartitionedQueueError::TooManyPhysicalQueues { got, max }) + if got == i16::MAX as usize + 1 && max == i16::MAX as usize + )); + } +} diff --git a/awa-model/src/queue_fanout.rs b/awa-model/src/queue_fanout.rs deleted file mode 100644 index 9c2459a3..00000000 --- 
a/awa-model/src/queue_fanout.rs +++ /dev/null @@ -1,269 +0,0 @@ -use crate::job::InsertOpts; -use crate::queue_storage::shard_for_ordering_key; -use std::collections::HashSet; - -const DEFAULT_PHYSICAL_QUEUE_SUFFIX: &str = "__p"; - -/// Errors returned when constructing a [`QueueFanout`]. -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -pub enum QueueFanoutError { - #[error("queue fanout logical queue must not be empty")] - EmptyLogicalQueue, - #[error("queue fanout width must be > 0")] - ZeroWidth, - #[error("queue fanout supports at most {max} physical queues; got {got}")] - TooManyPhysicalQueues { got: usize, max: usize }, - #[error("queue fanout physical queue must not be empty")] - EmptyPhysicalQueue, - #[error("queue fanout physical queue '{queue}' is duplicated")] - DuplicatePhysicalQueue { queue: String }, -} - -/// A deterministic set of physical queues for one hot logical queue. -/// -/// Awa still stores and executes jobs from ordinary queue names. This helper -/// gives producers and workers the same stable list of physical queues, so an -/// application can fan one logical workload out over several queues without -/// hand-rolling naming and routing in every process. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct QueueFanout { - logical_queue: String, - physical_queues: Vec<String>, -} - -impl QueueFanout { - /// Build a fanout using Awa's default physical queue naming. - /// - /// Width `1` maps to the logical queue name itself. Widths above `1` - /// produce `{logical_queue}__p0`, `{logical_queue}__p1`, and so on. 
- pub fn new(logical_queue: impl Into<String>, width: usize) -> Result<Self, QueueFanoutError> { - let logical_queue = logical_queue.into(); - validate_logical_queue(&logical_queue)?; - validate_physical_count(width)?; - - let physical_queues = if width == 1 { - vec![logical_queue.clone()] - } else { - (0..width) - .map(|idx| format!("{logical_queue}{DEFAULT_PHYSICAL_QUEUE_SUFFIX}{idx}")) - .collect() - }; - - Ok(Self { - logical_queue, - physical_queues, - }) - } - - /// Build a fanout from explicit physical queue names. - /// - /// Use this when an application already has queue names it wants to keep, - /// or when migrating an existing manually-fanned-out deployment to the - /// shared helper. - pub fn from_physical_queues<I, S>( - logical_queue: impl Into<String>, - physical_queues: I, - ) -> Result<Self, QueueFanoutError> - where - I: IntoIterator<Item = S>, - S: Into<String>, - { - let logical_queue = logical_queue.into(); - validate_logical_queue(&logical_queue)?; - - let physical_queues: Vec<String> = physical_queues.into_iter().map(Into::into).collect(); - validate_physical_count(physical_queues.len())?; - validate_physical_queues(&physical_queues)?; - - Ok(Self { - logical_queue, - physical_queues, - }) - } - - /// Logical queue name used by the application. - pub fn logical_queue(&self) -> &str { - &self.logical_queue - } - - /// Physical queues that must be declared on worker runtimes. - pub fn physical_queues(&self) -> &[String] { - &self.physical_queues - } - - /// Number of physical queues in the fanout. - pub fn width(&self) -> usize { - self.physical_queues.len() - } - - /// Select a physical queue by stable routing key. - /// - /// The same key always maps to the same physical queue, using the same - /// portable hash Awa uses for queue-storage enqueue shards. - pub fn queue_for_key(&self, key: impl AsRef<[u8]>) -> &str { - let shard = shard_for_ordering_key(key.as_ref(), self.width() as i16) as usize; - &self.physical_queues[shard] - } - - /// Select a physical queue by caller-supplied sequence number. 
- /// - /// This is useful for bulk producers that want round-robin fanout and do - /// not need per-key ordering. - pub fn queue_for_index(&self, index: usize) -> &str { - &self.physical_queues[index % self.width()] - } - - /// Return insert options routed by key. - /// - /// This sets both the physical queue and `ordering_key`, so per-key FIFO is - /// preserved even if the selected physical queue later uses multiple - /// queue-storage enqueue shards. - pub fn route_opts_by_key(&self, mut opts: InsertOpts, key: impl AsRef<[u8]>) -> InsertOpts { - let key = key.as_ref(); - opts.queue = self.queue_for_key(key).to_string(); - opts.ordering_key = Some(key.to_vec()); - opts - } - - /// Return insert options routed by round-robin index. - pub fn route_opts_by_index(&self, mut opts: InsertOpts, index: usize) -> InsertOpts { - opts.queue = self.queue_for_index(index).to_string(); - opts - } -} - -impl AsRef<[String]> for QueueFanout { - fn as_ref(&self) -> &[String] { - self.physical_queues() - } -} - -impl<'a> IntoIterator for &'a QueueFanout { - type Item = &'a String; - type IntoIter = std::slice::Iter<'a, String>; - - fn into_iter(self) -> Self::IntoIter { - self.physical_queues.iter() - } -} - -fn validate_logical_queue(logical_queue: &str) -> Result<(), QueueFanoutError> { - if logical_queue.is_empty() { - return Err(QueueFanoutError::EmptyLogicalQueue); - } - Ok(()) -} - -fn validate_physical_count(count: usize) -> Result<(), QueueFanoutError> { - if count == 0 { - return Err(QueueFanoutError::ZeroWidth); - } - - let max = i16::MAX as usize; - if count > max { - return Err(QueueFanoutError::TooManyPhysicalQueues { got: count, max }); - } - - Ok(()) -} - -fn validate_physical_queues(queues: &[String]) -> Result<(), QueueFanoutError> { - let mut seen = HashSet::with_capacity(queues.len()); - for queue in queues { - if queue.is_empty() { - return Err(QueueFanoutError::EmptyPhysicalQueue); - } - if !seen.insert(queue.as_str()) { - return 
Err(QueueFanoutError::DuplicatePhysicalQueue { - queue: queue.clone(), - }); - } - } - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn default_names_preserve_single_queue_shape() { - let fanout = QueueFanout::new("email", 1).expect("fanout should build"); - - assert_eq!(fanout.logical_queue(), "email"); - assert_eq!(fanout.physical_queues(), &["email".to_string()]); - assert_eq!(fanout.queue_for_index(42), "email"); - } - - #[test] - fn default_names_are_stable_for_multiple_queues() { - let fanout = QueueFanout::new("email", 4).expect("fanout should build"); - - assert_eq!( - fanout.physical_queues(), - &[ - "email__p0".to_string(), - "email__p1".to_string(), - "email__p2".to_string(), - "email__p3".to_string(), - ] - ); - assert_eq!(fanout.queue_for_index(0), "email__p0"); - assert_eq!(fanout.queue_for_index(5), "email__p1"); - } - - #[test] - fn key_routing_sets_queue_and_ordering_key() { - let fanout = QueueFanout::new("customer-updates", 4).expect("fanout should build"); - - let opts = fanout.route_opts_by_key(InsertOpts::default(), b"customer-42"); - - assert_eq!(fanout.queue_for_key(b"customer-42"), "customer-updates__p0"); - assert_eq!(opts.queue, fanout.queue_for_key(b"customer-42")); - assert_eq!(opts.ordering_key.as_deref(), Some(&b"customer-42"[..])); - } - - #[test] - fn explicit_queues_reject_empty_and_duplicate_names() { - let empty = QueueFanout::from_physical_queues("email", ["email-a", ""]); - assert!(matches!(empty, Err(QueueFanoutError::EmptyPhysicalQueue))); - - let duplicate = QueueFanout::from_physical_queues("email", ["email-a", "email-a"]); - assert!(matches!( - duplicate, - Err(QueueFanoutError::DuplicatePhysicalQueue { queue }) if queue == "email-a" - )); - } - - #[test] - fn explicit_queues_preserve_caller_order() { - let fanout = - QueueFanout::from_physical_queues("email", ["email-fast", "email-bulk", "email-slow"]) - .expect("fanout should build"); - - assert_eq!( - fanout.physical_queues(), - &[ - 
"email-fast".to_string(), - "email-bulk".to_string(), - "email-slow".to_string(), - ] - ); - assert_eq!(fanout.queue_for_index(0), "email-fast"); - assert_eq!(fanout.queue_for_index(4), "email-bulk"); - } - - #[test] - fn width_is_bounded_by_queue_storage_shard_type() { - let too_wide = QueueFanout::from_physical_queues( - "email", - (0..=(i16::MAX as usize)).map(|idx| format!("email-{idx}")), - ); - - assert!(matches!( - too_wide, - Err(QueueFanoutError::TooManyPhysicalQueues { got, max }) - if got == i16::MAX as usize + 1 && max == i16::MAX as usize - )); - } -} diff --git a/awa-model/src/queue_storage.rs b/awa-model/src/queue_storage.rs index 3c48bfb6..061900b2 100644 --- a/awa-model/src/queue_storage.rs +++ b/awa-model/src/queue_storage.rs @@ -24,27 +24,34 @@ const COPY_NULL_SENTINEL: &str = "__AWA_NULL__"; const COPY_CHUNK_TARGET_BYTES: usize = 256 * 1024; const TERMINAL_COUNTER_BUCKETS: i16 = 256; +/// Portable 64-bit hash over raw ordering-key bytes. +/// +/// This is intentionally simple enough to implement byte-for-byte in +/// `awa.insert_job_compat`, so SQL, Rust, and Python producers that +/// pass the same ordering-key bytes can derive the same routing facts. +pub fn ordering_key_hash64(ordering_key: &[u8]) -> u64 { + let mut hash: u128 = 14_695_981_039_346_656_037; + const PRIME: u128 = 1_099_511_628_211; + const MASK: u128 = u64::MAX as u128; + for byte in ordering_key { + hash = hash.wrapping_mul(PRIME).wrapping_add(*byte as u128) & MASK; + } + hash as u64 +} + /// Deterministically map an ordering key to a shard in `[0, shards)`. /// /// Inputs sharing a key always produce the same shard, which is what /// lets producers preserve FIFO within a key when the destination /// queue is sharded. `shards <= 1` returns shard 0 unconditionally. /// -/// The hash is a portable 64-bit rolling hash over the raw key bytes. 
-/// It is intentionally simple enough to implement byte-for-byte in -/// `awa.insert_job_compat`, so SQL, Rust, and Python producers that -/// pass the same ordering-key bytes land jobs on the same shard. +/// This deliberately remains the raw `ordering_key_hash64(key) % shards` +/// mapping used by SQL compatibility producers. pub fn shard_for_ordering_key(ordering_key: &[u8], shards: i16) -> i16 { if shards <= 1 { return 0; } - let mut hash: u128 = 14_695_981_039_346_656_037; - const PRIME: u128 = 1_099_511_628_211; - const MASK: u128 = u64::MAX as u128; - for byte in ordering_key { - hash = hash.wrapping_mul(PRIME).wrapping_add(*byte as u128) & MASK; - } - (hash % (shards as u128)) as i16 + (ordering_key_hash64(ordering_key) % shards as u64) as i16 } fn terminal_counter_bucket(job_id: i64) -> i16 { diff --git a/awa-python/README.md b/awa-python/README.md index 39729f83..e2aaa7d1 100644 --- a/awa-python/README.md +++ b/awa-python/README.md @@ -51,7 +51,7 @@ For application tables, keep using your existing database library. The `awa.brid - **Transactional enqueue** — enqueue inside the same Postgres transaction as your application's writes, using your existing connection/session. - **Vacuum-aware storage** — append-only ready entries plus a partitioned receipt ring keep dead-tuple pressure bounded under sustained load. See [ADR-019](https://github.com/hardbyte/awa/blob/main/docs/adr/019-queue-storage-redesign.md) and [ADR-023](https://github.com/hardbyte/awa/blob/main/docs/adr/023-receipt-plane-ring-partitioning.md). - **COPY ingestion** — `enqueue_many_copy` streams directly into queue storage for high-volume Python producers. `insert_many_copy` remains the compatibility insert surface for canonical-storage and adapter-style callers. If workers use `queue_storage_queue_stripe_count > 1`, pass the same value to `enqueue_many_copy`. 
-- **Queue fanout** — `QueueFanout` maps one hot logical queue to several physical queues so workers can drain independent streams without changing Awa's durability model. +- **Partitioned queues** — `PartitionedQueue` maps one hot logical queue to several physical queues so workers can drain independent streams without changing Awa's durability model. - **Crash-safe execution** — heartbeat-based lease tracking; jobs whose workers vanish are rescued automatically. - **Per-queue policy** — priorities, priority aging, weighted concurrency, rate limits, deadlines, retry/backoff, cron, dead-letter queue. - **Durable batch operations** — preview, submit, monitor, and cancel async operator mutations such as reprioritizing queued jobs or moving a backlog to another queue. @@ -110,30 +110,37 @@ await client.insert( The key can be `bytes` or `str` (encoded UTF-8). Two enqueues with the same key always pick the same shard regardless of which producer process or batch they came from. At `enqueue_shards = 1` (the default) the key is ignored. See [`docs/adr/025-sharded-enqueue-heads.md`](https://github.com/hardbyte/awa/blob/main/docs/adr/025-sharded-enqueue-heads.md) for the full contract. -## Logical queue fanout +## Partitioned queues -A logical queue is the workload name your application thinks in, such as `customer-updates`. A physical queue is the queue name stored in Postgres and claimed by workers. Most workloads use one physical queue. For a very hot workload where partitioned ordering is acceptable, use `QueueFanout` to spread one logical queue over several physical queues: +A logical queue is the workload name your application thinks in, such as `customer-updates`. A physical queue is the queue name stored in Postgres and claimed by workers. Most workloads use one physical queue. 
For a very hot workload where partitioned ordering is acceptable, use `PartitionedQueue` to spread one logical queue over several physical queues: ```python -fanout = awa.QueueFanout("customer-updates", 4) +queue = awa.PartitionedQueue("customer-updates", 4) -@client.task(UpdateCustomer, queue=fanout.physical_queues[0]) +@client.task(UpdateCustomer, queue=queue.physical_queues[0]) async def update_customer(job): ... -await client.start(fanout.queue_configs(max_workers_per_queue=16)) +await client.start(queue.queue_configs(max_workers_per_partition=16)) await client.insert( UpdateCustomer(customer_id=42, payload=...), - **fanout.route_by_key("customer-42"), + **queue.route_by_key("customer-42"), ) ``` -Register the handler once and pass explicit fanout queue configs to `start()`. Python handlers are dispatched by job kind; the queue name on `@client.task` gives `start()` a declared queue to validate. +Register the handler once and pass explicit partition configs to `start()`. Python handlers are dispatched by job kind; the queue name on `@client.task` gives `start()` a declared queue to validate. -`route_by_key()` returns `queue` and `ordering_key`, so jobs for the same key pick the same physical queue and keep per-key FIFO. `route_by_index()` returns a round-robin queue for workloads that do not need per-key ordering. The worker `queue_configs()` helper is explicit about `max_workers_per_queue` because each physical queue is configured independently; pass `global_max_workers` to `start()` if you need a logical fleet-wide cap. +`route_by_key()` returns `queue` and `ordering_key`, so jobs for the same key pick the same physical queue and keep per-key FIFO. `route_by_index()` returns a round-robin queue for workloads that do not need per-key ordering. The worker `queue_configs()` helper is explicit about `max_workers_per_partition` because each physical queue is configured independently; pass `global_max_workers` to `start()` if you need a logical fleet-wide cap. 
-`enqueue_many_copy()` is a homogeneous batch API: one queue and one optional ordering key per call. For mixed-key fanout batches, group jobs by physical queue and ordering key before calling it, or use individual inserts when preserving per-key FIFO matters more than batch size. +`insert_many_copy()` and `enqueue_many_copy()` accept per-job `opts`, so a mixed-partition batch can still use one COPY call: + +```python +await client.enqueue_many_copy( + jobs, + opts=[queue.route_by_key(job.customer_id) for job in jobs], +) +``` ## Documentation diff --git a/awa-python/python/awa/__init__.py b/awa-python/python/awa/__init__.py index 020f0e6f..4dce45b0 100644 --- a/awa-python/python/awa/__init__.py +++ b/awa-python/python/awa/__init__.py @@ -6,7 +6,7 @@ # Job types Job, JobState, - QueueFanout, + PartitionedQueue, HealthCheck, QueueHealth, QueueStat, @@ -67,7 +67,7 @@ # Job types "Job", "JobState", - "QueueFanout", + "PartitionedQueue", "HealthCheck", "QueueHealth", "QueueStat", diff --git a/awa-python/python/awa/_awa.pyi b/awa-python/python/awa/_awa.pyi index d01625e6..64ca423e 100644 --- a/awa-python/python/awa/_awa.pyi +++ b/awa-python/python/awa/_awa.pyi @@ -19,31 +19,31 @@ class JobState(IntEnum): WaitingExternal = ... def __str__(self) -> str: ... -class QueueFanout: - """Deterministic physical-queue fanout for one hot logical queue.""" +class PartitionedQueue: + """Deterministic physical queue partitions for one hot logical queue.""" - def __init__(self, logical_queue: str, width: int) -> None: ... + def __init__(self, logical_queue: str, partitions: int) -> None: ... @classmethod def from_physical_queues( cls, logical_queue: str, physical_queues: Sequence[str] - ) -> QueueFanout: ... + ) -> PartitionedQueue: ... @property def logical_queue(self) -> str: ... @property def physical_queues(self) -> list[str]: ... @property - def width(self) -> int: ... + def partitions(self) -> int: ... def queue_for_key(self, key: bytes | str) -> str: ... 
def queue_for_index(self, index: int) -> str: ... def route_by_key(self, key: bytes | str) -> dict[str, str | bytes]: ... def route_by_index(self, index: int) -> dict[str, str]: ... def queue_configs( self, - max_workers_per_queue: int | None = None, + max_workers_per_partition: int | None = None, *, - min_workers_per_queue: int | None = None, + min_workers_per_partition: int | None = None, weight: int = 1, - rate_limit_per_queue: tuple[float, int] | None = None, + rate_limit_per_partition: tuple[float, int] | None = None, priority_aging_interval_ms: int | None = None, deadline_duration_ms: int | None = None, claimers: int | None = None, @@ -507,6 +507,7 @@ class Client: run_at: datetime.datetime | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, ) -> list[Job[dict[str, Any]]]: ... async def enqueue_many_copy( self, @@ -521,6 +522,8 @@ class Client: run_at: datetime.datetime | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, + queue_storage_queue_stripe_count: int = 1, ) -> int: ... def periodic( self, @@ -699,6 +702,7 @@ class Client: run_at: datetime.datetime | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, ) -> list[Job[dict[str, Any]]]: ... def enqueue_many_copy_sync( self, @@ -713,6 +717,8 @@ class Client: run_at: datetime.datetime | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, + queue_storage_queue_stripe_count: int = 1, ) -> int: ... 
# External callback completion (sync) def complete_external_sync( diff --git a/awa-python/python/awa/client.py b/awa-python/python/awa/client.py index 57acf38d..bba682fa 100644 --- a/awa-python/python/awa/client.py +++ b/awa-python/python/awa/client.py @@ -162,8 +162,13 @@ async def insert_many_copy( run_at: Any | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, ) -> list[Job]: - """Bulk insert jobs through the compatibility COPY surface.""" + """Bulk insert jobs through the compatibility COPY surface. + + Batch-level keyword arguments are defaults. ``opts`` can override + ``queue`` and ``ordering_key`` per job. + """ return await self._raw.insert_many_copy( jobs, kind=kind, @@ -175,6 +180,7 @@ async def insert_many_copy( run_at=run_at, unique_opts=unique_opts, ordering_key=ordering_key, + opts=opts, ) async def enqueue_many_copy( @@ -190,9 +196,14 @@ async def enqueue_many_copy( run_at: Any | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, queue_storage_queue_stripe_count: int = DEFAULT_QUEUE_STORAGE_QUEUE_STRIPE_COUNT, ) -> int: - """Enqueue jobs into active queue storage using direct COPY.""" + """Enqueue jobs into active queue storage using direct COPY. + + Batch-level keyword arguments are defaults. ``opts`` can override + ``queue`` and ``ordering_key`` per job. 
+ """ return await self._raw.enqueue_many_copy( jobs, kind=kind, @@ -204,6 +215,7 @@ async def enqueue_many_copy( run_at=run_at, unique_opts=unique_opts, ordering_key=ordering_key, + opts=opts, queue_storage_queue_stripe_count=queue_storage_queue_stripe_count, ) @@ -981,8 +993,13 @@ def insert_many_copy( run_at: Any | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, ) -> list[Job]: - """Bulk insert jobs through the compatibility COPY surface.""" + """Bulk insert jobs through the compatibility COPY surface. + + Batch-level keyword arguments are defaults. ``opts`` can override + ``queue`` and ``ordering_key`` per job. + """ return self._raw.insert_many_copy_sync( jobs, kind=kind, @@ -994,6 +1011,7 @@ def insert_many_copy( run_at=run_at, unique_opts=unique_opts, ordering_key=ordering_key, + opts=opts, ) def enqueue_many_copy( @@ -1009,9 +1027,14 @@ def enqueue_many_copy( run_at: Any | None = None, unique_opts: dict[str, Any] | None = None, ordering_key: bytes | str | None = None, + opts: list[dict[str, Any] | None] | None = None, queue_storage_queue_stripe_count: int = DEFAULT_QUEUE_STORAGE_QUEUE_STRIPE_COUNT, ) -> int: - """Enqueue jobs into active queue storage using direct COPY.""" + """Enqueue jobs into active queue storage using direct COPY. + + Batch-level keyword arguments are defaults. ``opts`` can override + ``queue`` and ``ordering_key`` per job. 
+ """ return self._raw.enqueue_many_copy_sync( jobs, kind=kind, @@ -1023,6 +1046,7 @@ def enqueue_many_copy( run_at=run_at, unique_opts=unique_opts, ordering_key=ordering_key, + opts=opts, queue_storage_queue_stripe_count=queue_storage_queue_stripe_count, ) diff --git a/awa-python/src/client.rs b/awa-python/src/client.rs index ad0a805a..3141fa87 100644 --- a/awa-python/src/client.rs +++ b/awa-python/src/client.rs @@ -1,5 +1,7 @@ use crate::args::{derive_kind, get_type_class_name, serialize_args}; -use crate::errors::{map_awa_error, map_connect_error, map_sqlx_error, state_error}; +use crate::errors::{ + map_awa_error, map_connect_error, map_sqlx_error, state_error, validation_error, +}; use crate::job::{py_to_json, PyJob}; use crate::transaction::{ insert_raw_job, parse_ordering_key, parse_run_at, parse_unique_opts, PySyncTransaction, @@ -2175,7 +2177,7 @@ impl PyClient { // ── COPY batch insert/enqueue (async + sync) ──────────────────── - #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None))] + #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None, opts=None))] #[allow(clippy::too_many_arguments)] fn insert_many_copy<'py>( &self, @@ -2190,6 +2192,7 @@ impl PyClient { run_at: Option>, unique_opts: Option>, ordering_key: Option>, + opts: Option>>, ) -> PyResult> { let pool = self.pool.clone(); let insert_params = prepare_insert_many_params( @@ -2204,6 +2207,7 @@ impl PyClient { run_at.as_ref(), unique_opts.as_ref(), ordering_key.as_ref(), + opts.as_deref(), )?; pyo3_async_runtimes::tokio::future_into_py(py, async move { @@ -2220,7 +2224,7 @@ impl PyClient { }) } - #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, 
ordering_key=None))] + #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None, opts=None))] #[allow(clippy::too_many_arguments)] fn insert_many_copy_sync( &self, @@ -2235,6 +2239,7 @@ impl PyClient { run_at: Option>, unique_opts: Option>, ordering_key: Option>, + opts: Option>>, ) -> PyResult> { let pool = self.pool.clone(); let insert_params = prepare_insert_many_params( @@ -2249,6 +2254,7 @@ impl PyClient { run_at.as_ref(), unique_opts.as_ref(), ordering_key.as_ref(), + opts.as_deref(), )?; py.detach(|| { @@ -2261,7 +2267,7 @@ impl PyClient { }) } - #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None, queue_storage_queue_stripe_count=1))] + #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None, opts=None, queue_storage_queue_stripe_count=1))] #[allow(clippy::too_many_arguments)] fn enqueue_many_copy<'py>( &self, @@ -2276,6 +2282,7 @@ impl PyClient { run_at: Option>, unique_opts: Option>, ordering_key: Option>, + opts: Option>>, queue_storage_queue_stripe_count: u32, ) -> PyResult> { if queue_storage_queue_stripe_count == 0 { @@ -2298,6 +2305,7 @@ impl PyClient { run_at.as_ref(), unique_opts.as_ref(), ordering_key.as_ref(), + opts.as_deref(), )?; pyo3_async_runtimes::tokio::future_into_py(py, async move { @@ -2325,7 +2333,7 @@ impl PyClient { }) } - #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, unique_opts=None, ordering_key=None, queue_storage_queue_stripe_count=1))] + #[pyo3(signature = (jobs, *, kind=None, queue="default".to_string(), priority=2, max_attempts=25, tags=vec![], metadata=None, run_at=None, 
unique_opts=None, ordering_key=None, opts=None, queue_storage_queue_stripe_count=1))] #[allow(clippy::too_many_arguments)] fn enqueue_many_copy_sync( &self, @@ -2340,6 +2348,7 @@ impl PyClient { run_at: Option>, unique_opts: Option>, ordering_key: Option>, + opts: Option>>, queue_storage_queue_stripe_count: u32, ) -> PyResult { if queue_storage_queue_stripe_count == 0 { @@ -2362,6 +2371,7 @@ impl PyClient { run_at.as_ref(), unique_opts.as_ref(), ordering_key.as_ref(), + opts.as_deref(), )?; py.detach(|| { @@ -3141,6 +3151,12 @@ impl PyClient { } /// Convert a list of Python job args into InsertParams for the COPY path. +#[derive(Debug, Clone, Default)] +struct PerJobInsertOpts { + queue: Option, + ordering_key: Option>>, +} + #[allow(clippy::too_many_arguments)] fn prepare_insert_many_params( py: Python<'_>, @@ -3154,6 +3170,7 @@ fn prepare_insert_many_params( run_at: Option<&Py>, unique_opts: Option<&Py>, ordering_key: Option<&Py>, + opts: Option<&[Py]>, ) -> PyResult> { let metadata_json = metadata .map(|value| py_to_json(py, value.bind(py))) @@ -3168,9 +3185,11 @@ fn prepare_insert_many_params( let ordering_key = ordering_key .map(|value| parse_ordering_key(py, value.bind(py))) .transpose()?; + let per_job_opts = parse_per_job_insert_opts(py, opts, jobs.len())?; jobs.iter() - .map(|job| { + .enumerate() + .map(|(idx, job)| { let bound = job.bind(py); let kind_str = match &kind { Some(k) => k.clone(), @@ -3180,18 +3199,27 @@ fn prepare_insert_many_params( }?, }; let args_json = serialize_args(py, bound)?; + let job_opts = per_job_opts.as_ref().map(|items| &items[idx]); + let job_queue = job_opts + .and_then(|opts| opts.queue.as_ref()) + .cloned() + .unwrap_or_else(|| queue.to_string()); + let job_ordering_key = match job_opts.and_then(|opts| opts.ordering_key.as_ref()) { + Some(value) => value.clone(), + None => ordering_key.clone(), + }; Ok(InsertParams { kind: kind_str, args: args_json, opts: InsertOpts { - queue: queue.to_string(), + queue: job_queue, 
priority, max_attempts, run_at: run_at_dt, metadata: metadata_json.clone(), tags: tags.to_vec(), unique: unique.clone(), - ordering_key: ordering_key.clone(), + ordering_key: job_ordering_key, ..Default::default() }, }) @@ -3199,6 +3227,76 @@ fn prepare_insert_many_params( .collect() } +fn parse_per_job_insert_opts( + py: Python<'_>, + opts: Option<&[Py]>, + jobs_len: usize, +) -> PyResult>> { + let Some(opts) = opts else { + return Ok(None); + }; + + if opts.len() != jobs_len { + return Err(validation_error(format!( + "opts length must match jobs length (got {}, expected {})", + opts.len(), + jobs_len + ))); + } + + opts.iter() + .map(|item| parse_one_job_insert_opts(py, item.bind(py))) + .collect::>>() + .map(Some) +} + +fn parse_one_job_insert_opts( + py: Python<'_>, + item: &Bound<'_, PyAny>, +) -> PyResult { + if item.is_none() { + return Ok(PerJobInsertOpts::default()); + } + + let dict: &Bound<'_, PyDict> = item + .cast() + .map_err(|_| validation_error("each opts entry must be a dict or None"))?; + + for (key, _) in dict.iter() { + let key: String = key + .extract() + .map_err(|_| validation_error("opts keys must be strings"))?; + match key.as_str() { + "queue" | "ordering_key" => {} + other => { + return Err(validation_error(format!( + "unsupported opts key '{other}'; expected 'queue' or 'ordering_key'" + ))); + } + } + } + + let queue = dict + .get_item("queue")? + .filter(|value| !value.is_none()) + .map(|value| value.extract::()) + .transpose()?; + if queue.as_deref() == Some("") { + return Err(validation_error("opts queue must not be empty")); + } + + let ordering_key = match dict.get_item("ordering_key")? { + None => None, + Some(value) if value.is_none() => Some(None), + Some(value) => Some(Some(parse_ordering_key(py, &value)?)), + }; + + Ok(PerJobInsertOpts { + queue, + ordering_key, + }) +} + /// Parsed queue configuration from Python input. 
struct ParsedQueueConfig { name: String, diff --git a/awa-python/src/lib.rs b/awa-python/src/lib.rs index 83029fcf..b919ba8d 100644 --- a/awa-python/src/lib.rs +++ b/awa-python/src/lib.rs @@ -4,7 +4,7 @@ mod client; mod dlq; mod errors; mod job; -mod queue_fanout; +mod partitioned_queue; mod telemetry; mod transaction; mod worker; @@ -96,7 +96,7 @@ fn _awa(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; // Functions m.add_function(wrap_pyfunction!(derive_kind, m)?)?; diff --git a/awa-python/src/queue_fanout.rs b/awa-python/src/partitioned_queue.rs similarity index 64% rename from awa-python/src/queue_fanout.rs rename to awa-python/src/partitioned_queue.rs index 11843433..13229aa6 100644 --- a/awa-python/src/queue_fanout.rs +++ b/awa-python/src/partitioned_queue.rs @@ -3,22 +3,22 @@ use crate::transaction::parse_ordering_key; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyDict, PyList, PyType}; -#[pyclass(frozen, name = "QueueFanout", skip_from_py_object)] +#[pyclass(frozen, name = "PartitionedQueue", skip_from_py_object)] #[derive(Debug, Clone)] -pub struct PyQueueFanout { - inner: awa_model::QueueFanout, +pub struct PyPartitionedQueue { + inner: awa_model::PartitionedQueue, } -impl PyQueueFanout { - fn from_inner(inner: awa_model::QueueFanout) -> Self { +impl PyPartitionedQueue { + fn from_inner(inner: awa_model::PartitionedQueue) -> Self { Self { inner } } - fn positive_width(width: i64) -> PyResult { - if width <= 0 { - return Err(validation_error("queue fanout width must be > 0")); + fn positive_partitions(partitions: i64) -> PyResult { + if partitions <= 0 { + return Err(validation_error("partitioned queue partitions must be > 0")); } - Ok(width as usize) + Ok(partitions as usize) } fn non_negative_index(index: i64) -> PyResult { @@ -29,19 +29,19 @@ impl PyQueueFanout { } } -fn map_fanout_error(err: awa_model::QueueFanoutError) -> PyErr { +fn 
map_partitioned_queue_error(err: awa_model::PartitionedQueueError) -> PyErr { validation_error(err.to_string()) } #[pymethods] -impl PyQueueFanout { +impl PyPartitionedQueue { #[new] - #[pyo3(signature = (logical_queue, width))] - fn new(logical_queue: String, width: i64) -> PyResult { - let width = Self::positive_width(width)?; - awa_model::QueueFanout::new(logical_queue, width) + #[pyo3(signature = (logical_queue, partitions))] + fn new(logical_queue: String, partitions: i64) -> PyResult { + let partitions = Self::positive_partitions(partitions)?; + awa_model::PartitionedQueue::new(logical_queue, partitions) .map(Self::from_inner) - .map_err(map_fanout_error) + .map_err(map_partitioned_queue_error) } #[classmethod] @@ -50,9 +50,9 @@ impl PyQueueFanout { logical_queue: String, physical_queues: Vec, ) -> PyResult { - awa_model::QueueFanout::from_physical_queues(logical_queue, physical_queues) + awa_model::PartitionedQueue::from_physical_queues(logical_queue, physical_queues) .map(Self::from_inner) - .map_err(map_fanout_error) + .map_err(map_partitioned_queue_error) } #[getter] @@ -66,8 +66,8 @@ impl PyQueueFanout { } #[getter] - fn width(&self) -> usize { - self.inner.width() + fn partitions(&self) -> usize { + self.inner.partitions() } fn queue_for_key(&self, py: Python<'_>, key: Py) -> PyResult { @@ -83,7 +83,7 @@ impl PyQueueFanout { /// Return keyword arguments for inserting a job routed by key. /// /// The returned dict has `queue` and `ordering_key`, so it can be passed - /// directly to `client.insert(..., **fanout.route_by_key(key))`. + /// directly to `client.insert(..., **queue.route_by_key(key))`. fn route_by_key(&self, py: Python<'_>, key: Py) -> PyResult> { let key = parse_ordering_key(py, key.bind(py))?; let kwargs = PyDict::new(py); @@ -100,37 +100,39 @@ impl PyQueueFanout { Ok(kwargs.into_any().unbind()) } - /// Expand the fanout into Python `start()` queue config dictionaries. 
+ /// Expand the partitioned queue into Python `start()` queue config dictionaries. /// - /// Capacity arguments are named per physical queue because the returned - /// config declares each physical queue independently. - #[pyo3(signature = (max_workers_per_queue=None, *, min_workers_per_queue=None, weight=1, rate_limit_per_queue=None, priority_aging_interval_ms=None, deadline_duration_ms=None, claimers=None, claim_batch_size=None))] + /// Capacity arguments are named per partition because the returned config + /// declares each physical queue partition independently. + #[pyo3(signature = (max_workers_per_partition=None, *, min_workers_per_partition=None, weight=1, rate_limit_per_partition=None, priority_aging_interval_ms=None, deadline_duration_ms=None, claimers=None, claim_batch_size=None))] #[allow(clippy::too_many_arguments)] fn queue_configs( &self, py: Python<'_>, - max_workers_per_queue: Option, - min_workers_per_queue: Option, + max_workers_per_partition: Option, + min_workers_per_partition: Option, weight: u32, - rate_limit_per_queue: Option>, + rate_limit_per_partition: Option>, priority_aging_interval_ms: Option, deadline_duration_ms: Option, claimers: Option, claim_batch_size: Option, ) -> PyResult> { - match (max_workers_per_queue, min_workers_per_queue) { + match (max_workers_per_partition, min_workers_per_partition) { (Some(_), Some(_)) => { return Err(validation_error( - "use max_workers_per_queue or min_workers_per_queue, not both", + "use max_workers_per_partition or min_workers_per_partition, not both", )); } (None, None) => { return Err(validation_error( - "queue_configs requires max_workers_per_queue or min_workers_per_queue", + "queue_configs requires max_workers_per_partition or min_workers_per_partition", )); } (Some(0), None) => { - return Err(validation_error("max_workers_per_queue must be > 0")); + return Err(validation_error( + "max_workers_per_partition must be > 0", + )); } _ => {} } @@ -148,14 +150,14 @@ impl PyQueueFanout { for queue 
in self.inner.physical_queues() { let config = PyDict::new(py); config.set_item("name", queue)?; - if let Some(max_workers) = max_workers_per_queue { + if let Some(max_workers) = max_workers_per_partition { config.set_item("max_workers", max_workers)?; } - if let Some(min_workers) = min_workers_per_queue { + if let Some(min_workers) = min_workers_per_partition { config.set_item("min_workers", min_workers)?; config.set_item("weight", weight)?; } - if let Some(rate_limit) = rate_limit_per_queue.as_ref() { + if let Some(rate_limit) = rate_limit_per_partition.as_ref() { config.set_item("rate_limit", rate_limit.bind(py))?; } if let Some(value) = priority_aging_interval_ms { @@ -177,14 +179,14 @@ impl PyQueueFanout { } fn __len__(&self) -> usize { - self.inner.width() + self.inner.partitions() } fn __repr__(&self) -> String { format!( - "QueueFanout(logical_queue={:?}, width={})", + "PartitionedQueue(logical_queue={:?}, partitions={})", self.inner.logical_queue(), - self.inner.width() + self.inner.partitions() ) } } diff --git a/awa-python/tests/test_awa.py b/awa-python/tests/test_awa.py index 40d02033..64b420a9 100644 --- a/awa-python/tests/test_awa.py +++ b/awa-python/tests/test_awa.py @@ -354,6 +354,63 @@ async def test_enqueue_many_copy_queue_storage(client): assert counts["available_count"] == 2 +@pytest.mark.asyncio +async def test_copy_batch_per_job_opts_route_mixed_queues(client): + """COPY batch APIs accept per-job queue/ordering_key overrides.""" + inserted = await client.insert_many_copy( + [ + SendEmail(to="compat-a@example.com", subject="Compat A"), + SendEmail(to="compat-b@example.com", subject="Compat B"), + ], + queue="unused_default", + opts=[ + {"queue": "compat_partition_a", "ordering_key": "customer-a"}, + {"queue": "compat_partition_b", "ordering_key": None}, + ], + ) + assert [job.queue for job in inserted] == [ + "compat_partition_a", + "compat_partition_b", + ] + + schema = "awa_py_enqueue_many_copy_opts" + queue = 
awa.PartitionedQueue("py_partitioned_copy", 2) + await client.install_queue_storage(schema=schema, reset=True) + + count = await client.enqueue_many_copy( + [ + SendEmail(to="direct-a@example.com", subject="Direct A"), + SendEmail(to="direct-b@example.com", subject="Direct B"), + ], + queue="unused_default", + opts=[ + queue.route_by_index(0), + queue.route_by_index(1), + ], + ) + assert count == 2 + + tx = await client.transaction() + rows = await tx.fetch_all( + f""" + SELECT queue, count(*)::bigint AS count + FROM {schema}.ready_entries + WHERE queue IN ($1, $2) + GROUP BY queue + ORDER BY queue + """, + queue.queue_for_index(0), + queue.queue_for_index(1), + ) + await tx.execute("DELETE FROM awa.runtime_storage_backends WHERE backend = 'queue_storage'") + await tx.commit() + + assert [(row["queue"], row["count"]) for row in rows] == [ + (queue.queue_for_index(0), 1), + (queue.queue_for_index(1), 1), + ] + + @pytest.mark.asyncio async def test_ordering_key_routes_all_async_python_insert_paths(client): """ordering_key pins native async client, tx, and COPY inserts.""" diff --git a/awa-python/tests/test_partitioned_queue.py b/awa-python/tests/test_partitioned_queue.py new file mode 100644 index 00000000..a31cbb2a --- /dev/null +++ b/awa-python/tests/test_partitioned_queue.py @@ -0,0 +1,121 @@ +"""Tests for the Python PartitionedQueue helper.""" + +import pytest + +import awa + + +def test_partitioned_queue_default_names_and_key_routing(): + partitioned_queue = awa.PartitionedQueue("customer-updates", 4) + + assert partitioned_queue.logical_queue == "customer-updates" + assert partitioned_queue.partitions == 4 + assert len(partitioned_queue) == 4 + assert partitioned_queue.physical_queues == [ + "customer-updates", + "customer-updates__p1", + "customer-updates__p2", + "customer-updates__p3", + ] + assert partitioned_queue.queue_for_key(b"customer-42") in partitioned_queue.physical_queues + assert partitioned_queue.queue_for_key("customer-42") == 
partitioned_queue.queue_for_key( + b"customer-42" + ) + assert partitioned_queue.route_by_key("customer-42") == { + "queue": partitioned_queue.queue_for_key(b"customer-42"), + "ordering_key": b"customer-42", + } + + +def test_partitioned_queue_round_robin_index_routing(): + partitioned_queue = awa.PartitionedQueue("email", 4) + + assert partitioned_queue.queue_for_index(0) == "email" + assert partitioned_queue.queue_for_index(5) == "email__p1" + assert partitioned_queue.route_by_index(5) == {"queue": "email__p1"} + + with pytest.raises(awa.ValidationError, match="index must be >= 0"): + partitioned_queue.queue_for_index(-1) + with pytest.raises(awa.ValidationError, match="index must be >= 0"): + partitioned_queue.route_by_index(-1) + + +def test_partitioned_queue_explicit_physical_queues(): + partitioned_queue = awa.PartitionedQueue.from_physical_queues( + "email", ["email-fast", "email-slow"] + ) + + assert partitioned_queue.logical_queue == "email" + assert partitioned_queue.partitions == 2 + assert partitioned_queue.physical_queues == ["email-fast", "email-slow"] + + +def test_partitioned_queue_queue_configs_hard_reserved(): + partitioned_queue = awa.PartitionedQueue("email", 2) + + assert partitioned_queue.queue_configs( + max_workers_per_partition=8, + rate_limit_per_partition=(100.0, 200), + claimers=2, + claim_batch_size=64, + ) == [ + { + "name": "email", + "max_workers": 8, + "rate_limit": (100.0, 200), + "claimers": 2, + "claim_batch_size": 64, + }, + { + "name": "email__p1", + "max_workers": 8, + "rate_limit": (100.0, 200), + "claimers": 2, + "claim_batch_size": 64, + }, + ] + + +def test_partitioned_queue_queue_configs_weighted(): + partitioned_queue = awa.PartitionedQueue("email", 2) + + assert partitioned_queue.queue_configs(min_workers_per_partition=0, weight=3) == [ + {"name": "email", "min_workers": 0, "weight": 3}, + {"name": "email__p1", "min_workers": 0, "weight": 3}, + ] + + +def test_partitioned_queue_validation_errors(): + with 
pytest.raises(awa.ValidationError, match="logical queue must not be empty"): + awa.PartitionedQueue("", 4) + with pytest.raises(awa.ValidationError, match="partitions must be > 0"): + awa.PartitionedQueue("email", 0) + with pytest.raises(awa.ValidationError, match="partitions must be > 0"): + awa.PartitionedQueue("email", -1) + with pytest.raises(awa.ValidationError, match="partitions must be > 0"): + awa.PartitionedQueue.from_physical_queues("email", []) + with pytest.raises(awa.ValidationError, match="physical queue must not be empty"): + awa.PartitionedQueue.from_physical_queues("email", ["email-a", ""]) + with pytest.raises(awa.ValidationError, match="duplicated"): + awa.PartitionedQueue.from_physical_queues("email", ["email-a", "email-a"]) + + partitioned_queue = awa.PartitionedQueue("email", 2) + with pytest.raises(awa.ValidationError, match="ordering_key must be bytes-like or str"): + partitioned_queue.queue_for_key(object()) + with pytest.raises(awa.ValidationError, match="ordering_key must be bytes-like or str"): + partitioned_queue.route_by_key(object()) + with pytest.raises(awa.ValidationError, match="requires max_workers_per_partition"): + partitioned_queue.queue_configs() + with pytest.raises(awa.ValidationError, match="not both"): + partitioned_queue.queue_configs( + max_workers_per_partition=1, + min_workers_per_partition=1, + ) + with pytest.raises(awa.ValidationError, match="max_workers_per_partition must be > 0"): + partitioned_queue.queue_configs(max_workers_per_partition=0) + with pytest.raises(awa.ValidationError, match="weight must be > 0"): + partitioned_queue.queue_configs(min_workers_per_partition=0, weight=0) + with pytest.raises(awa.ValidationError, match="claimers must be > 0"): + partitioned_queue.queue_configs(max_workers_per_partition=1, claimers=0) + with pytest.raises(awa.ValidationError, match="claim_batch_size must be > 0"): + partitioned_queue.queue_configs(max_workers_per_partition=1, claim_batch_size=0) diff --git 
a/awa-python/tests/test_queue_fanout.py b/awa-python/tests/test_queue_fanout.py deleted file mode 100644 index 68980f78..00000000 --- a/awa-python/tests/test_queue_fanout.py +++ /dev/null @@ -1,116 +0,0 @@ -"""Tests for the Python QueueFanout helper.""" - -import pytest - -import awa - - -def test_queue_fanout_default_names_and_key_routing(): - fanout = awa.QueueFanout("customer-updates", 4) - - assert fanout.logical_queue == "customer-updates" - assert fanout.width == 4 - assert len(fanout) == 4 - assert fanout.physical_queues == [ - "customer-updates__p0", - "customer-updates__p1", - "customer-updates__p2", - "customer-updates__p3", - ] - assert fanout.queue_for_key(b"customer-42") == "customer-updates__p0" - assert fanout.queue_for_key("customer-42") == "customer-updates__p0" - assert fanout.route_by_key("customer-42") == { - "queue": "customer-updates__p0", - "ordering_key": b"customer-42", - } - - -def test_queue_fanout_round_robin_index_routing(): - fanout = awa.QueueFanout("email", 4) - - assert fanout.queue_for_index(0) == "email__p0" - assert fanout.queue_for_index(5) == "email__p1" - assert fanout.route_by_index(5) == {"queue": "email__p1"} - - with pytest.raises(awa.ValidationError, match="index must be >= 0"): - fanout.queue_for_index(-1) - with pytest.raises(awa.ValidationError, match="index must be >= 0"): - fanout.route_by_index(-1) - - -def test_queue_fanout_explicit_physical_queues(): - fanout = awa.QueueFanout.from_physical_queues( - "email", ["email-fast", "email-slow"] - ) - - assert fanout.logical_queue == "email" - assert fanout.width == 2 - assert fanout.physical_queues == ["email-fast", "email-slow"] - - -def test_queue_fanout_queue_configs_hard_reserved(): - fanout = awa.QueueFanout("email", 2) - - assert fanout.queue_configs( - max_workers_per_queue=8, - rate_limit_per_queue=(100.0, 200), - claimers=2, - claim_batch_size=64, - ) == [ - { - "name": "email__p0", - "max_workers": 8, - "rate_limit": (100.0, 200), - "claimers": 2, - 
"claim_batch_size": 64, - }, - { - "name": "email__p1", - "max_workers": 8, - "rate_limit": (100.0, 200), - "claimers": 2, - "claim_batch_size": 64, - }, - ] - - -def test_queue_fanout_queue_configs_weighted(): - fanout = awa.QueueFanout("email", 2) - - assert fanout.queue_configs(min_workers_per_queue=0, weight=3) == [ - {"name": "email__p0", "min_workers": 0, "weight": 3}, - {"name": "email__p1", "min_workers": 0, "weight": 3}, - ] - - -def test_queue_fanout_validation_errors(): - with pytest.raises(awa.ValidationError, match="logical queue must not be empty"): - awa.QueueFanout("", 4) - with pytest.raises(awa.ValidationError, match="width must be > 0"): - awa.QueueFanout("email", 0) - with pytest.raises(awa.ValidationError, match="width must be > 0"): - awa.QueueFanout("email", -1) - with pytest.raises(awa.ValidationError, match="width must be > 0"): - awa.QueueFanout.from_physical_queues("email", []) - with pytest.raises(awa.ValidationError, match="physical queue must not be empty"): - awa.QueueFanout.from_physical_queues("email", ["email-a", ""]) - with pytest.raises(awa.ValidationError, match="duplicated"): - awa.QueueFanout.from_physical_queues("email", ["email-a", "email-a"]) - - fanout = awa.QueueFanout("email", 2) - with pytest.raises(awa.ValidationError, match="ordering_key must be bytes-like or str"): - fanout.queue_for_key(object()) - with pytest.raises(awa.ValidationError, match="ordering_key must be bytes-like or str"): - fanout.route_by_key(object()) - with pytest.raises(awa.ValidationError, match="requires max_workers_per_queue"): - fanout.queue_configs() - with pytest.raises(awa.ValidationError, match="not both"): - fanout.queue_configs(max_workers_per_queue=1, min_workers_per_queue=1) - with pytest.raises(awa.ValidationError, match="max_workers_per_queue must be > 0"): - fanout.queue_configs(max_workers_per_queue=0) - with pytest.raises(awa.ValidationError, match="weight must be > 0"): - fanout.queue_configs(min_workers_per_queue=0, weight=0) - 
with pytest.raises(awa.ValidationError, match="claimers must be > 0"): - fanout.queue_configs(max_workers_per_queue=1, claimers=0) - with pytest.raises(awa.ValidationError, match="claim_batch_size must be > 0"): - fanout.queue_configs(max_workers_per_queue=1, claim_batch_size=0) diff --git a/awa-python/tests/test_start_config.py b/awa-python/tests/test_start_config.py index be1dea26..8a68c012 100644 --- a/awa-python/tests/test_start_config.py +++ b/awa-python/tests/test_start_config.py @@ -131,47 +131,47 @@ async def handle(job): @pytest.mark.asyncio -async def test_queue_fanout_configs_expand_and_dispatch(client): - """QueueFanout queue_configs() can be passed directly to start().""" - fanout = awa.QueueFanout("cfg_fanout", 2) +async def test_partitioned_queue_configs_expand_and_dispatch(client): + """PartitionedQueue queue_configs() can be passed directly to start().""" + partitioned_queue = awa.PartitionedQueue("cfg_partitioned_queue", 2) handled = [] - @client.task(ConfigTestJob, queue=fanout.physical_queues[0]) + @client.task(ConfigTestJob, queue=partitioned_queue.physical_queues[0]) async def handle(job): handled.append(job.queue) return None await client.start( - fanout.queue_configs(max_workers_per_queue=1), + partitioned_queue.queue_configs(max_workers_per_partition=1), poll_interval_ms=25, ) job = await client.insert( ConfigTestJob("physical-1"), - queue=fanout.queue_for_index(1), + queue=partitioned_queue.queue_for_index(1), ) result = await wait_for_job_state(client, job.id, awa.JobState.Completed, timeout=5.0) await client.shutdown() assert result.state == awa.JobState.Completed - assert handled == [fanout.queue_for_index(1)] + assert handled == [partitioned_queue.queue_for_index(1)] @pytest.mark.asyncio -async def test_duplicate_fanout_queue_config_is_rejected(client): +async def test_duplicate_partitioned_queue_queue_config_is_rejected(client): """Duplicate physical queue declarations are rejected before dispatchers start.""" - fanout = 
awa.QueueFanout("cfg_fanout_duplicate", 2) + partitioned_queue = awa.PartitionedQueue("cfg_partitioned_queue_duplicate", 2) - @client.task(ConfigTestJob, queue=fanout.physical_queues[0]) + @client.task(ConfigTestJob, queue=partitioned_queue.physical_queues[0]) async def handle(job): return None with pytest.raises(awa.AwaError, match="configured more than once"): await client.start( [ - {"name": fanout.physical_queues[0], "max_workers": 1}, - *fanout.queue_configs(max_workers_per_queue=1), + {"name": partitioned_queue.physical_queues[0], "max_workers": 1}, + *partitioned_queue.queue_configs(max_workers_per_partition=1), ] ) diff --git a/awa-python/tests/test_sync.py b/awa-python/tests/test_sync.py index ea877375..35dac7b2 100644 --- a/awa-python/tests/test_sync.py +++ b/awa-python/tests/test_sync.py @@ -266,6 +266,23 @@ def test_insert_many_copy_sync(client): assert job.args["to"] == f"copy{i}@example.com" +def test_insert_many_copy_sync_per_job_opts(client): + jobs_data = [ + SyncEmail(to="copy-a@example.com", subject="Copy A"), + SyncEmail(to="copy-b@example.com", subject="Copy B"), + ] + results = client.insert_many_copy( + jobs_data, + queue="unused_default", + opts=[ + {"queue": "sync_copy_a", "ordering_key": "customer-a"}, + {"queue": "sync_copy_b", "ordering_key": None}, + ], + ) + + assert [job.queue for job in results] == ["sync_copy_a", "sync_copy_b"] + + def test_enqueue_many_copy_sync_queue_storage(client): schema = "awa_py_sync_enqueue_many_copy" queue = "sync_enqueue_many_copy" diff --git a/awa-worker/src/client.rs b/awa-worker/src/client.rs index 8437944d..e24a18bc 100644 --- a/awa-worker/src/client.rs +++ b/awa-worker/src/client.rs @@ -13,7 +13,9 @@ use awa_model::admin::{ QueueRuntimeConfigSnapshot, QueueRuntimeMode, QueueRuntimeSnapshot, RateLimitSnapshot, RuntimeSnapshotInput, StorageCapability, TransitionRole, }; -use awa_model::{storage as transition, JobArgs, PeriodicJob, QueueFanout, QueueStorageConfig}; +use awa_model::{ + storage as 
transition, JobArgs, PartitionedQueue, PeriodicJob, QueueStorageConfig, +}; use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; use sqlx::PgPool; @@ -231,22 +233,27 @@ impl ClientBuilder { self } - /// Add every physical queue in a logical queue fanout. + /// Add every physical queue in a logical partitioned queue. /// /// This is equivalent to calling [`queue`] once for each physical queue in - /// the fanout. Producers should route inserts through the same - /// [`QueueFanout`] so workers and producers agree on the physical queue + /// the partitioned queue. Producers should route inserts through the same + /// [`PartitionedQueue`] so workers and producers agree on the physical queue /// names. /// /// The `config` is applied to each physical queue. In hard-reserved mode, - /// total logical capacity is therefore roughly `fanout.width() * - /// config.max_workers`; per-queue rate limits also apply per physical + /// total logical capacity is therefore roughly + /// `partitioned_queue.partitions() * config.max_workers`; per-queue rate + /// limits also apply per physical /// queue. Divide those knobs yourself, or use weighted mode with a /// `global_max_workers` cap, when you need a logical total. 
/// /// [`queue`]: ClientBuilder::queue - pub fn queue_fanout(mut self, fanout: &QueueFanout, config: QueueConfig) -> Self { - for queue in fanout.physical_queues() { + pub fn partitioned_queue( + mut self, + partitioned_queue: &PartitionedQueue, + config: QueueConfig, + ) -> Self { + for queue in partitioned_queue.physical_queues() { self.queues.push((queue.clone(), config.clone())); } self @@ -2489,6 +2496,7 @@ mod tests { match sqlx::query(&create_sql).execute(&admin_pool).await { Ok(_) => {} Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("42P04") => {} + Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {} Err(err) => panic!("Failed to create client test database {database_name}: {err}"), } } @@ -2836,19 +2844,20 @@ mod tests { } #[tokio::test] - async fn queue_fanout_declares_each_physical_queue() { - let fanout = QueueFanout::new("email", 3).expect("fanout should build"); + async fn partitioned_queue_declares_each_physical_queue() { + let partitioned_queue = + PartitionedQueue::new("email", 3).expect("partitioned queue should build"); let client = Client::builder(lazy_pool()) - .queue_fanout( - &fanout, + .partitioned_queue( + &partitioned_queue, QueueConfig { max_workers: 7, ..QueueConfig::default() }, ) .build() - .expect("fanout queues should build"); + .expect("partitioned queue queues should build"); let queues: Vec<_> = client .queues @@ -2857,22 +2866,23 @@ mod tests { .collect(); assert_eq!( queues, - vec![("email__p0", 7), ("email__p1", 7), ("email__p2", 7)] + vec![("email", 7), ("email__p1", 7), ("email__p2", 7)] ); } #[tokio::test] async fn duplicate_queue_declarations_are_rejected() { - let fanout = QueueFanout::new("email", 2).expect("fanout should build"); + let partitioned_queue = + PartitionedQueue::new("email", 2).expect("partitioned queue should build"); let result = Client::builder(lazy_pool()) - .queue("email__p0", QueueConfig::default()) - .queue_fanout(&fanout, 
QueueConfig::default()) + .queue("email", QueueConfig::default()) + .partitioned_queue(&partitioned_queue, QueueConfig::default()) .build(); assert!(matches!( result, - Err(BuildError::DuplicateQueue { queue }) if queue == "email__p0" + Err(BuildError::DuplicateQueue { queue }) if queue == "email" )); } diff --git a/awa-worker/src/completion.rs b/awa-worker/src/completion.rs index f7e3b438..79c8f4db 100644 --- a/awa-worker/src/completion.rs +++ b/awa-worker/src/completion.rs @@ -354,17 +354,67 @@ mod tests { use awa_model::migrations; use sqlx::postgres::PgPoolOptions; use std::sync::Arc; - use std::time::Instant; + use std::time::{Duration, Instant}; - fn database_url() -> String { + fn base_database_url() -> String { std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://postgres:test@localhost:15432/awa_test".to_string()) } + fn replace_database_name(url: &str, database_name: &str) -> String { + let (without_query, query_suffix) = match url.split_once('?') { + Some((prefix, query)) => (prefix, Some(query)), + None => (url, None), + }; + let (base, _) = without_query + .rsplit_once('/') + .expect("database URL should include a database name"); + let mut out = format!("{base}/{database_name}"); + if let Some(query) = query_suffix { + out.push('?'); + out.push_str(query); + } + out + } + + fn database_url() -> String { + std::env::var("DATABASE_URL_WORKER_COMPLETION").unwrap_or_else(|_| { + replace_database_name(&base_database_url(), "awa_test_worker_completion") + }) + } + + async fn ensure_database_exists(url: &str) { + let database_name = url + .split_once('?') + .map(|(prefix, _)| prefix) + .unwrap_or(url) + .rsplit_once('/') + .map(|(_, database_name)| database_name.to_string()) + .expect("database URL should include a database name"); + let admin_url = replace_database_name(url, "postgres"); + let admin_pool = PgPoolOptions::new() + .max_connections(1) + .connect(&admin_url) + .await + .expect("Failed to connect to admin database for completion 
tests"); + let create_sql = format!("CREATE DATABASE {database_name}"); + match sqlx::query(&create_sql).execute(&admin_pool).await { + Ok(_) => {} + Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("42P04") => {} + Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {} + Err(err) => { + panic!("Failed to create completion test database {database_name}: {err}") + } + } + } + async fn setup(max_conns: u32) -> PgPool { + let url = database_url(); + ensure_database_exists(&url).await; let pool = PgPoolOptions::new() .max_connections(max_conns) - .connect(&database_url()) + .acquire_timeout(Duration::from_secs(5)) + .connect(&url) .await .expect("Failed to connect to database"); migrations::run(&pool).await.expect("Failed to migrate"); diff --git a/awa-worker/src/lib.rs b/awa-worker/src/lib.rs index 9a1d2969..4002bc96 100644 --- a/awa-worker/src/lib.rs +++ b/awa-worker/src/lib.rs @@ -16,8 +16,8 @@ mod storage; // Re-exports pub use awa_model::{ - CallbackConfig, CronMissedFirePolicy, PeriodicJob, PeriodicJobBuilder, QueueFanout, - QueueFanoutError, + CallbackConfig, CronMissedFirePolicy, PartitionedQueue, PartitionedQueueError, PeriodicJob, + PeriodicJobBuilder, }; pub use client::{ BuildError, Client, ClientBuilder, HealthCheck, QueueCapacity, QueueHealth, diff --git a/awa/README.md b/awa/README.md index fe62f9ad..3959c2f1 100644 --- a/awa/README.md +++ b/awa/README.md @@ -76,17 +76,17 @@ awa::insert_with(&pool, &UpdateCustomer { ... }, opts).await?; Jobs sharing an `ordering_key` always pick the same shard, so the shard's strict FIFO carries over to per-key FIFO. At `enqueue_shards = 1` the key is ignored. See [ADR-025](../docs/adr/025-sharded-enqueue-heads.md) for the full contract. 
-## Logical Queue Fanout +## Partitioned Queues -For very hot workloads, `QueueFanout` gives Rust producers and workers the same deterministic physical queue set for one logical queue: +For very hot workloads, `PartitionedQueue` gives Rust producers and workers the same deterministic physical queue set for one logical queue: ```rust -use awa::{InsertOpts, QueueConfig, QueueFanout}; +use awa::{InsertOpts, QueueConfig, PartitionedQueue}; -let updates = QueueFanout::new("customer-updates", 4)?; +let updates = PartitionedQueue::new("customer-updates", 4)?; let client = awa::Client::builder(pool.clone()) - .queue_fanout(&updates, QueueConfig { + .partitioned_queue(&updates, QueueConfig { max_workers: 32, // per physical queue ..Default::default() }) @@ -100,7 +100,7 @@ let opts = updates.route_opts_by_key( awa::insert_with(&pool, &UpdateCustomer { ... }, opts).await?; ``` -This is ordinary Awa queueing under the hood: each physical queue keeps its durable claim, lease, retry, DLQ, and completion behavior. The helper only standardizes naming and routing so applications do not hand-roll fanout. +This is ordinary Awa queueing under the hood: each physical queue keeps its durable claim, lease, retry, DLQ, and completion behavior. The helper only standardizes naming and routing so applications do not hand-roll partitioning. 
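The naming and index-routing rules can be illustrated with a minimal Python sketch. This is not the Awa implementation: `default_physical_queues` and `queue_for_index` are hypothetical helpers, and `ValueError` stands in for the library's validation error. The sketch only mirrors behaviour the tests in this patch assert: partition 0 reuses the logical queue name, later partitions get a `__p{i}` suffix, and index routing wraps modulo the partition count.

```python
def default_physical_queues(logical_queue: str, partitions: int) -> list[str]:
    """Hypothetical stand-in for the default PartitionedQueue naming."""
    if not logical_queue:
        raise ValueError("logical queue must not be empty")
    if partitions <= 0:
        raise ValueError("partitions must be > 0")
    # Partition 0 is the logical queue itself, so partitions = 1 is a plain queue.
    return [logical_queue] + [f"{logical_queue}__p{i}" for i in range(1, partitions)]


def queue_for_index(physical_queues: list[str], index: int) -> str:
    """Round-robin style routing: wrap the index modulo the partition count."""
    if index < 0:
        raise ValueError("index must be >= 0")
    return physical_queues[index % len(physical_queues)]


queues = default_physical_queues("email", 4)
print(queues)
print(queue_for_index(queues, 5))
```

Because partition 0 keeps the logical name, a producer that still enqueues directly to `email` during a `1 -> N` rollout lands in a queue that workers consume.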
## Documentation diff --git a/awa/src/lib.rs b/awa/src/lib.rs index 4bc953bd..51d6b407 100644 --- a/awa/src/lib.rs +++ b/awa/src/lib.rs @@ -13,11 +13,11 @@ pub use awa_model; pub use awa_model::{ self as model, adapter, admin, bridge, callback_contract, insert, insert_many, insert_many_copy, insert_many_copy_from_pool, insert_with, map_sqlx_error, migrations, - prepare_job_insert, prepare_raw_job_insert, storage, AwaError, CallbackConfig, DefaultAction, - DlqMetadata, DlqRow, InsertOpts, InsertParams, JobArgs, JobKindDescriptor, JobRow, JobState, - ListDlqFilter, PreparedJobInsert, QueueDescriptor, QueueFanout, QueueFanoutError, QueueStorage, - QueueStorageConfig, ResolveOutcome, RetryFromDlqOpts, StorageCapability, StorageStatus, - UniqueOpts, + partition_for_ordering_key, partition_hash64, prepare_job_insert, prepare_raw_job_insert, + storage, AwaError, CallbackConfig, DefaultAction, DlqMetadata, DlqRow, InsertOpts, + InsertParams, JobArgs, JobKindDescriptor, JobRow, JobState, ListDlqFilter, PartitionedQueue, + PartitionedQueueError, PreparedJobInsert, QueueDescriptor, QueueStorage, QueueStorageConfig, + ResolveOutcome, RetryFromDlqOpts, StorageCapability, StorageStatus, UniqueOpts, }; // Re-export worker runtime diff --git a/correctness/README.md b/correctness/README.md index db9a105d..2dad893e 100644 --- a/correctness/README.md +++ b/correctness/README.md @@ -46,6 +46,7 @@ What is intentionally not modeled: - `storage/AwaSegmentedStorageInterleavings.cfg`: alternate two-worker config for the same segmented-storage spec, used to exercise stale completion and waiting/resume interleavings without changing the base safety model - `storage/AwaSegmentedStorageRaces.tla` / `storage/AwaSegmentedStorageRaces.cfg` / `storage/AwaSegmentedStorageRacesSafe.cfg`: focused race-exposure spec that splits `Claim` into `BeginClaim` / `CommitClaim` to model the claim path's cursor-read-without-lock behaviour. 
The race config produces a counterexample trace (claim snapshots segment, rotate+prune fire, commit lands lease in pruned segment — simultaneously the claim-vs-rotate race and the prune check-then-act race). The safe config uses a checked commit and passes. Production uses CAS on ring state plus child-partition locks and busy checks, not `FOR SHARE` on `lease_ring_state` — see `storage/MAPPING.md` for the full analysis. - `storage/AwaShardedPrune.tla` / `storage/AwaShardedPrune.cfg` / `storage/AwaShardedPruneBroken.cfg`: focused ADR-025 regression model for queue-ring prune across enqueue shards. The passing config requires ready rows to match done rows by `(enqueue_shard, lane_seq)` before prune can drop a sealed queue slot. The broken config intentionally ignores `enqueue_shard` and produces the historical counterexample: shard 0 has a done row at `lane_seq = 1`, shard 1 still has a pending ready row at `lane_seq = 1`, and broken prune drops both. +- `storage/AwaPartitionedQueueRouting.tla` / `storage/AwaPartitionedQueueRouting.cfg`: focused ADR-031 routing model for `PartitionedQueue`. Partitioned queues remain client-side routing over ordinary Awa queues, so this spec checks the refinement boundary above storage: every key routes to its selected partition, the original ordering key still determines the enqueue shard inside that partition, lane sequence identity stays scoped to `(partition, shard)`, and the representative routing table reaches every shard from every partition. - `storage/AwaStorageLockOrder.tla` / `storage/AwaStorageLockOrder.cfg` / `storage/AwaStorageLockOrderDeadlockDemo.cfg` / `storage/AwaStorageLockOrderOldStripedClaimDeadlock.cfg`: lock-ordering protocol spec. Models each storage-engine transaction (enqueue, claim, complete, close-receipt, rescue-receipts, ensure-running, cancel, rotate, and prune) as an ordered sequence of Postgres lock acquisitions with a shared/exclusive compatibility matrix. 
The striped enqueue path takes multiple physical queue lanes in stable order; the current striped claim path takes at most one physical stripe per transaction. Checks `NoDeadlock` via a waits-for cycle detector. The demo configs use a deliberately cycle-creating plan pair and the historical old striped logical-claim plan to prove the detector catches real cycles. - `storage/AwaStorageTransition.tla` / `storage/AwaStorageTransition.cfg` / `storage/AwaStorageTransitionCurrentGate.cfg`: focused model for the 0.5.x-to-0.6 storage transition control plane. It covers prepare, schema readiness, mixed-transition entry, canonical backlog drain, queue-storage routing, finalize, and abort interlocks. The desired config matches the v014+ SQL gate: it requires a live queue-storage target executor at mixed-transition entry and passes cleanly. The `CurrentGate`-named config is now a historical pre-v014 regression witness: it models the old capability-only gate where an auto runtime started before mixed transition reports `queue_storage` while prepared, then becomes drain-only after routing flips. - `storage/AwaSegmentedStorageTrace.tla` / `storage/AwaSegmentedStorageTrace.cfg` / `storage/AwaSegmentedStorageTraceReceiptRescue.cfg` / `storage/AwaSegmentedStorageTraceLostClaimAdvance.cfg` / `storage/AwaSegmentedStorageTraceRunningCancel.cfg` / `storage/AwaSegmentedStorageTraceReceiptOnlyCancel.cfg` / `storage/AwaSegmentedStorageTraceCallbackWait.cfg` / `storage/AwaSegmentedStorageTraceDlqRetry.cfg` / `storage/AwaSegmentedStorageTraceDlqPurge.cfg` / `storage/AwaSegmentedStorageTraceBroken.cfg`: trace-validation harness. Takes hand-transcribed sequences of queue-storage runtime events and verifies each transition is accepted by the storage spec. Current positive traces cover snooze, receipt rescue, running cancel, receipt-only cancel, callback wait/resume, DLQ retry, and DLQ purge. 
A deliberately-broken variant trips deadlock at traceIdx = 2 to confirm the checker rejects invalid sequences. @@ -84,6 +85,7 @@ Passing configs: ./correctness/run-tlc.sh storage/AwaStorageLockOrder.tla ./correctness/run-tlc.sh storage/AwaStorageTransition.tla ./correctness/run-tlc.sh storage/AwaShardedPrune.tla +./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla ``` Expected counterexample or positive-witness configs: @@ -162,6 +164,8 @@ To keep the state graph finite, `AwaExtended` bounds retries with `MaxAttempts = `AwaShardedPrune` covers the cross-shard prune property that the lifecycle model deliberately abstracts away: `lane_seq` values are only unique within an `enqueue_shard`, so any ready/done anti-join used for queue-slot prune must include `enqueue_shard`. The passing config checks the fixed predicate; the broken config remains as a regression witness. +`AwaPartitionedQueueRouting` covers the client-side routing layer introduced by ADR-031. It does not add a storage family: each partition is still an ordinary Awa queue covered by `AwaSegmentedStorage`. The model pins the composition property between partition routing and ADR-025 enqueue-shard routing so keyed traffic can fan across shards inside every partition. + `AwaBatcher` models the async completion path between handler return and DB update. In the real system (`awa-worker/src/completion.rs`), completed jobs are queued in a sharded in-memory buffer and flushed to the database in batches of up to 512 every 1ms. This introduces a window where a job has completed in the handler but not yet in the database — during which maintenance can rescue the job and a new worker can re-claim it. 
### Mapping to Rust code diff --git a/correctness/storage/AwaPartitionedQueueRouting.cfg b/correctness/storage/AwaPartitionedQueueRouting.cfg new file mode 100644 index 00000000..05c1a1e9 --- /dev/null +++ b/correctness/storage/AwaPartitionedQueueRouting.cfg @@ -0,0 +1,15 @@ +CONSTANTS + Partitions = {0, 1, 2, 3} + Shards = {0, 1, 2, 3} + MaxJobs = 4 + +SPECIFICATION Spec + +CHECK_DEADLOCK FALSE + +INVARIANTS + TypeOK + RowsFollowRouting + SameKeySamePartition + SameKeySameShard + SeqUniqueWithinPartitionShard diff --git a/correctness/storage/AwaPartitionedQueueRouting.tla b/correctness/storage/AwaPartitionedQueueRouting.tla new file mode 100644 index 00000000..07c6a989 --- /dev/null +++ b/correctness/storage/AwaPartitionedQueueRouting.tla @@ -0,0 +1,99 @@ +---- MODULE AwaPartitionedQueueRouting ---- +EXTENDS Naturals, FiniteSets + +\* Focused ADR-031 routing model. +\* +\* Partitioned queues are not a new storage family: each partition is an +\* ordinary Awa queue covered by AwaSegmentedStorage. 
This model checks the +\* client-side refinement boundary that sits above those queues: +\* +\* - every produced job is routed to the partition selected by its key +\* - the original ordering key is still available to the storage layer, so +\* enqueue-shard routing is stable inside the chosen partition +\* - lane sequence identity is scoped to (partition, shard) +\* - the chosen routing table for the model reaches every shard from every +\* partition, which is the property the Rust domain-separated hash test +\* enforces for representative key sets + +CONSTANTS Partitions, + Shards, + MaxJobs + +VARIABLES jobs, + nextSeq + +vars == <<jobs, nextSeq>> + +KeySet == Partitions \X Shards + +PartitionOf(k) == k[1] +ShardOf(k) == k[2] + +JobRows == + [id: 1..MaxJobs, + key: KeySet, + partition: Partitions, + shard: Shards, + seq: 1..(MaxJobs + 1)] + +Init == + /\ jobs = {} + /\ nextSeq = [p \in Partitions |-> [s \in Shards |-> 1]] + +Enqueue(k) == + /\ k \in KeySet + /\ Cardinality(jobs) < MaxJobs + /\ LET p == PartitionOf(k) + s == ShardOf(k) + id == Cardinality(jobs) + 1 + IN + /\ p \in Partitions + /\ s \in Shards + /\ jobs' = jobs \cup + {[id |-> id, + key |-> k, + partition |-> p, + shard |-> s, + seq |-> nextSeq[p][s]]} + /\ nextSeq' = [nextSeq EXCEPT ![p][s] = @ + 1] + +Next == + \E k \in KeySet : Enqueue(k) + +Spec == + Init /\ [][Next]_vars + +TypeOK == + /\ jobs \subseteq JobRows + /\ nextSeq \in [Partitions -> [Shards -> 1..(MaxJobs + 1)]] + /\ \A k \in KeySet : + /\ PartitionOf(k) \in Partitions + /\ ShardOf(k) \in Shards + +RowsFollowRouting == + \A row \in jobs : + /\ row.partition = PartitionOf(row.key) + /\ row.shard = ShardOf(row.key) + +SameKeySamePartition == + \A a, b \in jobs : + a.key = b.key => a.partition = b.partition + +SameKeySameShard == + \A a, b \in jobs : + a.key = b.key => a.shard = b.shard + +SeqUniqueWithinPartitionShard == + \A a, b \in jobs : + /\ a.id # b.id + /\ a.partition = b.partition + /\ a.shard = b.shard + => a.seq # b.seq + 
+PartitionShardCoverage == + \A p \in Partitions : + {ShardOf(k) : k \in {candidate \in KeySet : PartitionOf(candidate) = p}} = Shards + +ASSUME PartitionShardCoverage + +==== diff --git a/correctness/storage/MAPPING.md b/correctness/storage/MAPPING.md index 39090dcf..0ac97bd1 100644 --- a/correctness/storage/MAPPING.md +++ b/correctness/storage/MAPPING.md @@ -134,6 +134,14 @@ The TLA+ storage model treats `EnqueueReady` and `EnqueueDeferred` as logical pe Uniqueness itself is intentionally outside this storage model: duplicate rejection is covered by Rust integration tests around `job_unique_claims`. The model's enqueue preconditions start after a job has been admitted to the storage state, so batching uniqueness claims changes implementation granularity rather than the modeled lifecycle, lane, lease, or prune invariants. +## Partitioned queue routing note + +`PartitionedQueue` lives in `awa-model/src/partitioned_queue.rs`, is re-exported by `awa` and `awa-worker`, and is wrapped for Python as `awa.PartitionedQueue`. It maps one logical queue to a caller-declared list of ordinary physical queue names. The worker builder expands that list into ordinary `QueueConfig` declarations; producer helpers stamp the selected physical queue onto `InsertOpts`. + +That makes partitioned queues a routing refinement above `AwaSegmentedStorage`, not a new lifecycle variable. Each physical queue keeps the same `(queue, priority, enqueue_shard, lane_seq)` lane identity, lease/receipt safety, terminal retention, DLQ, and prune contracts already mapped in this document. + +[`AwaPartitionedQueueRouting.tla`](./AwaPartitionedQueueRouting.tla) pins the cross-layer routing property: the partition selector is domain-separated from ADR-025's enqueue-shard selector, but key-routed producers still pass the original ordering key into storage. The Rust test `partition_hash_is_domain_separated_from_enqueue_shard_hash` is the code-level distribution regression for the same issue. 
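To make "domain-separated" concrete, here is a hedged Python sketch of the idea. ADR-031 in this patch describes a partition-specific SplitMix64 finalizer applied before the modulo; the sketch uses the standard SplitMix64 finalizer constants and plain integers as a stand-in for the ADR-025 base ordering-key hash. The function names are hypothetical, and the real implementation may mix in additional partition-specific input, so treat this as an illustration of the property rather than a compatible reimplementation.

```python
MASK64 = (1 << 64) - 1


def splitmix64_finalize(z: int) -> int:
    """Standard SplitMix64 output mix, kept within 64 bits."""
    z &= MASK64
    z = ((z ^ (z >> 30)) * 0xBF58476D1CE4E5B9) & MASK64
    z = ((z ^ (z >> 27)) * 0x94D049BB133111EB) & MASK64
    return z ^ (z >> 31)


def route(base_hash: int, partitions: int, enqueue_shards: int) -> tuple[int, int]:
    """Assumed shape: partition uses the finalized hash, shard uses the raw hash."""
    partition = splitmix64_finalize(base_hash) % partitions
    shard = base_hash % enqueue_shards  # unchanged, so storage routing stays compatible
    return partition, shard


def shards_reached_per_partition(partitions: int, enqueue_shards: int,
                                 samples: int = 10_000) -> dict[int, set[int]]:
    """Collect which enqueue shards each partition's keys reach."""
    reached = {p: set() for p in range(partitions)}
    for base_hash in range(samples):  # integers stand in for real key hashes
        partition, shard = route(base_hash, partitions, enqueue_shards)
        reached[partition].add(shard)
    return reached


coverage = shards_reached_per_partition(4, 4)
print({p: sorted(s) for p, s in coverage.items()})
```

The shard computation is deliberately left untouched: only the partition selector changes domain, which is why producers must still pass the original key as `ordering_key`.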
+ ## Batch-operations control-plane note ADR-030 batch operations have two layers: diff --git a/correctness/storage/README.md b/correctness/storage/README.md index 323c2002..254ff5c0 100644 --- a/correctness/storage/README.md +++ b/correctness/storage/README.md @@ -113,6 +113,23 @@ ADR-026 adds the retained-body shape for ready-backed terminal rows. `TerminalHa See [`MAPPING.md`](./MAPPING.md) for the action-by-action correspondence between TLA+ transitions and the Rust implementation, including the SQL statements that enforce each guard. +## Partitioned-queue routing companion spec + +[`AwaPartitionedQueueRouting.tla`](./AwaPartitionedQueueRouting.tla) models the ADR-031 `PartitionedQueue` boundary above storage. A partitioned queue is not a new table family: each partition is an ordinary physical queue with the same ready, lease, receipt, terminal, DLQ, tombstone, and prune behaviour checked by `AwaSegmentedStorage`. + +The companion spec checks the routing refinement that storage deliberately abstracts away: + +- produced rows land in the partition selected for their key +- the original ordering key still chooses the queue-storage enqueue shard inside that partition +- lane sequence numbers are unique only within `(partition, shard)`, matching ADR-025's scoped FIFO +- the configured key set demonstrates that every partition can reach every shard, which is the model-level counterpart to the Rust domain-separated hash distribution test + +Run it with: + +```bash +./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla +``` + ## Lock-order companion spec [`AwaStorageLockOrder.tla`](./AwaStorageLockOrder.tla) models each storage-engine transaction (claim, rotate-leases, prune-leases, rotate-ready, prune-ready) as an ordered sequence of Postgres lock acquisitions, with a simplified shared/exclusive compatibility matrix that captures the cases relevant to deadlock analysis. 
Invariants: diff --git a/docs/adr/031-partitioned-queues.md b/docs/adr/031-partitioned-queues.md new file mode 100644 index 00000000..b9abe9cb --- /dev/null +++ b/docs/adr/031-partitioned-queues.md @@ -0,0 +1,125 @@ +# ADR-031: Partitioned Queues + +## Status + +Proposed. + +## Context + +Awa already has the machinery for spreading one hot logical queue across multiple physical queues. `PartitionedQueue` supplies deterministic physical queue names and producer routing, Rust `ClientBuilder::partitioned_queue()` and Python `PartitionedQueue.queue_configs()` register every partition, and the queue-storage COPY path already accepts per-job `InsertParams` and groups internally by `(queue, priority, enqueue_shard)`. + +The remaining design work is API stabilization and guardrail work before this surface becomes part of the stable `0.6` contract. + +The key defect is correlated hashing across partition and shard levels. Partition selection previously reused `shard_for_ordering_key(key, partitions)`, while queue storage later used `shard_for_ordering_key(key, enqueue_shards)`. Both operations reduced the same hash value modulo different numbers. Whenever `gcd(partitions, enqueue_shards) > 1`, keyed traffic in one partition reached only `enqueue_shards / gcd(partitions, enqueue_shards)` shards. At `partitions = 4` and `enqueue_shards = 4`, keyed traffic in each partition landed on exactly one enqueue shard, silently negating ADR-025 for keyed workloads. + +The ergonomics gaps are narrower: + +- Python batch COPY exposed one `queue` and one `ordering_key` per call even though the Rust core supports per-row routing. +- The public name should describe partitioned routing rather than messaging fanout. +- The default naming should not strand direct enqueues to the logical queue during a `1 -> N` rollout. +- Docs must distinguish partitions, enqueue shards, internal queue stripes, ordering keys, and claimers. 
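The gcd arithmetic in the correlated-hashing defect can be checked with a short Python sketch. It assumes nothing about Awa's real hash: integers stand in for hash values, and `reachable_shards` is a hypothetical helper that reduces the same value modulo both the partition count and the enqueue-shard count, as the pre-fix code did.

```python
from math import gcd


def reachable_shards(partitions: int, enqueue_shards: int,
                     samples: int = 10_000) -> dict[int, set[int]]:
    """Map each partition to the enqueue shards its keys can reach when
    both levels reduce the same hash value."""
    reached = {p: set() for p in range(partitions)}
    for h in range(samples):  # stand-in hash values covering every residue
        reached[h % partitions].add(h % enqueue_shards)
    return reached


for partitions, shards in [(4, 4), (4, 6), (3, 4)]:
    reached = reachable_shards(partitions, shards)
    expected = shards // gcd(partitions, shards)
    # Every partition reaches exactly enqueue_shards / gcd(partitions, enqueue_shards) shards.
    assert all(len(s) == expected for s in reached.values())
    print(partitions, shards, expected)
```

With coprime counts such as `(3, 4)` the defect is invisible, which is one reason it is easy to miss in tests that do not use matching partition and shard counts.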
+ +## Terminology + +- **Logical queue**: the application-visible workload name, for example `customer-updates`. +- **Partition / physical queue**: an ordinary Awa queue that carries one slice of the logical queue. This is the ordering scope. +- **Ordering key**: caller-supplied bytes that route related jobs to the same partition and to the same enqueue shard within it. +- **Enqueue shard**: ADR-025's per-`(queue, priority)` head-row shard inside one physical queue. +- **Internal queue stripe**: queue-storage `queue_stripe_count` striping using `queue#N` names. This remains an advanced storage knob, not the public API. + +## Decision + +Promote the existing routing machinery as **partitioned queues**. + +The public type is `PartitionedQueue`. `QueueFanout` is not kept as an alias because the surface has not reached a stable release and two names for the same concept would make the partitioning vocabulary harder to teach. "Fanout" is also misleading in messaging systems because it often means duplicate delivery to every subscriber, while this feature routes each job to exactly one partition. + +`PartitionedQueue::new(logical_queue, partitions)` uses the logical queue name itself as partition 0. With four partitions, the default physical queues are: + +```text +customer-updates +customer-updates__p1 +customer-updates__p2 +customer-updates__p3 +``` + +This makes `partitions = 1` literally a plain queue, keeps direct enqueues to the logical name consumable, and makes growing `1 -> N` a normal rolling-deploy operation rather than an orphaned-queue hazard. + +Partition routing is domain-separated from enqueue-shard routing. The algorithm keeps ADR-025's portable base ordering-key hash intact for storage shard compatibility, then applies a partition-specific SplitMix64 finalizer before reducing modulo the partition count. 
Non-Rust producers that hand-roll routing must use that partition hash for physical queue selection and continue to pass the original key as `ordering_key`. + +Python `insert_many_copy()` and `enqueue_many_copy()` accept per-job `opts`, where each entry may override `queue` and `ordering_key` for the matching job. Batch-level kwargs remain defaults. This exposes the Rust core's existing per-row COPY shape without adding a partition-specific bulk API. + +Capacity remains per partition. `ClientBuilder::partitioned_queue()` applies `QueueConfig` to every physical queue; Python names this explicitly with `max_workers_per_partition`, `min_workers_per_partition`, and `rate_limit_per_partition`. In hard-reserved mode, logical capacity is roughly `partitions * max_workers_per_partition`. Use weighted mode plus `global_max_workers` when a logical total cap matters. + +Width changes are repartitioning events. Growing changes key-to-partition routing; per-key FIFO is not preserved across the transition if old jobs for a key remain in the old partition while new jobs route to the new one. Shrinking can leave jobs in removed partitions until they drain or are moved. The safe grow order is workers first, then producers. The safe shrink order is producers first, drain or move removed partitions, then workers. + +## Guarantees + +Job safety is unchanged. Every partition is an ordinary Awa queue with the ADR-019 queue-storage guarantees, ADR-023 receipt-ring guarantees, and ADR-026 terminal-history guarantees. Transactional enqueue, at-least-once delivery, rescue, and `(job_id, run_lease)` guarded finalization are untouched. + +Partitioning changes ordering scope only: + +- `partitions = 1`: the queue behaves like a normal queue. +- `partitions > 1`: FIFO is scoped to the selected physical queue and enqueue shard; no cross-partition FIFO is promised. +- Jobs with the same ordering key route to the same partition and carry the same `ordering_key` into queue storage. 
+- Per-key FIFO is not preserved across a partition-count change unless the old partition has drained before new jobs for that key are produced. + +## Relationship to Other ADRs + +- **ADR-019: Queue Storage Engine.** Partitions compose as independent ordinary queues. Append-only ready/terminal storage, ready tombstones, receipt paths, and prune safety are unchanged. +- **ADR-023: Receipt Plane Ring Partitioning.** Receipt safety is per partition and composes across the partition set. +- **ADR-025: Sharded Enqueue Heads.** Partitions split a logical workload across physical queues. Enqueue shards split head rows within one physical queue. The domain-separated partition hash is required for the two levels to compose for keyed traffic. +- **ADR-026: Narrow Terminal History.** Terminal counts stay per physical queue. Logical aggregation is a read-side concern. +- **ADR-008: Batch COPY Ingestion.** The Rust queue-storage COPY path is already multi-queue per row; this ADR exposes that shape in Python batch APIs. +- **ADR-011: Weighted Concurrency.** Partition registration multiplies per-queue hard reservations and weights. Global caps remain the way to express a logical total. +- **ADR-022: Descriptor Catalog.** If partition metadata becomes persisted, it belongs in descriptor-style metadata off the dispatch hot path. +- **ADR-030: Durable Batch Operations.** A partitioned queue as a source means the union of partitions. A partitioned queue as a destination is deferred because preserving key routing would require per-job rehashing inside the operation. + +## Consequences + +Positive: + +- Users get a single documented concept for the strongest demonstrated end-to-end throughput lever. +- The keyed-routing skew is fixed before the routing contract stabilizes. +- Python and Rust expose the same conceptual model. +- `queue_stripe_count` stops competing for the public fanout/striping story. 
+ +Negative: + +- Awa still has three partitioning concepts: partitioned queues, enqueue shards, and internal queue stripes. Docs must keep those distinct. +- Partition-0-as-logical-name makes default physical names asymmetric. +- Logical aggregation in DB-only admin surfaces still needs runtime registration context or future persisted metadata. + +## Alternatives Considered + +### Keep `QueueFanout` + +Rejected. The name is misleading for a feature that routes each job to one partition, and the surface is not stable enough to justify keeping both names. + +### Preserve `logical__p0` as partition 0 + +Rejected. It leaves the bare logical queue name unconsumed at `partitions > 1`, so direct producers, cron schedules, and operator moves can strand work unless every caller is updated at once. + +### Add `enqueue_many_grouped_copy(group, jobs, key=...)` + +Rejected. The core COPY path already accepts per-row queue and ordering key. Per-job `opts` is simpler and works for every multi-queue batch, not just partitioned queues. + +### Promote `queue_stripe_count` + +Rejected as the public API. It is global queue-storage config with internal `queue#N` names and producer/worker configuration coupling. It remains an advanced internal storage knob. + +### Persist partition metadata first + +Deferred. A no-schema API layer proves the public contract. Persisted metadata becomes worthwhile when DB-side aggregation, UI discovery, or producer/worker partition-count mismatch detection needs it. + +## Validation + +The implementation should have: + +- Rust unit tests for default naming, explicit physical queues, routing, validation, and std-trait ergonomics. +- A hash distribution test for `partitions = enqueue_shards = 4` showing keyed traffic inside each partition reaches every enqueue shard. +- Python tests for `PartitionedQueue`, queue config expansion, and per-job COPY opts. +- Integration tests that a worker registered on a partitioned queue can claim from every partition. 
+- Benchmarks covering partition counts `1`, `2`, `4`, and `8`, plus at least one `partitions x enqueue_shards` keyed-routing cell. +- A focused TLA+ routing model (`AwaPartitionedQueueRouting`) checking that partition routing composes with enqueue-shard routing while lane sequence identity remains scoped to `(partition, shard)`. + +The ADR-019 storage lifecycle models do not need a new table or transition while partitioned queues remain client-side routing over independent ordinary queues. Revisit `AwaSegmentedStorage` itself only if routing or persisted partition metadata becomes part of the storage invariants. diff --git a/docs/adr/README.md b/docs/adr/README.md index 8c1f66c6..c5b9422e 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -38,6 +38,7 @@ Template: `Status / Context / Decision / Consequences (positive, negative) / Alt | 028 | [Maintenance-only runtime role](028-maintenance-only-runtime-role.md) | Proposed | Run promotion, rescue, pruning, and metadata maintenance without claiming or executing user jobs | Complements ADR-027 and ADR-018; uses ADR-029 for durable rescue-driven side effects | | 029 | [Transactional follow-up jobs](029-transactional-followup-jobs.md) | Accepted | Durable lifecycle side effects are delivered by enqueuing follow-up Awa jobs — atomically with the triggering state UPDATE for worker-driven outcomes and for callback resolution via the worker `Client`, best-effort in a separate transaction for maintenance rescue; hooks remain for observation | Codifies ADR-015's "enqueue another job" guidance; addresses the durable-event punt in ADRs 027/028 | | 030 | [Durable batch operations for operator bulk mutation](030-batch-operations.md) | Accepted | Filter-driven async bulk mutation with preview, progress, cancellation, retention, and maintenance-led execution; v0.6 starts with `set_priority` and `move_queue` | Refines ADR-019/025 operator mutation paths; complements ADR-028 | +| 031 | [Partitioned 
queues](031-partitioned-queues.md) | Proposed | First-class logical queue partitioning over ordinary physical queues, with domain-separated key routing and Python per-job COPY opts | Composes ADR-019/023/026 storage guarantees; refines the ADR-025 sharding interaction | ## Validation artifacts diff --git a/docs/architecture.md b/docs/architecture.md index fa1c5688..b1d70c61 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -153,7 +153,7 @@ Claim is cursor-based rather than heap-scan based: - `queue_enqueue_heads` allocates lane sequence ranges at enqueue time per `(physical queue, priority, enqueue_shard)`. - `queue_claim_heads` advances monotonically during claim and is the authority for the next claimable lane position on the same shard-qualified lane. - The dispatcher pre-acquires execution permits before claiming, so every claimed `running` job has reserved local capacity. -- `QueueFanout` is a Rust and Python helper for mapping one hot logical queue to several ordinary physical queue names. The storage engine does not add a separate group table for this; each physical queue keeps the normal lane, lease, DLQ, descriptor, and terminal-history contract. +- `PartitionedQueue` is a Rust and Python helper for mapping one hot logical queue to several ordinary physical queue names. The storage engine does not add a separate group table for this; each physical queue keeps the normal lane, lease, DLQ, descriptor, and terminal-history contract. - Queue striping and bounded claimers reduce contention on very hot logical queues. Per-queue `claimers` can add dispatcher/claimer loops inside one runtime, but those loops share the queue's worker permits and rate limiter; they do not own jobs. Recovery still follows the receipt/lease state in Postgres. Priority ordering is by effective priority first. Within one enqueue shard the lane sequence is FIFO; across enqueue shards, strict global lane order is not promised. 
With queue storage, priority aging is applied at claim time rather than by physically rewriting ready rows. The ready/done/lease partitions carry shard-aware lane indexes on `(queue, priority, enqueue_shard, lane_seq)` so deep backlog claim probes do not scan a non-shard-selective lane index and post-filter most rows. diff --git a/docs/benchmarking.md b/docs/benchmarking.md index 6df8afee..cfb08d28 100644 --- a/docs/benchmarking.md +++ b/docs/benchmarking.md @@ -157,7 +157,7 @@ Key observations: Queue-storage e2e sweeps separate tuning from storage design. For the hot single-queue shape, `enqueue_shards = 4` plus larger completion batches sustained `7.9k` completed jobs/s with `200ms` p99 end-to-end latency and bounded depth. Increasing `claimers` did not materially improve that shape. -When the application can accept partitioned ordering at the logical workload level, routing through several physical queues with `QueueFanout` is the preferred throughput lever: it creates independent claim and completion coordination streams rather than adding more claimers to one queue head. +When the application can accept partitioned ordering at the logical workload level, routing through several physical queues with `PartitionedQueue` is the preferred throughput lever: it creates independent claim and completion coordination streams rather than adding more claimers to one queue head. ADR-026 terminal-count deltas remove hot-path `queue_terminal_live_counts` updates. Completion and terminal-delete paths append signed deltas, exact reads include pending deltas, and maintenance folds sealed slots into compact counters only when the MVCC horizon is not pinned by another backend snapshot or idle transaction id. Benchmark runs should therefore sample `queue_terminal_count_deltas_*` alongside `queue_terminal_live_counts` so regressions distinguish pending append-only rows from mutable-counter dead tuples. 
diff --git a/docs/configuration.md b/docs/configuration.md index a82f7827..e3e28732 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -112,21 +112,21 @@ The key `QueueConfig` fields: Defaults intentionally favor the smallest blast radius: `enqueue_shards = 1`, `claimers = 1`, and `claim_batch_size = 512`. Raise `enqueue_shards` only when the queue can accept partitioned FIFO semantics. For a single hot queue, a larger claim batch usually helps before extra claimers: it reduces claim round-trips without adding more concurrent head coordinators. Benchmark `claimers = 2` or `4` only when a single claimer cannot keep worker permits full. -### Logical Queue Fanout +### Partitioned Queues A logical queue is the application concept: for example `email` or `customer-updates`. A physical queue is the queue name Awa stores in Postgres and workers claim from. Most applications use one physical queue per logical queue. -Very hot workloads can fan one logical queue out over multiple physical queues when the ordering contract allows it. This creates independent durable claim and completion streams. It is the most direct way to reduce hot-head coordination without weakening durability: each job is still written, claimed, leased, completed, retried, and rescued through the normal Awa tables. +Very hot workloads can partition one logical queue over multiple physical queues when the ordering contract allows it. This creates independent durable claim and completion streams. It is the most direct way to reduce hot-head coordination without weakening durability: each job is still written, claimed, leased, completed, retried, and rescued through the normal Awa tables. 
-Rust producers and workers can share `QueueFanout`: +Rust producers and workers can share `PartitionedQueue`: ```rust -use awa::{Client, InsertOpts, QueueConfig, QueueFanout}; +use awa::{Client, InsertOpts, QueueConfig, PartitionedQueue}; -let customer_updates = QueueFanout::new("customer-updates", 4)?; +let customer_updates = PartitionedQueue::new("customer-updates", 4)?; let client = Client::builder(pool.clone()) - .queue_fanout(&customer_updates, QueueConfig { + .partitioned_queue(&customer_updates, QueueConfig { // Applied per physical queue: 4 × 32 hard-reserved workers. max_workers: 32, ..Default::default() @@ -140,35 +140,35 @@ let opts = customer_updates.route_opts_by_key( ); ``` -With width `1`, `QueueFanout::new("email", 1)` uses the plain `email` queue. With width above `1`, the default physical names are stable: `email__p0`, `email__p1`, and so on. Key-based routing sets both the selected physical queue and `InsertOpts::ordering_key`, so related jobs keep per-key FIFO even if each physical queue later raises `enqueue_shards`. +With one partition, `PartitionedQueue::new("email", 1)` uses the plain `email` queue. With more than one partition, partition 0 remains the logical queue name and later partitions use stable suffixes: `email`, `email__p1`, `email__p2`, and so on. Key-based routing sets both the selected physical queue and `InsertOpts::ordering_key`, so related jobs keep per-key FIFO inside the selected partition even if each physical queue later raises `enqueue_shards`. -`queue_fanout` applies the `QueueConfig` to each physical queue. In hard-reserved mode, total logical capacity is roughly `fanout.width() * max_workers`; `rate_limit` is also per physical queue. Divide those values yourself, or use `global_max_workers`, when you need a logical total cap across the fanout. +`partitioned_queue` applies the `QueueConfig` to each physical queue. 
In hard-reserved mode, total logical capacity is roughly `partitioned_queue.partitions() * max_workers`; `rate_limit` is also per physical queue. Divide those values yourself, or use `global_max_workers`, when you need a logical total cap across the partitioned queue. -Python exposes the same deterministic router through the `awa.QueueFanout` class: +Python exposes the same deterministic router through the `awa.PartitionedQueue` class: ```python -fanout = awa.QueueFanout("customer-updates", 4) +queue = awa.PartitionedQueue("customer-updates", 4) -@client.task(UpdateCustomer, queue=fanout.physical_queues[0]) +@client.task(UpdateCustomer, queue=queue.physical_queues[0]) async def update_customer(job): ... await client.start( - fanout.queue_configs( - max_workers_per_queue=16, + queue.queue_configs( + max_workers_per_partition=16, claim_batch_size=512, ) ) await client.insert( UpdateCustomer(customer_id=customer_id, payload=payload), - **fanout.route_by_key(f"customer-{customer_id}"), + **queue.route_by_key(f"customer-{customer_id}"), ) ``` -Register the handler once and pass explicit fanout queue configs to `start()`. Python handlers are dispatched by job kind; the queue name on `@client.task` gives `start()` a declared queue to validate. +Register the handler once and pass explicit partition queue configs to `start()`. Python handlers are dispatched by job kind; the queue name on `@client.task` gives `start()` a declared queue to validate. -`route_by_key()` returns `{"queue": ..., "ordering_key": ...}` and can be passed directly to `insert()`, `insert_many_copy()`, or `enqueue_many_copy()` when the whole call shares the same key. `route_by_index()` returns only a queue for round-robin fanout when per-key FIFO is not needed. Python `enqueue_many_copy()` still takes one queue and one ordering key per call, so mixed-key COPY producers should group their batch before calling it. 
+`route_by_key()` returns `{"queue": ..., "ordering_key": ...}` and can be passed directly to `insert()`. For `insert_many_copy()` and `enqueue_many_copy()`, pass `opts=[queue.route_by_key(key) for ...]` when a batch contains jobs for multiple partitions. `route_by_index()` returns only a queue for round-robin partitioning when per-key FIFO is not needed. ### Choosing a throughput lever @@ -176,11 +176,11 @@ These knobs solve different bottlenecks: | Knob | What changes | Use it when | | --- | --- | --- | -| `QueueFanout` | Routes one logical workload across multiple physical queues. Each physical queue has its own queue-level capacity, rate limit, claim cursor, completion stream, and metrics. | The workload can be partitioned by key or round-robin, and you want more end-to-end throughput from independent queue coordination paths. | +| `PartitionedQueue` | Routes one logical workload across multiple physical queues. Each physical queue has its own queue-level capacity, rate limit, claim cursor, completion stream, and metrics. | The workload can be partitioned by key or round-robin, and you want more end-to-end throughput from independent queue coordination paths. | | `enqueue_shards` | Keeps one physical queue name, but splits that queue into multiple ordered lanes. FIFO becomes per `(queue, priority, enqueue_shard)` instead of global per `(queue, priority)`. | Operators should still see one queue, but the workload can accept partitioned FIFO inside that queue. | | `claimers` | Adds more dispatcher/claimer loops for the same physical queue inside one worker runtime. They share the queue's worker permits and rate limiter. | One claimer is leaving worker permits idle. This is not the first lever for a hot queue because extra claimers can add transaction pressure without creating independent capacity domains. | -For a hot workload that can be partitioned, start with `QueueFanout`. If the public queue should stay singular, consider `enqueue_shards`. 
Raise `claimers` only when measurements show the runtime is claim-starved rather than Postgres-bound. +For a hot workload that can be partitioned, start with `PartitionedQueue`. If the public queue should stay singular, consider `enqueue_shards`. Raise `claimers` only when measurements show the runtime is claim-starved rather than Postgres-bound. ### Python diff --git a/docs/getting-started-python.md b/docs/getting-started-python.md index 22e63de5..9dc13867 100644 --- a/docs/getting-started-python.md +++ b/docs/getting-started-python.md @@ -126,7 +126,7 @@ python -m awa --database-url "$DATABASE_URL" serve - `await client.migrate()` runs migrations from Python instead of the CLI. - `awa.Client` provides a synchronous API for worker/admin/direct-producer code — all methods are plain (e.g., `client.insert(...)`, `client.migrate()`). - `client.start()` accepts tuple queue configs for hard-reserved mode and dict configs for weighted mode. See [Configuration reference](configuration.md). -- `awa.QueueFanout` helps one hot logical queue use several physical queues while keeping routing deterministic. See [Logical queue fanout](configuration.md#logical-queue-fanout). +- `awa.PartitionedQueue` helps one hot logical queue use several physical queues while keeping routing deterministic. See [Partitioned queues](configuration.md#partitioned-queues). 
## ORM Transaction Bridging From 3df59ffe77e8d3f79d55450924ea36d98101152d Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 12 Jun 2026 19:57:31 +1200 Subject: [PATCH 2/3] Refine partitioned queue routing model --- correctness/README.md | 5 +- .../storage/AwaPartitionedQueueRouting.cfg | 5 +- .../storage/AwaPartitionedQueueRouting.tla | 100 ++++++++++++++---- .../AwaPartitionedQueueRoutingBroken.cfg | 17 +++ correctness/storage/MAPPING.md | 2 +- correctness/storage/README.md | 12 ++- docs/adr/031-partitioned-queues.md | 2 +- 7 files changed, 114 insertions(+), 29 deletions(-) create mode 100644 correctness/storage/AwaPartitionedQueueRoutingBroken.cfg diff --git a/correctness/README.md b/correctness/README.md index 2dad893e..3edd7b87 100644 --- a/correctness/README.md +++ b/correctness/README.md @@ -46,7 +46,7 @@ What is intentionally not modeled: - `storage/AwaSegmentedStorageInterleavings.cfg`: alternate two-worker config for the same segmented-storage spec, used to exercise stale completion and waiting/resume interleavings without changing the base safety model - `storage/AwaSegmentedStorageRaces.tla` / `storage/AwaSegmentedStorageRaces.cfg` / `storage/AwaSegmentedStorageRacesSafe.cfg`: focused race-exposure spec that splits `Claim` into `BeginClaim` / `CommitClaim` to model the claim path's cursor-read-without-lock behaviour. The race config produces a counterexample trace (claim snapshots segment, rotate+prune fire, commit lands lease in pruned segment — simultaneously the claim-vs-rotate race and the prune check-then-act race). The safe config uses a checked commit and passes. Production uses CAS on ring state plus child-partition locks and busy checks, not `FOR SHARE` on `lease_ring_state` — see `storage/MAPPING.md` for the full analysis. - `storage/AwaShardedPrune.tla` / `storage/AwaShardedPrune.cfg` / `storage/AwaShardedPruneBroken.cfg`: focused ADR-025 regression model for queue-ring prune across enqueue shards. 
The passing config requires ready rows to match done rows by `(enqueue_shard, lane_seq)` before prune can drop a sealed queue slot. The broken config intentionally ignores `enqueue_shard` and produces the historical counterexample: shard 0 has a done row at `lane_seq = 1`, shard 1 still has a pending ready row at `lane_seq = 1`, and broken prune drops both. -- `storage/AwaPartitionedQueueRouting.tla` / `storage/AwaPartitionedQueueRouting.cfg`: focused ADR-031 routing model for `PartitionedQueue`. Partitioned queues remain client-side routing over ordinary Awa queues, so this spec checks the refinement boundary above storage: every key routes to its selected partition, the original ordering key still determines the enqueue shard inside that partition, lane sequence identity stays scoped to `(partition, shard)`, and the representative routing table reaches every shard from every partition. +- `storage/AwaPartitionedQueueRouting.tla` / `storage/AwaPartitionedQueueRouting.cfg` / `storage/AwaPartitionedQueueRoutingBroken.cfg`: focused ADR-031 routing model for `PartitionedQueue`. Partitioned queues remain client-side routing over ordinary Awa queues, so this spec checks the refinement boundary above storage: abstract key hashes route to their selected partition, the same base hash still determines the enqueue shard inside that partition, lane sequence identity stays scoped to `(partition, shard)`, and the configured hash domain reaches every shard from every partition. The broken config reuses the base hash modulo at both routing levels and demonstrates the correlated-hash defect. - `storage/AwaStorageLockOrder.tla` / `storage/AwaStorageLockOrder.cfg` / `storage/AwaStorageLockOrderDeadlockDemo.cfg` / `storage/AwaStorageLockOrderOldStripedClaimDeadlock.cfg`: lock-ordering protocol spec. 
Models each storage-engine transaction (enqueue, claim, complete, close-receipt, rescue-receipts, ensure-running, cancel, rotate, and prune) as an ordered sequence of Postgres lock acquisitions with a shared/exclusive compatibility matrix. The striped enqueue path takes multiple physical queue lanes in stable order; the current striped claim path takes at most one physical stripe per transaction. Checks `NoDeadlock` via a waits-for cycle detector. The demo configs use a deliberately cycle-creating plan pair and the historical old striped logical-claim plan to prove the detector catches real cycles. - `storage/AwaStorageTransition.tla` / `storage/AwaStorageTransition.cfg` / `storage/AwaStorageTransitionCurrentGate.cfg`: focused model for the 0.5.x-to-0.6 storage transition control plane. It covers prepare, schema readiness, mixed-transition entry, canonical backlog drain, queue-storage routing, finalize, and abort interlocks. The desired config matches the v014+ SQL gate: it requires a live queue-storage target executor at mixed-transition entry and passes cleanly. The `CurrentGate`-named config is now a historical pre-v014 regression witness: it models the old capability-only gate where an auto runtime started before mixed transition reports `queue_storage` while prepared, then becomes drain-only after routing flips. - `storage/AwaSegmentedStorageTrace.tla` / `storage/AwaSegmentedStorageTrace.cfg` / `storage/AwaSegmentedStorageTraceReceiptRescue.cfg` / `storage/AwaSegmentedStorageTraceLostClaimAdvance.cfg` / `storage/AwaSegmentedStorageTraceRunningCancel.cfg` / `storage/AwaSegmentedStorageTraceReceiptOnlyCancel.cfg` / `storage/AwaSegmentedStorageTraceCallbackWait.cfg` / `storage/AwaSegmentedStorageTraceDlqRetry.cfg` / `storage/AwaSegmentedStorageTraceDlqPurge.cfg` / `storage/AwaSegmentedStorageTraceBroken.cfg`: trace-validation harness. Takes hand-transcribed sequences of queue-storage runtime events and verifies each transition is accepted by the storage spec. 
Current positive traces cover snooze, receipt rescue, running cancel, receipt-only cancel, callback wait/resume, DLQ retry, and DLQ purge. A deliberately-broken variant trips deadlock at traceIdx = 2 to confirm the checker rejects invalid sequences. @@ -103,6 +103,7 @@ Expected counterexample or positive-witness configs: ./correctness/run-tlc.sh storage/AwaSegmentedStorageTrace.tla storage/AwaSegmentedStorageTraceDlqRetry.cfg ./correctness/run-tlc.sh storage/AwaSegmentedStorageTrace.tla storage/AwaSegmentedStorageTraceDlqPurge.cfg ./correctness/run-tlc.sh storage/AwaSegmentedStorageTrace.tla storage/AwaSegmentedStorageTraceBroken.cfg +./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla storage/AwaPartitionedQueueRoutingBroken.cfg ./correctness/run-tlc.sh storage/AwaStorageLockOrder.tla storage/AwaStorageLockOrderDeadlockDemo.cfg ./correctness/run-tlc.sh storage/AwaStorageLockOrder.tla storage/AwaStorageLockOrderOldStripedClaimDeadlock.cfg ./correctness/run-tlc.sh storage/AwaStorageTransition.tla storage/AwaStorageTransitionCurrentGate.cfg @@ -164,7 +165,7 @@ To keep the state graph finite, `AwaExtended` bounds retries with `MaxAttempts = `AwaShardedPrune` covers the cross-shard prune property that the lifecycle model deliberately abstracts away: `lane_seq` values are only unique within an `enqueue_shard`, so any ready/done anti-join used for queue-slot prune must include `enqueue_shard`. The passing config checks the fixed predicate; the broken config remains as a regression witness. -`AwaPartitionedQueueRouting` covers the client-side routing layer introduced by ADR-031. It does not add a storage family: each partition is still an ordinary Awa queue covered by `AwaSegmentedStorage`. The model pins the composition property between partition routing and ADR-025 enqueue-shard routing so keyed traffic can fan across shards inside every partition. +`AwaPartitionedQueueRouting` covers the client-side routing layer introduced by ADR-031. 
It does not add a storage family: each partition is still an ordinary Awa queue covered by `AwaSegmentedStorage`. The model treats keys as abstract base hash values, derives the storage shard as `h % ShardCount`, derives the partition from a domain-separated partition hash, and pins the composition property that keyed traffic can fan across shards inside every partition. The broken config is the `AwaShardedPrune`-style witness for the shape we do not want: if partition and shard both reuse the same low bits, a partition only sees the correlated shard subset. `AwaBatcher` models the async completion path between handler return and DB update. In the real system (`awa-worker/src/completion.rs`), completed jobs are queued in a sharded in-memory buffer and flushed to the database in batches of up to 512 every 1ms. This introduces a window where a job has completed in the handler but not yet in the database — during which maintenance can rescue the job and a new worker can re-claim it. diff --git a/correctness/storage/AwaPartitionedQueueRouting.cfg b/correctness/storage/AwaPartitionedQueueRouting.cfg index 05c1a1e9..75cb7382 100644 --- a/correctness/storage/AwaPartitionedQueueRouting.cfg +++ b/correctness/storage/AwaPartitionedQueueRouting.cfg @@ -1,6 +1,7 @@ CONSTANTS - Partitions = {0, 1, 2, 3} - Shards = {0, 1, 2, 3} + PartitionCount = 4 + ShardCount = 4 + MaxHash = 15 MaxJobs = 4 SPECIFICATION Spec diff --git a/correctness/storage/AwaPartitionedQueueRouting.tla b/correctness/storage/AwaPartitionedQueueRouting.tla index 07c6a989..c7298d45 100644 --- a/correctness/storage/AwaPartitionedQueueRouting.tla +++ b/correctness/storage/AwaPartitionedQueueRouting.tla @@ -1,5 +1,5 @@ ---- MODULE AwaPartitionedQueueRouting ---- -EXTENDS Naturals, FiniteSets +EXTENDS Naturals, Integers, FiniteSets \* Focused ADR-031 routing model. \* @@ -7,16 +7,18 @@ EXTENDS Naturals, FiniteSets \* ordinary Awa queue covered by AwaSegmentedStorage. 
This model checks the \* client-side refinement boundary that sits above those queues: \* -\* - every produced job is routed to the partition selected by its key +\* - keys are abstract base hash values, not pre-routed partition/shard pairs +\* - every produced job is routed to the partition selected by its key hash \* - the original ordering key is still available to the storage layer, so \* enqueue-shard routing is stable inside the chosen partition \* - lane sequence identity is scoped to (partition, shard) -\* - the chosen routing table for the model reaches every shard from every -\* partition, which is the property the Rust domain-separated hash test -\* enforces for representative key sets +\* - domain-separated partition routing reaches every shard from every +\* partition. The broken spec reuses the base hash modulo both levels and +\* demonstrates the correlated-hash defect from ADR-031. -CONSTANTS Partitions, - Shards, +CONSTANTS PartitionCount, + ShardCount, + MaxHash, MaxJobs VARIABLES jobs, @@ -24,14 +26,32 @@ VARIABLES jobs, vars == <<jobs, nextSeq>> -KeySet == Partitions \X Shards +ASSUME /\ PartitionCount > 0 + /\ ShardCount > 0 + /\ MaxHash >= 0 + /\ MaxJobs > 0 -PartitionOf(k) == k[1] -ShardOf(k) == k[2] +Partitions == 0..(PartitionCount - 1) +Shards == 0..(ShardCount - 1) +KeyHashes == 0..MaxHash + +\* ADR-025 enqueue-shard routing uses the portable base ordering-key hash. +BaseHash(h) == h +ShardOf(h) == BaseHash(h) % ShardCount + +\* Abstracts the Rust SplitMix64-style finalizer over +\* `ordering_key_hash64(key) ^ PARTITION_HASH_DOMAIN`. The important property +\* for this storage-level model is that the partition hash folds independent +\* hash bits into the low bits before modulo reduction. +PartitionHash(h) == (BaseHash(h) + (BaseHash(h) \div ShardCount)) % (MaxHash + 1) +PartitionOf(h) == PartitionHash(h) % PartitionCount + +\* Historical broken shape: partition and shard both reduce the same base hash.
+CorrelatedPartitionOf(h) == BaseHash(h) % PartitionCount JobRows == [id: 1..MaxJobs, - key: KeySet, + keyHash: KeyHashes, partition: Partitions, shard: Shards, seq: 1..(MaxJobs + 1)] @@ -41,7 +61,7 @@ Init == /\ nextSeq = [p \in Partitions |-> [s \in Shards |-> 1]] Enqueue(k) == - /\ k \in KeySet + /\ k \in KeyHashes /\ Cardinality(jobs) < MaxJobs /\ LET p == PartitionOf(k) s == ShardOf(k) @@ -51,37 +71,72 @@ Enqueue(k) == /\ s \in Shards /\ jobs' = jobs \cup {[id |-> id, - key |-> k, + keyHash |-> k, + partition |-> p, + shard |-> s, + seq |-> nextSeq[p][s]]} + /\ nextSeq' = [nextSeq EXCEPT ![p][s] = @ + 1] + +EnqueueBroken(k) == + /\ k \in KeyHashes + /\ Cardinality(jobs) < MaxJobs + /\ LET p == CorrelatedPartitionOf(k) + s == ShardOf(k) + id == Cardinality(jobs) + 1 + IN + /\ p \in Partitions + /\ s \in Shards + /\ jobs' = jobs \cup + {[id |-> id, + keyHash |-> k, partition |-> p, shard |-> s, seq |-> nextSeq[p][s]]} /\ nextSeq' = [nextSeq EXCEPT ![p][s] = @ + 1] Next == - \E k \in KeySet : Enqueue(k) + \E k \in KeyHashes : Enqueue(k) + +NextBroken == + \E k \in KeyHashes : EnqueueBroken(k) Spec == Init /\ [][Next]_vars +SpecBroken == + Init /\ [][NextBroken]_vars + TypeOK == /\ jobs \subseteq JobRows /\ nextSeq \in [Partitions -> [Shards -> 1..(MaxJobs + 1)]] - /\ \A k \in KeySet : + /\ \A k \in KeyHashes : /\ PartitionOf(k) \in Partitions /\ ShardOf(k) \in Shards +TypeOKBroken == + /\ jobs \subseteq JobRows + /\ nextSeq \in [Partitions -> [Shards -> 1..(MaxJobs + 1)]] + /\ \A k \in KeyHashes : + /\ CorrelatedPartitionOf(k) \in Partitions + /\ ShardOf(k) \in Shards + RowsFollowRouting == \A row \in jobs : - /\ row.partition = PartitionOf(row.key) - /\ row.shard = ShardOf(row.key) + /\ row.partition = PartitionOf(row.keyHash) + /\ row.shard = ShardOf(row.keyHash) + +RowsFollowCorrelatedRouting == + \A row \in jobs : + /\ row.partition = CorrelatedPartitionOf(row.keyHash) + /\ row.shard = ShardOf(row.keyHash) SameKeySamePartition == \A a, b \in jobs : - a.key = 
b.key => a.partition = b.partition + a.keyHash = b.keyHash => a.partition = b.partition SameKeySameShard == \A a, b \in jobs : - a.key = b.key => a.shard = b.shard + a.keyHash = b.keyHash => a.shard = b.shard SeqUniqueWithinPartitionShard == \A a, b \in jobs : @@ -92,7 +147,12 @@ SeqUniqueWithinPartitionShard == PartitionShardCoverage == \A p \in Partitions : - {ShardOf(k) : k \in {candidate \in KeySet : PartitionOf(candidate) = p}} = Shards + {ShardOf(k) : k \in {candidate \in KeyHashes : PartitionOf(candidate) = p}} = Shards + +CorrelatedPartitionShardCoverage == + \A p \in Partitions : + {ShardOf(k) : k \in {candidate \in KeyHashes : + CorrelatedPartitionOf(candidate) = p}} = Shards ASSUME PartitionShardCoverage diff --git a/correctness/storage/AwaPartitionedQueueRoutingBroken.cfg b/correctness/storage/AwaPartitionedQueueRoutingBroken.cfg new file mode 100644 index 00000000..ee3fd742 --- /dev/null +++ b/correctness/storage/AwaPartitionedQueueRoutingBroken.cfg @@ -0,0 +1,17 @@ +CONSTANTS + PartitionCount = 4 + ShardCount = 4 + MaxHash = 15 + MaxJobs = 4 + +SPECIFICATION SpecBroken + +CHECK_DEADLOCK FALSE + +INVARIANTS + TypeOKBroken + RowsFollowCorrelatedRouting + SameKeySamePartition + SameKeySameShard + SeqUniqueWithinPartitionShard + CorrelatedPartitionShardCoverage diff --git a/correctness/storage/MAPPING.md b/correctness/storage/MAPPING.md index 0ac97bd1..d16c687c 100644 --- a/correctness/storage/MAPPING.md +++ b/correctness/storage/MAPPING.md @@ -140,7 +140,7 @@ Uniqueness itself is intentionally outside this storage model: duplicate rejecti That makes partitioned queues a routing refinement above `AwaSegmentedStorage`, not a new lifecycle variable. Each physical queue keeps the same `(queue, priority, enqueue_shard, lane_seq)` lane identity, lease/receipt safety, terminal retention, DLQ, and prune contracts already mapped in this document. 
-[`AwaPartitionedQueueRouting.tla`](./AwaPartitionedQueueRouting.tla) pins the cross-layer routing property: the partition selector is domain-separated from ADR-025's enqueue-shard selector, but key-routed producers still pass the original ordering key into storage. The Rust test `partition_hash_is_domain_separated_from_enqueue_shard_hash` is the code-level distribution regression for the same issue. +[`AwaPartitionedQueueRouting.tla`](./AwaPartitionedQueueRouting.tla) pins the cross-layer routing property with abstract key hashes. Storage shard routing is derived as `h % ShardCount`, while partition routing is derived from a domain-separated partition hash. The broken config reuses the same low bits for both modulo reductions and fails the coverage invariant, matching the Rust `partition_hash_is_domain_separated_from_enqueue_shard_hash` distribution regression. ## Batch-operations control-plane note diff --git a/correctness/storage/README.md b/correctness/storage/README.md index 254ff5c0..5814e87b 100644 --- a/correctness/storage/README.md +++ b/correctness/storage/README.md @@ -119,17 +119,23 @@ See [`MAPPING.md`](./MAPPING.md) for the action-by-action correspondence between The companion spec checks the routing refinement that storage deliberately abstracts away: -- produced rows land in the partition selected for their key -- the original ordering key still chooses the queue-storage enqueue shard inside that partition +- abstract key hashes land in the partition selected by the domain-separated partition hash +- the same base hash still chooses the queue-storage enqueue shard as `h % ShardCount` inside that partition - lane sequence numbers are unique only within `(partition, shard)`, matching ADR-025's scoped FIFO - the configured key set demonstrates that every partition can reach every shard, which is the model-level counterpart to the Rust domain-separated hash distribution test -Run it with: +Run the passing config with: ```bash ./correctness/run-tlc.sh 
storage/AwaPartitionedQueueRouting.tla ``` +Run the broken correlated-hash witness with: + +```bash +./correctness/run-tlc.sh storage/AwaPartitionedQueueRouting.tla storage/AwaPartitionedQueueRoutingBroken.cfg +``` + ## Lock-order companion spec [`AwaStorageLockOrder.tla`](./AwaStorageLockOrder.tla) models each storage-engine transaction (claim, rotate-leases, prune-leases, rotate-ready, prune-ready) as an ordered sequence of Postgres lock acquisitions, with a simplified shared/exclusive compatibility matrix that captures the cases relevant to deadlock analysis. Invariants: diff --git a/docs/adr/031-partitioned-queues.md b/docs/adr/031-partitioned-queues.md index b9abe9cb..d4ff6c98 100644 --- a/docs/adr/031-partitioned-queues.md +++ b/docs/adr/031-partitioned-queues.md @@ -120,6 +120,6 @@ The implementation should have: - Python tests for `PartitionedQueue`, queue config expansion, and per-job COPY opts. - Integration tests that a worker registered on a partitioned queue can claim from every partition. - Benchmarks covering partition counts `1`, `2`, `4`, and `8`, plus at least one `partitions x enqueue_shards` keyed-routing cell. -- A focused TLA+ routing model (`AwaPartitionedQueueRouting`) checking that partition routing composes with enqueue-shard routing while lane sequence identity remains scoped to `(partition, shard)`. +- A focused TLA+ routing model (`AwaPartitionedQueueRouting`) checking abstract key-hash routing: storage shard selection is derived from the base hash, partition selection is domain-separated, lane sequence identity remains scoped to `(partition, shard)`, and a broken correlated-hash config fails as expected. The ADR-019 storage lifecycle models do not need a new table or transition while partitioned queues remain client-side routing over independent ordinary queues. Revisit `AwaSegmentedStorage` itself only if routing or persisted partition metadata becomes part of the storage invariants. 
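The coverage property that the passing and broken configs disagree on can be reproduced outside TLC. The following is a minimal Python sketch, assuming the constants from `AwaPartitionedQueueRouting.cfg` (`PartitionCount = 4`, `ShardCount = 4`, `MaxHash = 15`) and transcribing the `ShardOf`, `PartitionOf`, and `CorrelatedPartitionOf` definitions from the spec. The Python function names are illustrative only and are not part of the Awa codebase; the sketch models the abstract TLA+ hash, not the Rust SplitMix64-style finalizer.

```python
# Illustrative transcription of the TLA+ routing definitions (not Awa code).
PARTITION_COUNT = 4
SHARD_COUNT = 4
MAX_HASH = 15


def shard_of(h: int) -> int:
    # ADR-025 style: the storage shard is the base hash modulo the shard count.
    return h % SHARD_COUNT


def partition_of(h: int) -> int:
    # Abstract "domain-separated" partition hash: fold higher bits into the
    # low bits before the modulo reduction, as PartitionHash does in the spec.
    partition_hash = (h + h // SHARD_COUNT) % (MAX_HASH + 1)
    return partition_hash % PARTITION_COUNT


def correlated_partition_of(h: int) -> int:
    # Broken shape: partition and shard both reduce the same base hash.
    return h % PARTITION_COUNT


def shards_reached(partition_fn) -> dict[int, set[int]]:
    # For each partition, collect the shards reachable by some key hash.
    reached = {p: set() for p in range(PARTITION_COUNT)}
    for h in range(MAX_HASH + 1):
        reached[partition_fn(h)].add(shard_of(h))
    return reached


def covers_all_shards(partition_fn) -> bool:
    # Model-level counterpart of PartitionShardCoverage.
    all_shards = set(range(SHARD_COUNT))
    return all(s == all_shards for s in shards_reached(partition_fn).values())


if __name__ == "__main__":
    print("domain-separated:", covers_all_shards(partition_of))
    print("correlated:", covers_all_shards(correlated_partition_of))
```

Under these constants the correlated variant pins each partition to a single shard, which is the defect `CorrelatedPartitionShardCoverage` is expected to flag in the broken config.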
From 67a3c99eef9c9a23487819ec39b31185afd21fa5 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 12 Jun 2026 20:04:31 +1200 Subject: [PATCH 3/3] Address review findings: fix CI readme path, changelog, and metrics attribution - Point awa-python project.readme at the SDK README.md; newer maturin rejects readme paths outside the package root, which broke the Python build + test and chaos smoke CI jobs. - Restore the 0.6.0-beta.2 changelog entry to the QueueFanout surface that release actually shipped, and document the PartitionedQueue replacement (naming, domain-separated routing, per-job COPY opts) under Unreleased instead. - Attribute enqueue_many_copy batch metrics to the routed queues: per-job opts can split one batch across queues, so recording the whole batch under the batch-level default queue mislabels both histograms. Homogeneous batches keep the store-reported total. - Reattach the prepare_insert_many_params doc comment that the PerJobInsertOpts struct insertion had captured, and document the tri-state ordering_key semantics on the struct. - Raise a ValidationError for non-string opts queue values to match the neighboring opts validation errors. - Mark ADR-031 Accepted now that this PR implements it. --- CHANGELOG.md | 12 ++- awa-python/pyproject.toml | 2 +- awa-python/src/client.rs | 117 ++++++++++++++++++++++++---- awa-python/src/partitioned_queue.rs | 4 +- docs/adr/031-partitioned-queues.md | 2 +- docs/adr/README.md | 2 +- 6 files changed, 119 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d0357c3..efb2d51f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ Notable changes between releases. Detailed migration notes for storage transitions live in [`docs/upgrade-0.5-to-0.6.md`](docs/upgrade-0.5-to-0.6.md). +## [Unreleased] + +### Added + +- **Per-job routing in Python COPY batch APIs** ([#348](https://github.com/hardbyte/awa/pull/348), [ADR-031](docs/adr/031-partitioned-queues.md)). 
`insert_many_copy` / `enqueue_many_copy` (async and sync) accept `opts=[...]` entries that override `queue` and `ordering_key` per job, so one COPY call can carry a mixed-partition batch. Batch-level kwargs remain defaults; an explicit `"ordering_key": None` clears the key for that job. + +### Changed + +- **Breaking (beta series): `QueueFanout` is replaced by `PartitionedQueue`** ([#348](https://github.com/hardbyte/awa/pull/348), [ADR-031](docs/adr/031-partitioned-queues.md)); no compatibility alias. Besides the rename (`width` → `partitions`, `queue_fanout` → `partitioned_queue`, Python `*_per_queue` config kwargs → `*_per_partition`), routing behavior differs from beta.2: partition 0 is now the logical queue name itself (`email`, `email__p1`, ...) so direct enqueues to the logical name stay consumable, and key→partition selection uses a domain-separated hash so it composes with ADR-025 enqueue-shard routing (the same key may land on a different partition than under beta.2 `QueueFanout`). Drain fanout queues before upgrading producers/workers, or pin explicit names via `from_physical_queues`. + ## [0.6.0-beta.2] — 2026-06-10 Second beta of the 0.6 line. The headline is storage-engine work under pinned MVCC horizons ([#169](https://github.com/hardbyte/awa/issues/169)): sequence-backed cursors, append-only ready segments, and a terminal-count delta ledger replace the remaining hot-row update paths that beta.1 still carried. This is also the release the #169 stable gate will be validated against — see "Benchmark evidence" below. @@ -25,7 +35,7 @@ Migrations v022–v031 apply via `awa migrate` (or the SQL-only path in [`docs/m - **Transactional follow-up jobs** ([ADR-029](docs/adr/029-transactional-followup-jobs.md), [#285](https://github.com/hardbyte/awa/pull/285)/[#288](https://github.com/hardbyte/awa/pull/288)). 
`ClientBuilder::on_completed_enqueue` (and friends) registers a follow-up job inserted in the same transaction as the lifecycle transition for worker-driven outcomes; callback resolution through the worker `Client` commits transition + follow-up atomically too. Maintenance rescue stays best-effort by design. - **Callback-only router and user-owned callback layers** ([#291](https://github.com/hardbyte/awa/pull/291)/[#293](https://github.com/hardbyte/awa/pull/293), closes [#281](https://github.com/hardbyte/awa/issues/281)). The callback ingress contract is shared and the URL prefix configurable; embed Awa's router or implement the contract in your own API layer — axum and FastAPI examples documented. - **`WaitingForCallback` lifecycle event + client-side callback resolution** ([#276](https://github.com/hardbyte/awa/pull/276)). Jobs parking on `WaitForCallback` are now visible to lifecycle hooks, and `Client` gains `resolve_callback` / `complete_external` / `fail_external` / `retry_external` that dispatch the matching terminal event in-process. -- **Partitioned queue helper** ([#327](https://github.com/hardbyte/awa/pull/327)). Rust `PartitionedQueue` + `ClientBuilder::partitioned_queue` and Python `awa.PartitionedQueue` for deterministic routing from one hot logical queue to multiple physical queues; duplicate physical declarations are rejected. +- **Queue fanout helper** ([#327](https://github.com/hardbyte/awa/pull/327)). Rust `QueueFanout` + `ClientBuilder::queue_fanout` and Python `awa.QueueFanout` for deterministic routing from one hot logical queue to multiple physical queues; duplicate physical declarations are rejected. Replaced by `PartitionedQueue` in [#348](https://github.com/hardbyte/awa/pull/348). - **Cron schedule pause / resume** ([#320](https://github.com/hardbyte/awa/pull/320), migration v026). 
`POST /api/cron/{name}/pause` / `/resume`; the evaluator skips paused rows and the `atomic_enqueue` CTE re-checks `paused_at IS NULL` so a pause asserted mid-evaluation still takes effect. `last_enqueued_at` is untouched while paused, so `missed_fire_policy` decides catch-up on resume. Manual `trigger_cron_job` bypasses pause. The `/cron` UI gains Pause/Resume controls. - **`awa storage finalize --wait` / `--check`** ([#298](https://github.com/hardbyte/awa/pull/298)). `--wait` polls and finalizes once readiness gates stay clear for two consecutive observations; `--check` is a dry-run that exits 2 when blocked. - **Storage-transition readiness UI** ([#299](https://github.com/hardbyte/awa/pull/299)). Time-in-state, epoch-anchored backlog history, a prominent `prepared_schema_ready=false` warning with the remediation command, and a rollback-boundaries panel. diff --git a/awa-python/pyproject.toml b/awa-python/pyproject.toml index 380e36ee..162f5992 100644 --- a/awa-python/pyproject.toml +++ b/awa-python/pyproject.toml @@ -7,7 +7,7 @@ name = "awa-pg" requires-python = ">=3.10" version = "0.6.0-beta.2" description = "Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, and progress tracking. Install awa-pg[ui] to add the bundled web dashboard." -readme = {file = "../README.md", content-type = "text/markdown"} +readme = "README.md" license = {text = "MIT OR Apache-2.0"} authors = [{name = "Brian Thorne"}] keywords = ["postgres", "job-queue", "background-jobs", "async", "celery-alternative"] diff --git a/awa-python/src/client.rs b/awa-python/src/client.rs index 3141fa87..f1247393 100644 --- a/awa-python/src/client.rs +++ b/awa-python/src/client.rs @@ -1343,11 +1343,7 @@ impl PyClient { } /// Fetch one durable batch operation by UUID string. 
- fn get_batch_operation<'py>( - &self, - py: Python<'py>, - id: String, - ) -> PyResult<Bound<'py, PyAny>> { + fn get_batch_operation<'py>(&self, py: Python<'py>, id: String) -> PyResult<Bound<'py, PyAny>> { let id = Uuid::parse_str(&id).map_err(|err| { pyo3::exceptions::PyValueError::new_err(format!("invalid batch operation id: {err}")) })?; @@ -2307,6 +2303,7 @@ impl PyClient { ordering_key.as_ref(), opts.as_deref(), )?; + let queue_counts = enqueue_batch_queue_counts(&insert_params); pyo3_async_runtimes::tokio::future_into_py(py, async move { let schema = QueueStorage::active_schema(&pool) @@ -2328,7 +2325,7 @@ impl PyClient { .enqueue_params_copy(&pool, &insert_params) .await .map_err(map_awa_error)?; - metrics.record_enqueue_batch(&queue, count as u64, started.elapsed()); + record_enqueue_batch_metrics(&metrics, &queue_counts, count as u64, started.elapsed()); Ok(count) }) } @@ -2373,6 +2370,7 @@ impl PyClient { ordering_key.as_ref(), opts.as_deref(), )?; + let queue_counts = enqueue_batch_queue_counts(&insert_params); py.detach(|| { pyo3_async_runtimes::tokio::get_runtime().block_on(async { @@ -2395,7 +2393,12 @@ impl PyClient { .enqueue_params_copy(&pool, &insert_params) .await .map_err(map_awa_error)?; - metrics.record_enqueue_batch(&queue, count as u64, started.elapsed()); + record_enqueue_batch_metrics( + &metrics, + &queue_counts, + count as u64, + started.elapsed(), + ); Ok(count) }) }) @@ -3150,13 +3153,51 @@ impl PyClient { } } -/// Convert a list of Python job args into InsertParams for the COPY path. +/// Group prepared COPY params by their routed queue for enqueue metrics. +/// +/// Per-job `opts` can route one batch across several queues, so the +/// enqueue metrics cannot attribute the whole batch to the batch-level +/// default queue name. Counts are prepared rows per queue; uniqueness +/// skips inside the store are not visible per queue, so a homogeneous +/// batch should prefer the store-reported total.
+fn enqueue_batch_queue_counts(params: &[InsertParams]) -> Vec<(String, u64)> { + let mut counts: std::collections::BTreeMap<&str, u64> = std::collections::BTreeMap::new(); + for param in params { + *counts.entry(param.opts.queue.as_str()).or_insert(0) += 1; + } + counts + .into_iter() + .map(|(queue, count)| (queue.to_string(), count)) + .collect() +} + +fn record_enqueue_batch_metrics( + metrics: &awa_worker::AwaMetrics, + queue_counts: &[(String, u64)], + stored_total: u64, + elapsed: std::time::Duration, +) { + match queue_counts { + [(queue, _)] => metrics.record_enqueue_batch(queue, stored_total, elapsed), + many => { + for (queue, prepared) in many { + metrics.record_enqueue_batch(queue, *prepared, elapsed); + } + } + } +} + +/// Per-job overrides parsed from a COPY batch `opts` entry. +/// +/// `ordering_key` is tri-state: absent falls back to the batch-level +/// key, while an explicit `None` clears it for that job. #[derive(Debug, Clone, Default)] struct PerJobInsertOpts { queue: Option<String>, ordering_key: Option>>, } +/// Convert a list of Python job args into InsertParams for the COPY path. #[allow(clippy::too_many_arguments)] fn prepare_insert_many_params( py: Python<'_>, @@ -3279,7 +3320,11 @@ fn parse_one_job_insert_opts( let queue = dict .get_item("queue")? .filter(|value| !value.is_none()) - .map(|value| value.extract::<String>()) + .map(|value| { + value + .extract::<String>() + .map_err(|_| validation_error("opts queue must be a string")) + }) .transpose()?; if queue.as_deref() == Some("") { return Err(validation_error("opts queue must not be empty")); } @@ -3638,10 +3683,12 @@ fn parse_batch_operation_filter( let state = optional_dict_item(dict, "state")?
.map(|value| value.extract::<String>().and_then(|s| parse_job_state(&s))) .transpose()?; - let created_at_gte = - optional_dict_item(dict, "created_at_gte")?.map(extract_datetime).transpose()?; - let created_at_lt = - optional_dict_item(dict, "created_at_lt")?.map(extract_datetime).transpose()?; + let created_at_gte = optional_dict_item(dict, "created_at_gte")? + .map(extract_datetime) + .transpose()?; + let created_at_lt = optional_dict_item(dict, "created_at_lt")? + .map(extract_datetime) + .transpose()?; Ok(BatchOperationFilter { kind, @@ -3803,3 +3850,47 @@ fn parse_queue_retention_overrides( Ok(overrides) } + +#[cfg(test)] +mod tests { + use super::enqueue_batch_queue_counts; + use awa_model::{InsertOpts, InsertParams}; + + fn params_for_queue(queue: &str) -> InsertParams { + InsertParams { + kind: "test".to_string(), + args: serde_json::json!({}), + opts: InsertOpts { + queue: queue.to_string(), + ..Default::default() + }, + } + } + + #[test] + fn enqueue_batch_queue_counts_groups_by_routed_queue() { + let params = vec![ + params_for_queue("partition-a"), + params_for_queue("partition-b"), + params_for_queue("partition-a"), + ]; + + assert_eq!( + enqueue_batch_queue_counts(&params), + vec![ + ("partition-a".to_string(), 2), + ("partition-b".to_string(), 1), + ] + ); + } + + #[test] + fn enqueue_batch_queue_counts_homogeneous_batch_is_single_entry() { + let params = vec![params_for_queue("email"), params_for_queue("email")]; + + assert_eq!( + enqueue_batch_queue_counts(&params), + vec![("email".to_string(), 2)] + ); + } +} diff --git a/awa-python/src/partitioned_queue.rs b/awa-python/src/partitioned_queue.rs index 13229aa6..e48e57d6 100644 --- a/awa-python/src/partitioned_queue.rs +++ b/awa-python/src/partitioned_queue.rs @@ -130,9 +130,7 @@ impl PyPartitionedQueue { )); } (Some(0), None) => { - return Err(validation_error( - "max_workers_per_partition must be > 0", - )); + return Err(validation_error("max_workers_per_partition must be > 0")); } _ => {} } diff --git
a/docs/adr/031-partitioned-queues.md b/docs/adr/031-partitioned-queues.md index d4ff6c98..96410342 100644 --- a/docs/adr/031-partitioned-queues.md +++ b/docs/adr/031-partitioned-queues.md @@ -2,7 +2,7 @@ ## Status -Proposed. +Accepted. ## Context diff --git a/docs/adr/README.md b/docs/adr/README.md index c5b9422e..2ac7cbc9 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -38,7 +38,7 @@ Template: `Status / Context / Decision / Consequences (positive, negative) / Alt | 028 | [Maintenance-only runtime role](028-maintenance-only-runtime-role.md) | Proposed | Run promotion, rescue, pruning, and metadata maintenance without claiming or executing user jobs | Complements ADR-027 and ADR-018; uses ADR-029 for durable rescue-driven side effects | | 029 | [Transactional follow-up jobs](029-transactional-followup-jobs.md) | Accepted | Durable lifecycle side effects are delivered by enqueuing follow-up Awa jobs — atomically with the triggering state UPDATE for worker-driven outcomes and for callback resolution via the worker `Client`, best-effort in a separate transaction for maintenance rescue; hooks remain for observation | Codifies ADR-015's "enqueue another job" guidance; addresses the durable-event punt in ADRs 027/028 | | 030 | [Durable batch operations for operator bulk mutation](030-batch-operations.md) | Accepted | Filter-driven async bulk mutation with preview, progress, cancellation, retention, and maintenance-led execution; v0.6 starts with `set_priority` and `move_queue` | Refines ADR-019/025 operator mutation paths; complements ADR-028 | -| 031 | [Partitioned queues](031-partitioned-queues.md) | Proposed | First-class logical queue partitioning over ordinary physical queues, with domain-separated key routing and Python per-job COPY opts | Composes ADR-019/023/026 storage guarantees; refines the ADR-025 sharding interaction | +| 031 | [Partitioned queues](031-partitioned-queues.md) | Accepted | First-class logical queue partitioning over ordinary 
physical queues, with domain-separated key routing and Python per-job COPY opts | Composes ADR-019/023/026 storage guarantees; refines the ADR-025 sharding interaction | ## Validation artifacts