| 1 | +# Scheduler |
| 2 | + |
| 3 | +This page explains how NUClear's task scheduler works internally — the lock-free queues, thread pools, group tokens, and the path from `emit()` to a running reaction callback. |
| 4 | + |
| 5 | +For the user-facing view of pools, priorities, groups, and idle tasks, see [Threading Model](threading.md). For DSL usage, see the [Scheduling](../reference/dsl/index.md) reference words. |
| 6 | + |
| 7 | +## Role in the system |
| 8 | + |
| 9 | +Every reaction execution is a **task** (`ReactionTask`) submitted to the scheduler. The `PowerPlant` owns a single `Scheduler` instance and forwards all work to it: |
| 10 | + |
| 11 | +1. A trigger (message emit, timer, IO event, etc.) creates a `ReactionTask`. |
| 12 | +2. `PowerPlant::submit()` calls `Scheduler::submit()`. |
| 13 | +3. The scheduler resolves the target **pool**, acquires any required **group** tokens, and enqueues the task. |
| 14 | +4. A pool worker dequeues the task, runs the callback, and releases group locks when the callback returns. |
| 15 | + |
| 16 | +`PowerPlant::start()` calls `Scheduler::start()`, which starts worker pools and then blocks the calling thread in the **MainThread** pool until shutdown. `PowerPlant::shutdown()` emits the shutdown event and calls `Scheduler::stop()`. |
| 17 | + |
| 18 | +```mermaid |
| 19 | +flowchart LR |
| 20 | + subgraph PowerPlant |
| 21 | + PP[PowerPlant] |
| 22 | + end |
| 23 | + |
| 24 | + subgraph Scheduler |
| 25 | + S[Scheduler] |
| 26 | + G[Groups] |
| 27 | + end |
| 28 | + |
| 29 | + subgraph Pools |
| 30 | + DP[Default Pool] |
| 31 | + CP[Custom Pools] |
| 32 | + MT[MainThread] |
| 33 | + end |
| 34 | + |
| 35 | + PP -->|submit| S |
| 36 | + S --> G |
| 37 | + S --> DP |
| 38 | + S --> CP |
| 39 | + S --> MT |
| 40 | + DP -->|run callback| PP |
| 41 | + CP -->|run callback| PP |
| 42 | + MT -->|run callback| PP |
| 43 | +``` |
| 44 | + |
| 45 | +## Core components |
| 46 | + |
| 47 | +### Scheduler |
| 48 | + |
| 49 | +The scheduler is the central coordinator. It: |
| 50 | + |
| 51 | +- **Owns pools** — lazily created from `ThreadPoolDescriptor` values (default pool, `MainThread`, custom `Pool<T>`, etc.). |
| 52 | +- **Owns groups** — lazily created from `GroupDescriptor` values (`Sync<T>`, `Group<T>`, etc.). |
| 53 | +- **Routes submission** — resolves pool and group constraints, then hands runnable work to the correct pool. |
| 54 | +- **Tracks idle reactions** — global idle tasks and a count of pools that participate in idle detection. |
| 55 | + |
| 56 | +Pool and group maps are protected by mutexes, but those locks are **not** on the hot path for steady-state submission: pool pointers are cached on each `Reaction`, and single-group tasks use a lock-free fast path (see below). |
| 57 | + |
| 58 | +Destruction order matters: `groups` is declared after `pools` in the scheduler, and C++ destroys members in reverse declaration order, so groups (which hold non-owning `Pool*` in parked waiters) are destroyed before the pools they point to. |
| 59 | + |
| 60 | +### Pool |
| 61 | + |
| 62 | +Each pool is a set of worker threads (or a single thread for `MainThread`) plus: |
| 63 | + |
| 64 | +- **Five priority-bucket queues** — one lock-free queue per priority level. |
| 65 | +- **A condition variable** — workers sleep when no runnable work is available. |
| 66 | +- **Idle machinery** — per-pool and global idle reactions, counting locks, and a `pending_idle` latch for external waiters. |
| 67 | + |
| 68 | +Workers loop in `Pool::run()`: dequeue a task, call `ReactionTask::run()`, repeat until shutdown. |
| 69 | + |
| 70 | +The default pool's thread count comes from `Configuration::default_pool_concurrency` (typically hardware concurrency). Other pools use the `concurrency` value from their descriptor. |
| 71 | + |
| 72 | +### Group |
| 73 | + |
| 74 | +A group limits how many tasks sharing the same descriptor may run concurrently. `Sync<T>` is a group with concurrency 1. |
| 75 | + |
| 76 | +Groups maintain: |
| 77 | + |
| 78 | +- A **token counter** (`tokens`) — starts at the group's concurrency; decremented when a task runs, incremented when it finishes. |
| 79 | +- **Fast-path waiter buckets** — lock-free `TaskQueue` instances keyed by priority, holding tasks that could not acquire a token immediately. |
| 80 | +- **Slow-path queue** — mutex-backed sorted list used when a task needs locks on **multiple** groups at once (`CombinedLock`). |
| 81 | + |
| 82 | +## Task submission path |
| 83 | + |
| 84 | +When `Scheduler::submit()` receives a task: |
| 85 | + |
| 86 | +```mermaid |
| 87 | +sequenceDiagram |
| 88 | + participant RT as ReactionTask |
| 89 | + participant S as Scheduler |
| 90 | + participant R as Reaction cache |
| 91 | + participant G as Group |
| 92 | + participant P as Pool |
| 93 | + |
| 94 | + RT->>S: submit(task) |
| 95 | + S->>R: load scheduler_data (Pool*) |
| 96 | + alt cache miss |
| 97 | + S->>S: get_pool(descriptor) |
| 98 | + S->>R: store Pool* |
| 99 | + end |
| 100 | + |
| 101 | + alt single group (fast path) |
| 102 | + alt run_inline and token free |
| 103 | + S->>RT: run() immediately |
| 104 | + else |
| 105 | + S->>G: try_submit(task, pool) |
| 106 | + alt token available |
| 107 | + G->>P: submit with RunningLock |
| 108 | + else |
| 109 | + G->>G: park in wait bucket |
| 110 | + end |
| 111 | + end |
| 112 | + else multiple groups (slow path) |
| 113 | + S->>S: CombinedLock over groups |
| 114 | + S->>P: submit(task, lock) |
| 115 | + end |
| 116 | +``` |
| 117 | + |
| 118 | +### Pool resolution cache |
| 119 | + |
| 120 | +The first submit for a reaction calls `get_pool()` under `pools_mutex`. The resulting `Pool*` is stored in `Reaction::scheduler_data` — a plain `std::atomic<Pool*>` rather than `atomic<shared_ptr>` to avoid libstdc++'s hashed mutex pool for atomic shared pointers, which would contend on hot paths. |
| 121 | + |
| 122 | +Subsequent submits load the cached pointer with acquire semantics. Concurrent first submits may both resolve the pool; they store the same pointer, so the race is benign. |
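The caching pattern described above can be sketched in isolation. This is a minimal illustration, not NUClear's implementation: `PoolRegistry`, `resolve()`, the `pool_name` field, and the `slow_lookups` counter are hypothetical names introduced here, and a string stands in for the real pool descriptor.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-ins for illustration; not NUClear's real types.
struct Pool {
    std::string name;
};

struct Reaction {
    std::string pool_name;                       // stand-in for the pool descriptor
    std::atomic<Pool*> scheduler_data{nullptr};  // cached pool pointer
};

class PoolRegistry {
public:
    // Slow path: look up or create the pool while holding the map mutex.
    Pool* get_pool(const std::string& name) {
        std::lock_guard<std::mutex> lock(pools_mutex);
        ++slow_lookups;
        auto& slot = pools[name];
        if (!slot) {
            slot = std::make_unique<Pool>(Pool{name});
        }
        return slot.get();
    }

    // Fast path: a single acquire load once the cache is warm.
    Pool* resolve(Reaction& reaction) {
        Pool* cached = reaction.scheduler_data.load(std::memory_order_acquire);
        if (cached != nullptr) {
            return cached;
        }
        Pool* pool = get_pool(reaction.pool_name);
        assert(pool != nullptr);
        // Racing first submits resolve the same pool, so whichever store
        // lands last writes the same pointer value.
        reaction.scheduler_data.store(pool, std::memory_order_release);
        return pool;
    }

    int slow_lookups = 0;  // number of trips through the mutex (written under pools_mutex)

private:
    std::mutex pools_mutex;
    std::map<std::string, std::unique_ptr<Pool>> pools;
};
```

After the first call to `resolve()` for a given reaction, later calls never touch `pools_mutex`, which is the property the cache exists to provide.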
| 123 | + |
| 124 | +### Inline execution |
| 125 | + |
| 126 | +If a reaction is bound with `Inline` and belongs to a single group, the scheduler tries to acquire a group token and run the callback on the submitting thread without enqueueing. This avoids queue overhead for synchronous emit paths. |
| 127 | + |
| 128 | +## Thread pools and queue selection |
| 129 | + |
| 130 | +Each pool holds an array of five `Queue<Task>` instances — one per priority bucket. At construction time the pool chooses the concrete queue type: |
| 131 | + |
| 132 | +| Pool kind | Queue type | Why | |
| 133 | +| --------- | ---------- | --- | |
| 134 | +| Default pool (`Pool<>`) | `TaskQueue` (MPMC) | Concurrency may differ from the descriptor's nominal value; multiple workers dequeue concurrently. | |
| 135 | +| `MainThread`, Trace pool, any pool with `concurrency == 1` | `MPSCQueue` (MPSC) | Exactly one consumer; simpler and cheaper than MPMC. | |
| 136 | +| Custom pools with `concurrency > 1` | `TaskQueue` (MPMC) | Multiple workers compete for tasks. | |
| 137 | + |
| 138 | +The virtual `Queue` interface lets `Pool` store both implementations in one `std::array` without templating the entire pool. The virtual call cost is negligible compared to the atomic operations inside enqueue and dequeue. |
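The selection rule in the table can be written as one small function. The sketch below is illustrative only: `QueueKind`, `PoolInfo`, and `select_queue_kind()` are hypothetical names, and the real pool constructs concrete queue objects rather than returning an enum.

```cpp
#include <cassert>

// Hypothetical names for illustration; not part of NUClear's API.
enum class QueueKind { MPMC, MPSC };

struct PoolInfo {
    bool is_default_pool;  // true for the default pool
    int concurrency;       // nominal worker count from the descriptor
};

inline QueueKind select_queue_kind(const PoolInfo& pool) {
    assert(pool.concurrency >= 1);
    // The default pool always gets the MPMC queue, because its real thread
    // count may differ from the descriptor's nominal value.
    if (pool.is_default_pool) {
        return QueueKind::MPMC;
    }
    // Any other pool with exactly one consumer can use the cheaper MPSC queue.
    return pool.concurrency == 1 ? QueueKind::MPSC : QueueKind::MPMC;
}
```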
| 139 | + |
| 140 | +Workers identify themselves via a thread-local `Pool::current_pool` pointer, set when `run()` starts. `Pool::current()` returns a `shared_ptr` to the active pool, or `nullptr` on threads that are not pool workers. |
| 141 | + |
| 142 | +## Priority buckets |
| 143 | + |
| 144 | +Tasks are not kept in one monolithic priority queue. Instead, each pool has **five fixed buckets** scanned from highest to lowest priority: |
| 145 | + |
| 146 | +| Bucket | Priority range | DSL level | |
| 147 | +| ------ | -------------- | --------- | |
| 148 | +| REALTIME | ≥ 1000 | `Priority::REALTIME` | |
| 149 | +| HIGH | ≥ 750 | `Priority::HIGH` | |
| 150 | +| NORMAL | ≥ 500 | `Priority::NORMAL` (default) | |
| 151 | +| LOW | ≥ 250 | `Priority::LOW` | |
| 152 | +| IDLE | < 250 | `Priority::IDLE` | |
| 153 | + |
| 154 | +`Pool::try_dequeue_task()` walks buckets 0→4 and returns the first available task. Within a bucket, ordering is **FIFO** (per-producer FIFO in the MPMC queue; strict FIFO in MPSC). The bucket therefore decides dequeue order first; ties within a bucket follow enqueue order, not reaction ID. |
| 155 | + |
| 156 | +Priority affects **queuing order only**. Running tasks are never preempted. |
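As a rough model of the bucket scheme, the sketch below maps an integer priority to a bucket index using the thresholds from the table, then scans buckets from highest to lowest. It assumes priorities are plain integers, and uses `std::deque` as a stand-in for the lock-free queues; `bucket_for()` and `BucketedQueues` are hypothetical names, not NUClear's.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>

// Map a numeric priority to a bucket index (0 = REALTIME ... 4 = IDLE),
// using the thresholds from the table above.
inline std::size_t bucket_for(int priority) {
    if (priority >= 1000) return 0;  // REALTIME
    if (priority >= 750) return 1;   // HIGH
    if (priority >= 500) return 2;   // NORMAL
    if (priority >= 250) return 3;   // LOW
    return 4;                        // IDLE
}

// Hypothetical single-threaded model of a pool's five queues.
// std::deque stands in for the lock-free queue types.
struct BucketedQueues {
    std::array<std::deque<int>, 5> buckets;

    void enqueue(int task_id, int priority) {
        const std::size_t index = bucket_for(priority);
        assert(index < buckets.size());
        buckets[index].push_back(task_id);
    }

    // Scan buckets 0 -> 4 and take the oldest task from the first non-empty one.
    bool try_dequeue(int& task_id) {
        for (auto& bucket : buckets) {
            if (!bucket.empty()) {
                task_id = bucket.front();
                bucket.pop_front();
                return true;
            }
        }
        return false;
    }
};
```

Two tasks with priorities 500 and 749 land in the same NORMAL bucket under these thresholds, so they dequeue in enqueue order regardless of the numeric difference.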
| 157 | + |
| 158 | +## Lock-free queues |
| 159 | + |
| 160 | +Both queue implementations use a **block-based** design: fixed-size blocks of 64 slots linked in a list. Producers claim slots with `write.fetch_add(1)`, construct the payload in place, then set a `committed` flag. Consumers read committed slots and advance head/tail as blocks drain. |
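The claim-then-commit protocol can be shown on a single block. This is a simplified sketch under several assumptions: it models one block only (no linking, retirement, or graveyard), supports a single consumer, assigns into default-constructed slots instead of constructing in place, and returns `false` where a real consumer might spin briefly. The `Block` class and its member names are illustrative, not NUClear's.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// One fixed-size block: many producers, one consumer. Illustrative only.
template <typename T, std::size_t N = 64>
class Block {
public:
    // Producer: claim a slot with fetch_add, write the payload, then publish it.
    bool try_push(const T& value) {
        const std::size_t index = write.fetch_add(1, std::memory_order_relaxed);
        if (index >= N) {
            return false;  // block is full; a real queue would link a new block here
        }
        slots[index].value = value;
        // The release store makes the payload visible to the consumer's acquire load.
        slots[index].committed.store(true, std::memory_order_release);
        return true;
    }

    // Consumer (single thread only): read the next slot once it is committed.
    bool try_pop(T& out) {
        assert(read <= N);
        if (read == N) {
            return false;  // block fully drained
        }
        if (!slots[read].committed.load(std::memory_order_acquire)) {
            return false;  // slot is unclaimed, or claimed but not yet committed
        }
        out = slots[read].value;
        ++read;
        return true;
    }

private:
    struct Slot {
        T value{};
        std::atomic<bool> committed{false};
    };

    std::array<Slot, N> slots;
    std::atomic<std::size_t> write{0};  // next slot index to hand to a producer
    std::size_t read = 0;               // plain index: only the consumer touches it
};
```

The `committed` flag is what lets a consumer distinguish a slot that has been claimed from one whose payload is actually ready to read.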
| 161 | + |
| 162 | +### TaskQueue (MPMC) |
| 163 | + |
| 164 | +Used where multiple pool threads dequeue concurrently. |
| 165 | + |
| 166 | +- **Producers**: wait-free slot claim within a non-full block; lock-free block linking when a block overflows. |
| 167 | +- **Consumers**: CAS on per-block read index; may spin briefly waiting for a producer to commit a slot. |
| 168 | +- **Graveyard**: fully drained blocks are retired to a graveyard list rather than deleted immediately, so producers still referencing an old block via `tail` cannot use freed memory. Blocks are freed when the queue is destroyed. |
| 169 | + |
| 170 | +Cross-producer ordering is not guaranteed; per-producer FIFO is preserved. |
| 171 | + |
| 172 | +### MPSCQueue (MPSC) |
| 173 | + |
| 174 | +Used for single-consumer pools (`MainThread`, the Trace pool, and any other pool with `concurrency == 1`). |
| 175 | + |
| 176 | +The producer side matches `TaskQueue`. The consumer side is simpler: a plain (non-atomic) read index, no CAS on dequeue, and immediate block retirement to the graveyard when advancing. |
| 177 | + |
| 178 | +`try_dequeue` must only be called from the designated consumer thread. Force shutdown from another thread delegates queue draining to that consumer via `discard_queues_requested`. |
| 179 | + |
| 180 | +### Shared block helpers |
| 181 | + |
| 182 | +`queue/detail/block_ops.hpp` provides `link_next_block`, `retire_block`, and spin/backoff helpers shared by both queues. |
| 183 | + |
| 184 | +### Lock-free vs wait-free |
| 185 | + |
| 186 | +The queues are **lock-free** at the algorithm level: no mutexes, and the system makes progress under contention. They are **not wait-free end-to-end**: |
| 187 | + |
| 188 | +- Block allocation uses `operator new`, which offers no wait-free guarantee. |
| 189 | +- Overflow paths use CAS loops on list pointers. |
| 190 | +- Consumers may spin waiting for a producer's `committed` flag. |
| 191 | + |
| 192 | +The hot-path slot claim via `fetch_add` is wait-free within a non-full block. See `docs/spikes/taskqueue-waitfree-assessment.md` for a detailed progress-guarantee analysis. |
| 193 | + |
| 194 | +## Group and sync semantics |
| 195 | + |
| 196 | +### Single-group fast path |
| 197 | + |
| 198 | +Most reactions belong to at most one group (including `Sync<T>`). For these, `Group::try_submit()`: |
| 199 | + |
| 200 | +1. Tries to decrement `tokens` with a compare-exchange. |
| 201 | +2. On success, submits to the pool immediately with a `RunningLock` that calls `release_token()` on destruction. |
| 202 | +3. On failure, **parks** the task in priority-ordered waiter buckets via `park_publish()` / `park_reconcile()`. |
| 203 | + |
| 204 | +The token counter can go **negative** when waiters reserve slots they have not yet consumed. This signed counter, combined with per-waiter **arbiter slots** (`atomic<bool>`), ensures no lost wakeups and exact accounting when multiple waiters race with draining threads. |
| 205 | + |
| 206 | +When a running task finishes, `release_token()` increments `tokens` and drains at most one parked waiter into the pool — keeping running count bounded by the group's concurrency. |
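The token acquire and release cycle can be sketched as follows. This is a deliberately reduced model: it covers only the compare-exchange acquire and the RAII release, and it omits parking, the arbiter slots, and the negative counter values described above. The class and method names mirror the text, but the bodies are assumptions rather than NUClear's code; `try_acquire()` and `available()` are hypothetical helpers.

```cpp
#include <atomic>
#include <cassert>

// Reduced model of a group's token counter. Illustrative only.
class Group {
public:
    explicit Group(int concurrency) : tokens(concurrency) {
        assert(concurrency > 0);
    }

    // Take a token if one is free. In this sketch the counter never drops below zero.
    bool try_acquire() {
        int current = tokens.load(std::memory_order_relaxed);
        while (current > 0) {
            if (tokens.compare_exchange_weak(current, current - 1,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed)) {
                return true;
            }
            // On failure compare_exchange_weak reloads `current`; loop and retry.
        }
        return false;  // no token free; the real group would park the task here
    }

    // Return a token. The real group would also drain at most one parked waiter.
    void release_token() {
        tokens.fetch_add(1, std::memory_order_release);
    }

    int available() const {
        return tokens.load(std::memory_order_relaxed);
    }

private:
    std::atomic<int> tokens;
};

// RAII handle for an already acquired token: returns it on destruction.
class RunningLock {
public:
    explicit RunningLock(Group& g) : group(&g) {}
    RunningLock(RunningLock&& other) noexcept : group(other.group) {
        other.group = nullptr;
    }
    RunningLock(const RunningLock&) = delete;
    RunningLock& operator=(const RunningLock&) = delete;
    RunningLock& operator=(RunningLock&&) = delete;
    ~RunningLock() {
        if (group != nullptr) {
            group->release_token();
        }
    }

private:
    Group* group;
};
```

Tying the release to a destructor means the token comes back whether the callback returns normally or exits by exception, which keeps the running count bounded by the group's concurrency.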
| 207 | + |
| 208 | +### Multi-group slow path |
| 209 | + |
| 210 | +Tasks bound to multiple groups (`Sync<A>` and `Sync<B>`, etc.) use `CombinedLock`: each group gets a `GroupLock` backed by a mutex-protected sorted queue. `slow_pending` on each group prevents fast-path submitters from jumping ahead of older multi-group waiters. |
| 211 | + |
| 212 | +When a `GroupLock` is released, the group may still drain a fast-path waiter while slow-path waiters exist, provided the pre-release token count shows that a committed fast waiter is owed a slot. This avoids deadlocks between the fast and slow paths. |
| 213 | + |
| 214 | +### External waiters |
| 215 | + |
| 216 | +When a task is parked in a group's wait buckets (not yet in the pool queue), the destination pool must not go idle as if it had no work. `Pool::register_external_waiter()` increments `external_waiters`, keeping workers alive until the parked task is drained or the registration is destroyed. |
| 217 | + |
| 218 | +If idle reactions are registered for that pool (or globally), a `pending_idle` latch ensures that one idle epoch fires before the next dequeue. This preserves the invariant that parking a non-runnable task triggers idle detection, even if the worker is preempted and a runnable task arrives in the queue before it resumes. |
| 219 | + |
| 220 | +### Slow-path locks in the pool |
| 221 | + |
| 222 | +If a worker dequeues a task whose lock cannot yet be acquired (for example, a slow-path task submitted with a `GroupLock`), it re-enqueues the task and waits on the condition variable until a lock release calls `notify()`. |
| 223 | + |
| 224 | +## Idle tasks and shutdown |
| 225 | + |
| 226 | +### Idle tasks |
| 227 | + |
| 228 | +Idle reactions (`on<Idle<>>`, `on<Idle<Pool<T>>>`) are registered via `PowerPlant::add_idle_task()` → `Scheduler::add_idle_task()`. |
| 229 | + |
| 230 | +When a pool worker finds no runnable task: |
| 231 | + |
| 232 | +1. It tries `get_idle_task()` — acquiring counting locks that track per-thread and per-pool idle state. |
| 233 | +2. When all threads in a pool are idle and the pool holds the global idle lock, global idle reactions are collected. |
| 234 | +3. A synthetic `ReactionTask` runs that re-submits each idle reaction's task via `scheduler.submit()`. |
| 235 | + |
| 236 | +`global_idle_count` is an atomic so pools can cheaply check whether global idle exists without locking the scheduler on every external-waiter registration. |
| 237 | + |
| 238 | +### Shutdown sequence |
| 239 | + |
| 240 | +`Scheduler::stop(force)` sets `running = false` and stops all pools. |
| 241 | + |
| 242 | +| Stop type | Behaviour | |
| 243 | +| --------- | --------- | |
| 244 | +| `NORMAL` | Pools stop accepting new work (except **persistent** pools, which keep accepting during shutdown). Workers drain queued tasks. | |
| 245 | +| `FINAL` | Used after the main thread exits `start()`; even persistent pools stop once their queues empty. | |
| 246 | +| `FORCE` | Clears queues and wakes all threads; used for forced test timeouts. MPSC pools require the consumer thread to perform the drain. | |
| 247 | + |
| 248 | +`Scheduler::start()` starts worker pools first, then blocks in `MainThread::start()`. When the main thread pool exits (after shutdown), pools are stopped in order — non-persistent pools before persistent ones — then joined. |
| 249 | + |
| 250 | +Persistent pools (`ThreadPoolDescriptor::persistent`) continue accepting tasks during a normal shutdown so networking or logging reactors can finish in-flight work. |
| 251 | + |
| 252 | +## Design tradeoffs |
| 253 | + |
| 254 | +| Choice | Rationale | |
| 255 | +| ------ | --------- | |
| 256 | +| Virtual `Queue` interface | One bucket array in `Pool` without templating the entire pool; indirection cost is dwarfed by atomics. | |
| 257 | +| Separate `MPSCQueue` | Single-consumer pools avoid MPMC CAS on dequeue; meaningful win for `MainThread` and concurrency-1 pools. | |
| 258 | +| Priority buckets vs one sorted queue | Fixed five buckets give O(1) bucket selection and lock-free queues per level; ordering within a bucket is FIFO rather than a strict global ordering by task ID. | |
| 259 | +| Lock-free group fast path | Single-group `Sync` is the common case; parking in lock-free buckets avoids mutex contention on submission. | |
| 260 | +| Mutex for pool/group maps | Pools and groups are created once per descriptor; mutex cost is paid on first use, not every submit. | |
| 261 | +| Condition variable for workers | Lock-free queues hold tasks, but workers must sleep when idle; CV + `live` flag avoids busy-waiting. | |
| 262 | +| Non-preemptive execution | Simpler reasoning, no priority inversion from preemption; long tasks hold a thread until completion. | |
| 263 | + |
| 264 | +## See also |
| 265 | + |
| 266 | +- [Threading Model](threading.md) — pools, priorities, groups, and idle tasks from a user perspective |
| 267 | +- [Synchronization](../how-to/synchronization.md) — using `Sync` and `Group` in reactors |
| 268 | +- [Priority](../reference/dsl/priority.md) — DSL priority levels and values |
| 269 | +- [Pool](../reference/dsl/pool.md) — routing reactions to custom thread pools |
| 270 | +- [Group](../reference/dsl/group.md) — limiting concurrent execution |
| 271 | +- [Idle](../reference/dsl/idle.md) — running work when pools are idle |