Add proxy-based cluster routing for backend engine nodes #44
Conversation
- introduce Cluster gRPC service and node registration/heartbeat messages
- add new `proxy` binary with routing, backend discovery, and stale-node reaping
- refactor `server` into backend service module and add cluster client auto-registration
- add proxy cluster integration tests and RFC 1004 documentation
⚠️ Caution: Review failed. The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (5)
📝 Walkthrough

Introduces a gRPC proxy layer enabling task routing across backend engine nodes via a new Cluster service. Adds backend registration with heartbeats, tag-based node selection, configuration parsing, and comprehensive integration tests. Implements cluster membership management with in-memory state and TTL-based node pruning.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant B as Backend
    participant P as Proxy
    participant C as Client
    rect rgba(0, 100, 200, 0.5)
        Note over B,P: Cluster Registration & Heartbeat
        B->>+P: RegisterNode(node_id, advertise_addr, tags, tasks)
        P->>P: upsert_node(NodeState)
        P-->>-B: NodeRegisterResponse(session_id, heartbeat_interval)
        loop Every heartbeat_interval
            B->>+P: Heartbeat(node_id, session_id)
            P->>P: Update last_seen_unix
            P-->>-B: NodeHeartbeatAck(server_time)
        end
    end
    rect rgba(0, 150, 100, 0.5)
        Note over C,P: Task Routing via Proxy
        C->>+P: CreateTask(task_spec)
        P->>P: candidate_nodes_for_task(namespace:task)
        P->>P: select_create_candidate(candidates)
        P->>-B: CreateTask(forwarded request)
        B-->>C: Task(instance_id with node_id@local_id)
    end
    rect rgba(150, 100, 0, 0.5)
        Note over C,P: Node Membership Maintenance
        P->>P: reap_stale_nodes() periodic task
        alt Node TTL expired
            P->>P: Remove from nodes map
        else Healthy
            P->>P: Keep in membership
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 5
🧹 Nitpick comments (6)
engine/src/routing.rs (1)
135-140: Redundant rule fallback check.

Lines 135-140 check if `rules.is_empty()` and add a default rule, but this condition can never be true. The preceding code at lines 122-133 already ensures `rules` is non-empty:

- If `config.rules.is_empty()` (line 122), a default rule is added (lines 123-126)
- Otherwise, rules are populated from config (lines 128-132)
♻️ Remove redundant check
```diff
 } else {
     let mut rules = Vec::with_capacity(config.rules.len());
     for rule in config.rules.iter().cloned() {
         rules.push(RouteRule::from_toml(rule)?);
     }
     rules
 };
-if rules.is_empty() {
-    rules.push(RouteRule::from_toml(RouteRuleToml {
-        r#match: "*:*".into(),
-        require_tags: Vec::new(),
-    })?);
-}
-
 Ok(Self {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/routing.rs` around lines 135 - 140, The if block that checks rules.is_empty() and pushes a default RouteRule via RouteRule::from_toml(RouteRuleToml { r#match: "*:*".into(), require_tags: Vec::new(), })? is redundant because earlier logic already guarantees rules is non-empty; remove this redundant check and push so there is a single code path that constructs the default rule (i.e., delete the entire if rules.is_empty() { ... } block) to avoid dead code and keep RouteRule/RouteRuleToml handling centralized.

rfc/rfc1004.md (1)
20-20: Minor style suggestion from static analysis.

The phrase "originally created" could be simplified to just "created" for conciseness. This is a documentation nitpick.
📝 Suggested text simplification
```diff
-- follow-up RPCs like `PublishTask` depend on whichever backend originally created the task.
+- follow-up RPCs like `PublishTask` depend on whichever backend created the task.
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rfc/rfc1004.md` at line 20, The sentence in rfc1004.md referencing follow-up RPCs uses the wordy phrase "originally created"; update the line that mentions follow-up RPCs like `PublishTask` so it reads "follow-up RPCs like `PublishTask` depend on whichever backend created the task." — locate the sentence that contains `PublishTask` and replace "originally created" with "created" for conciseness.

engine/src/service/backend.rs (2)
186-186: Consider using read lock instead of write lock for `get_tasks`.

The `get_tasks` method acquires a write lock (`self.EngineAPI.write().await`) but only reads data. Using a read lock would allow concurrent read operations.

♻️ Use read lock for get_tasks
```diff
 async fn get_tasks(
     &self,
     request: tonic::Request<proto::TaskPageRequest>,
 ) -> Result<Response<proto::TaskPage>, Status> {
-    let mut api = self.EngineAPI.write().await;
+    let api = self.EngineAPI.read().await;
     let challenge = get_auth(&request);
     let db = api.db.clone();
-    if !Events::CheckAdminAuth(&mut api, challenge, ("".into(), "".into()), db) {
+    // Note: CheckAdminAuth may need refactoring to accept &api instead of &mut api
+    // if it doesn't actually mutate state
```

Note: This refactor depends on whether `Events::CheckAdminAuth` actually requires mutable access. If it does mutate state, the write lock is necessary.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/service/backend.rs` at line 186, The get_tasks function currently grabs a write lock via self.EngineAPI.write().await but only performs reads; change that to a read lock by using self.EngineAPI.read().await and update any local variable types/usages accordingly so you treat api as an immutable reference for the read path; before switching, verify whether the Events::CheckAdminAuth branch requires mutation—if it does, keep the write lock there and narrow the write lock only to the mutation scope, otherwise replace the write lock with a read lock throughout get_tasks.
50-64: Duplicate `parse_task_key` implementation.

This function duplicates the logic in `engine/src/service/proxy.rs` (lines 61-73). Consider extracting it to a shared utility module to maintain the DRY principle.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/service/backend.rs` around lines 50 - 64, The parse_task_key implementation in backend.rs duplicates logic already in proxy.rs (parse_task_key -> returns Identifier via ID and uses Status) — extract this parsing logic into a shared utility (e.g., service::util or service::task_id) that exposes a public function parse_task_key(task_id: &str) -> Result<Identifier, Status>, move the current logic there, replace the duplicate implementations in both backend.rs and proxy.rs with calls to the new shared parse_task_key, and update imports/visibility (pub use if needed) so existing callers (including functions referencing ID, Identifier, and Status) continue to work without changing behavior.engine/src/cluster_client.rs (1)
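To make the suggested extraction concrete, here is a minimal sketch of what the shared helper could look like. The `namespace:task` key format and the `Identifier` shape are assumptions inferred from the walkthrough, and a plain `String` error stands in for `tonic::Status`:

```rust
// Hypothetical shared module, e.g. engine/src/service/task_key.rs.
// `Identifier` and the `namespace:task` format are assumptions; the real
// helper returns a tonic::Status on failure.
#[derive(Debug, PartialEq)]
pub struct Identifier {
    pub namespace: String,
    pub name: String,
}

pub fn parse_task_key(task_id: &str) -> Result<Identifier, String> {
    match task_id.split_once(':') {
        Some((ns, name)) if !ns.is_empty() && !name.is_empty() => Ok(Identifier {
            namespace: ns.to_string(),
            name: name.to_string(),
        }),
        _ => Err(format!("invalid task key: {task_id}")),
    }
}

fn main() {
    // Both backend.rs and proxy.rs would call the same function.
    let id = parse_task_key("dist:work").unwrap();
    assert_eq!(id.namespace, "dist");
    assert_eq!(id.name, "work");
    assert!(parse_task_key("no-colon").is_err());
    println!("ok");
}
```

With a `pub use` re-export from the service module, both call sites keep their current import paths.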
86-135: Consider adding graceful shutdown support for the registration task.

The `spawn_registration` function runs an infinite loop with no shutdown mechanism. While this works for normal operation (the task runs until the process exits), it prevents graceful shutdown coordination. Consider accepting a `CancellationToken` or `oneshot::Receiver` for shutdown signaling.

♻️ Example with shutdown support
```diff
-pub fn spawn_registration(registration: NodeRegistration) -> JoinHandle<()> {
+pub fn spawn_registration(
+    registration: NodeRegistration,
+    mut shutdown: tokio::sync::oneshot::Receiver<()>,
+) -> JoinHandle<()> {
     tokio::spawn(async move {
         loop {
+            tokio::select! {
+                _ = &mut shutdown => {
+                    info!("registration task shutting down for {}", registration.node_id);
+                    return;
+                }
+                result = ClusterClient::connect(registration.cluster_proxy_addr.clone()) => {
-            match ClusterClient::connect(registration.cluster_proxy_addr.clone()).await {
+                    match result {
                         // ... rest of the logic
+                    }
+                }
+            }
         }
     })
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/cluster_client.rs` around lines 86 - 135, spawn_registration currently spawns an infinite background task with no way to stop it; change the signature to accept a shutdown signal (e.g., tokio_util::sync::CancellationToken or a oneshot::Receiver<()>) and use it to drive graceful exit: update spawn_registration(registration: NodeRegistration) to spawn_registration(registration: NodeRegistration, shutdown: CancellationToken) (or shutdown_rx), then inside the outer and inner loops replace plain sleep calls with tokio::select! between the sleep/heartbeat futures and shutdown.cancelled()/shutdown_rx to break out and return; also break/return immediately on shutdown while ensuring any outstanding client is dropped/cleaned up before exit. Ensure register_node, heartbeat loops and error-handling paths check the same shutdown signal so the task can terminate cleanly.engine/tests/proxy_cluster.rs (1)
257-265: Potential fragility in task ordering assertion.

The assertion expects the specific ordering `["dist:work", "ml:train", "node:node-a", "node:node-b"]`. If the sorting or deduplication logic changes, or if task registration order varies, this test could become flaky. Consider using a set comparison or sorting both sides before comparing.

♻️ Suggested fix for resilient ordering
```diff
+        let mut expected = vec![
+            "dist:work".to_string(),
+            "ml:train".to_string(),
+            "node:node-a".to_string(),
+            "node:node-b".to_string(),
+        ];
+        expected.sort();
+        let mut actual = response.tasks.clone();
+        actual.sort();
         assert_eq!(
-            response.tasks,
-            vec![
-                "dist:work".to_string(),
-                "ml:train".to_string(),
-                "node:node-a".to_string(),
-                "node:node-b".to_string(),
-            ]
+            actual,
+            expected
         );
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/proxy_cluster.rs` around lines 257 - 265, The test currently asserts exact ordering of response.tasks which is fragile; modify the assertion to be order-insensitive by either sorting both sides or comparing as sets—locate the assertion using response.tasks in engine/tests/proxy_cluster.rs and replace the strict assert_eq! with an assertion that compares a sorted response.tasks (or converts response.tasks and the expected Vec into HashSets) against the expected items ("dist:work", "ml:train", "node:node-a", "node:node-b") so the test passes regardless of task ordering or registration order.
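The sort-both-sides pattern from the suggestion can be shown standalone; the values below mirror the expected list from the test, and the "response" ordering is simulated:

```rust
// Order-insensitive comparison: sort both sides before asserting, so the
// test no longer depends on registration or deduplication order.
fn sorted(mut items: Vec<String>) -> Vec<String> {
    items.sort();
    items
}

fn main() {
    let expected = vec![
        "dist:work".to_string(),
        "ml:train".to_string(),
        "node:node-a".to_string(),
        "node:node-b".to_string(),
    ];
    // Simulated proxy response arriving in an arbitrary order.
    let actual = vec![
        "node:node-b".to_string(),
        "dist:work".to_string(),
        "node:node-a".to_string(),
        "ml:train".to_string(),
    ];
    assert_eq!(sorted(actual), sorted(expected));
    println!("ok");
}
```

A `HashSet` comparison works equally well when duplicates are impossible; sorting keeps the failure diff readable.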
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/bin/proxy.rs`:
- Around line 18-33: The reaper JoinHandle returned by spawn_reaper is being
dropped (let _reaper = ...) which hides if it panics or exits; keep the handle
(e.g., let reaper_handle = spawn_reaper(state.clone())) and ensure the process
fails fast if that task stops: either spawn a watcher task that awaits
reaper_handle.await and on Err/panic or Ok(Err) logs and calls process::exit(1),
or wire the watcher to the server shutdown signal so
Server::builder().serve_with_incoming(...) is aborted when the reaper finishes.
Reference spawn_reaper, the JoinHandle (reaper_handle) and the
serve_with_incoming call when adding the watcher.
In `@engine/src/service/backend.rs`:
- Around line 447-448: The log message in publish_task is a copy-paste: update
the info! call inside the publish_task authentication failure branch to use a
correct message (e.g., "Publish Task denied due to Invalid Auth" or similar)
instead of "Aquire Task denied due to Invalid Auth"; locate the info! invocation
in the publish_task function and change only the string literal to reflect
"Publish" so logs accurately identify the method.
In `@engine/tests/common/mod.rs`:
- Around line 172-189: Move the registration/start-advertise step so it happens
only after the server is actually serving: spawn the Server (server_task) first,
then after the server is started and wait_for_node(proxy_addr, node_id).await?
completes, create and spawn the registration task (call
registration_from_api(&api_guard)? and then .map(spawn_registration) or invoke
spawn_registration) so the node is added to the proxy only once EngineService is
accepting RPCs. Update the order around registration_task, server_task, and
wait_for_node and keep the shutdown channel logic unchanged.
- Around line 244-276: wait_for_node and wait_for_node_count abort on the first
cluster_client or list_nodes error because the loop uses ? which propagates
errors; change the loop to handle transient RPC failures by catching errors from
cluster_client(...) and client.list_nodes(...) (inside wait_for_node and
wait_for_node_count), treat them as transient (optionally log), sleep for 50ms
and continue the loop instead of returning Err, and only return Ok when the
condition is met or let the outer timeout expire; ensure you still propagate
non-transient errors outside the loop if needed but do not use ? directly inside
the polling loop.
In `@enginelib/src/config.rs`:
- Around line 28-37: The config struct that currently derives Debug (the struct
containing node_id, advertise_addr, cluster_proxy_addr, cluster_token,
node_tags) must stop deriving Debug and instead implement a manual fmt::Debug
that prints all fields normally but replaces secret fields with a redaction
placeholder; specifically redact cluster_token (and any other token-like fields
present) by printing "<redacted>" instead of the actual value. Remove
#[derive(Debug)] from the struct, add impl std::fmt::Debug for <the config
struct> that formats each field (use the existing field names such as node_id,
advertise_addr, cluster_proxy_addr, node_tags) and for cluster_token (and other
*token fields) write the literal "<redacted>" to the formatter.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eb48c256-e306-4f4b-a75f-1ac38794f25b
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (16)
- engine/Cargo.toml
- engine/proto/engine.proto
- engine/src/bin/proxy.rs
- engine/src/bin/server.rs
- engine/src/cluster_client.rs
- engine/src/lib.rs
- engine/src/proto.rs
- engine/src/proxy_config.rs
- engine/src/routing.rs
- engine/src/service/backend.rs
- engine/src/service/mod.rs
- engine/src/service/proxy.rs
- engine/tests/common/mod.rs
- engine/tests/proxy_cluster.rs
- enginelib/src/config.rs
- rfc/rfc1004.md
engine/tests/common/mod.rs
Outdated
```rust
let registration_task = {
    let api_guard = api.read().await;
    registration_from_api(&api_guard)?
}
.map(spawn_registration);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let engine_service = BackendEngineService::new(api.clone());
let server_task = tokio::spawn(async move {
    Server::builder()
        .add_service(proto::engine_server::EngineServer::new(engine_service))
        .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async {
            let _ = shutdown_rx.await;
        })
        .await
});

wait_for_node(proxy_addr, node_id).await?;
```
Advertise the backend only after it is serving.
The registration task starts before the backend server is actually running, and wait_for_node only checks proxy membership. That can make tests route to a node that is registered but not yet accepting RPCs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/tests/common/mod.rs` around lines 172 - 189, Move the
registration/start-advertise step so it happens only after the server is
actually serving: spawn the Server (server_task) first, then after the server is
started and wait_for_node(proxy_addr, node_id).await? completes, create and
spawn the registration task (call registration_from_api(&api_guard)? and then
.map(spawn_registration) or invoke spawn_registration) so the node is added to
the proxy only once EngineService is accepting RPCs. Update the order around
registration_task, server_task, and wait_for_node and keep the shutdown channel
logic unchanged.
- Add shared task ID parsing utilities and use them across backend/proxy services
- Track and react to reaper/registration task lifecycle with cancellation-aware loops
- Improve cluster test stability (retry node-list polling, deterministic task list ordering)
- Redact sensitive config tokens in debug output
Actionable comments posted: 11
♻️ Duplicate comments (1)
engine/src/cluster_client.rs (1)
11-18: ⚠️ Potential issue | 🟠 Major

Redact `cluster_token` from `NodeRegistration` debug output.

Deriving `Debug` on a secret-bearing struct reintroduces the same plaintext token leak the config redaction just fixed. Please remove the derive or implement a manual `Debug` impl that prints `cluster_token` as `<redacted>`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/cluster_client.rs` around lines 11 - 18, The NodeRegistration struct currently derives Debug which will print the secret cluster_token; replace the automatic derive with a manual implementation: remove Debug from #[derive(...)] on NodeRegistration and implement std::fmt::Debug for NodeRegistration that formats all fields the same but prints cluster_token as "<redacted>" (keep Clone/other derives intact), ensuring the Debug impl references the struct name NodeRegistration and the field cluster_token to locate the change.
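A manual `Debug` impl along the lines the comment asks for could look like this. The field list is taken from the review; the struct body is otherwise illustrative:

```rust
use std::fmt;

// Debug is implemented by hand instead of derived, so the token never
// reaches log output; other derives (Clone, etc.) stay as they were.
#[derive(Clone)]
pub struct NodeRegistration {
    pub node_id: String,
    pub advertise_addr: String,
    pub cluster_proxy_addr: String,
    pub cluster_token: String,
    pub node_tags: Vec<String>,
}

impl fmt::Debug for NodeRegistration {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("NodeRegistration")
            .field("node_id", &self.node_id)
            .field("advertise_addr", &self.advertise_addr)
            .field("cluster_proxy_addr", &self.cluster_proxy_addr)
            .field("cluster_token", &"<redacted>")
            .field("node_tags", &self.node_tags)
            .finish()
    }
}

fn main() {
    let reg = NodeRegistration {
        node_id: "node-a".into(),
        advertise_addr: "http://127.0.0.1:50051".into(),
        cluster_proxy_addr: "http://127.0.0.1:50050".into(),
        cluster_token: "super-secret".into(),
        node_tags: vec!["ml".into()],
    };
    let out = format!("{reg:?}");
    assert!(out.contains("<redacted>"));
    assert!(!out.contains("super-secret"));
    println!("ok");
}
```

The same pattern applies to the config struct in `enginelib/src/config.rs` flagged in the earlier round.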
🧹 Nitpick comments (2)
engine/src/bin/server.rs (1)
39-41: Hook the registration task into shutdown.

The cancellation token and join handle are created, but never used after spawn, so the cancellation-aware loop still relies on runtime teardown instead of a controlled stop. Cancel the token and await the task when the server future exits.
Also applies to: 51-55
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/bin/server.rs` around lines 39 - 41, The registration cancellation token (_registration_shutdown) and task handle (_registration_task) are created but never awaited or cancelled; update the shutdown logic so when the server future is terminating you call _registration_shutdown.cancel() and then await the spawned task (unwrap or propagate the join result) from _registration_task to ensure the cancellation-aware loop in spawn_registration finishes cleanly; apply the same change to the other token/task pair created at the later block (lines 51–55) so both spawn_registration-like tasks are explicitly cancelled and awaited on server shutdown.

engine/tests/proxy_cluster.rs (1)
124-180: These owner-routing tests still pass on broadcast fallback.

In both cases only one backend actually holds the prefixed task, so a regression that broadcasts owner-prefixed IDs would still satisfy the assertions. The delete test is especially non-observable because both `owner` branches assert the same `queued_a + queued_b == 0`. Seed the same ID on both backends or add per-backend call tracking so the non-owner path becomes visible.

Also applies to: 205-237
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/proxy_cluster.rs` around lines 124 - 180, The test currently passes even if owner-prefixed IDs are broadcast because only one backend holds the prefixed task; make the owner-routing observable by ensuring both backends contain the same prefixed ID (or by adding per-backend call counters) before asserting ownership. Specifically, in proxy_cluster.rs modify the test setup around the create/aquire/publish sequence (and the similar block at lines 205-237) to seed the solved_tasks map on both backend_a and backend_b with the ID("dist","work") (or increment a per-backend call tracker) so that when you check solved_a and solved_b the non-owner branch will fail if the code incorrectly broadcasts instead of routing to the owner. Ensure you update both the create/publish test and the delete test to perform the same seeding/tracking so owner-specific behavior is enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/bin/server.rs`:
- Around line 35-38: registration_from_api returns
Result<Option<NodeRegistration>, String> which can't be propagated with ? from
main() -> Result<(), Box<dyn std::error::Error>>; wrap or convert the String
into a boxed error before using ?. Replace the call in main (the registration
assignment) to convert the String error, e.g.
registration_from_api(&api_guard).map_err(|e|
std::io::Error::new(std::io::ErrorKind::Other, e))? or change
registration_from_api to return Result<..., Box<dyn std::error::Error>> (or
anyhow::Error) so the ? operator in main can propagate the error; target
symbols: the registration_from_api(...) call inside main and the main() ->
Result<(), Box<dyn std::error::Error>> signature.
In `@engine/src/cluster_client.rs`:
- Around line 71-85: In normalize_advertise_addr, reject wildcard bind hosts
when advertise_addr is not provided: if advertise_addr is None (you currently
fall back to host) and host represents a wildcard bind (e.g. equals or starts
with "0.0.0.0" or "[::]" or patterns like "0.0.0.0:PORT" / "[::]:PORT"), return
an Err requiring an explicit advertise_addr instead of returning a routable URL;
otherwise keep the existing logic that prepends "http://" when needed and
validates non-empty value. Use the function name normalize_advertise_addr and
the parameters advertise_addr/host to locate and implement this check.
In `@engine/src/service/backend.rs`:
- Around line 308-318: The info logs currently emit raw uid (e.g., in the task
registry auth path around EngineAPI/Events::CheckAuth and the later
acquire/publish paths); replace those direct uid exposures by either omitting
the uid from logs or logging a one-way derived identifier (compute a secure hash
like SHA-256 or HMAC of uid and log the hex value) so the logs no longer contain
raw PII, and apply the same change consistently for the success and failure
branches that reference uid (search for occurrences around Events::CheckAuth,
and the acquire/publish handlers) to ensure all user identifiers in logs are
redacted or replaced with the hashed/correlation id.
- Around line 372-408: The current move from task_queue to executing_tasks
performs two separate DB inserts (api.db.insert("tasks", ...) and
api.db.insert("executing_tasks", ...)) so a failure on the second leaves
persistent state inconsistent; change this to perform an atomic persistence for
both states together by either (a) serializing and inserting a single snapshot
that contains both task_queue and executing_tasks (e.g., create a combined
struct or wrapper and call postcard::to_allocvec once and api.db.insert once) or
(b) use a DB transaction/batch API if available to commit both keys in one
operation; apply the same change pattern to publish_task to avoid partial-update
risk. Ensure you update the code paths that reference task_queue,
executing_tasks, and api.db.insert so only the atomic write is used and proper
error handling returns before in-memory state is mutated if persistence fails.
- Around line 520-550: The fallback branch currently returns a generic
Status::aborted("Error") which hides the real cause; replace that final
Err(Status::aborted("Error")) with a typed gRPC error such as
Err(Status::invalid_argument("Unknown task type")) (or
Err(Status::not_found(...) if you prefer not-found semantics) so clients get
correct retry semantics; update the branch after the task_registry lookup (the
else of the if let Some(tsk_reg) = tsk_reg) to return the chosen typed Status to
match other endpoints that use invalid_argument/not_found.
- Around line 125-153: Replace the current match on data.state() (which coerces
unknown discriminants to TaskState::Queued) with a fallible conversion using
TaskState::try_from(data.state) and, on Err, return an invalid_argument gRPC
error; then match the Ok(TaskState::...) to call delete_task_from_collection for
Processing/Solved/Queued as before. Apply the same change in get_tasks so it
also uses TaskState::try_from(data.state) and returns invalid_argument on
conversion failure. Ensure you reference TaskState::try_from, the raw data.state
field, delete_task_from_collection, and get_tasks when making the edits.
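The fallible conversion can be sketched as follows. The variant names come from the review comment; the discriminant values and the `String` error are assumptions standing in for the prost-generated enum and `Status::invalid_argument`:

```rust
use std::convert::TryFrom;

// Stand-in for the prost-generated enum; discriminants are illustrative.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskState {
    Queued = 0,
    Processing = 1,
    Solved = 2,
}

impl TryFrom<i32> for TaskState {
    type Error = String;

    fn try_from(raw: i32) -> Result<Self, Self::Error> {
        match raw {
            0 => Ok(TaskState::Queued),
            1 => Ok(TaskState::Processing),
            2 => Ok(TaskState::Solved),
            other => Err(format!("invalid task state discriminant: {other}")),
        }
    }
}

fn main() {
    assert_eq!(TaskState::try_from(1), Ok(TaskState::Processing));
    // Unknown discriminants now surface as errors instead of silently
    // coercing to Queued, mirroring Status::invalid_argument in the service.
    assert!(TaskState::try_from(7).is_err());
    println!("ok");
}
```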
In `@engine/src/service/proxy.rs`:
- Around line 343-348: The code computes requested_end and then silently clamps
fanout_limit to admin_fanout_limit which causes later pages to be offset but
fetched only up to the clamp, yielding empty/partial pages; modify the logic in
proxy.rs so that you either (A) cap the requested page number before computing
offsets (compute max_pages = admin_fanout_limit / page_size and reject or adjust
request.page if request.page >= max_pages), or (B) apply the admin_fanout_limit
to the offset computation (compute effective_start = min(request.page *
page_size, admin_fanout_limit) and effective_end =
min((request.page+1)*page_size, admin_fanout_limit)) and use those bounds when
setting fanout_limit and when slicing results; update the code around
requested_end and fanout_limit (and the analogous block at 371-381) to use one
of these approaches so pages beyond admin_fanout_limit are handled
deterministically.
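Option (B) above reduces to a small bounds helper; all names here are illustrative, not the actual symbols in `proxy.rs`:

```rust
// Clamp both page bounds to the admin fan-out limit so pages past the
// limit come back empty but deterministic, instead of offset past the
// fetched data.
fn page_bounds(page: usize, page_size: usize, admin_fanout_limit: usize) -> (usize, usize) {
    let start = (page * page_size).min(admin_fanout_limit);
    let end = ((page + 1) * page_size).min(admin_fanout_limit);
    (start, end)
}

fn main() {
    // Limit of 25 with pages of 10: page 2 is truncated, page 3 is empty.
    assert_eq!(page_bounds(0, 10, 25), (0, 10));
    assert_eq!(page_bounds(2, 10, 25), (20, 25));
    assert_eq!(page_bounds(3, 10, 25), (25, 25));
    println!("ok");
}
```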
- Around line 232-245: The loop currently returns immediately on any Err(status)
(except NotFound), which causes transient transport failures (e.g., a just-dead
node) to abort exploring other candidates; change the logic in the for loop
around forward_acquire(node, &request).await so that: 1) you treat NotFound as
before (continue), 2) treat transient transport errors (e.g., Code::Unavailable,
Code::Unknown, Code::DeadlineExceeded, etc.) as transient and continue to the
next candidate instead of returning immediately, and 3) record the first
non-NotFound, non-transient error (or the last one) and after the loop return
that error if present, otherwise return Status::not_found("No queued tasks
available"); reference forward_acquire, request, Code::NotFound, and
Status::not_found to locate and update the matching logic.
- Around line 195-217: select_create_candidate currently breaks ties by picking
the lexicographically smaller node_id when left.in_flight_create ==
right.in_flight_create, biasing traffic; change the tie-breaker to pick randomly
between left and right instead (use the existing thread_rng() instance or
generate a random boolean) so that when left_load == right_load you return
Some(left) or Some(right) with 50/50 probability; keep references to the same
symbols (select_create_candidate, left, right, in_flight_create, node_id) when
making the change.
- Around line 155-189: The loops in broadcast_publish and broadcast_delete
currently return immediately on any non-NotFound Err(status), which causes
failures when encountering transient transport/unreachable nodes; update the
match handling in both broadcast_publish and broadcast_delete so that
Err(status) with transient codes (e.g., Code::Unavailable, Code::Unknown,
Code::DeadlineExceeded) are treated as retryable — continue iterating to search
other nodes — while still returning immediately for truly non-retryable errors;
keep the existing tracking of last_not_found and the final unwrap_or_else
not_found return if nothing succeeds.
In `@rfc/rfc1004.md`:
- Line 12: Update the RFC summary to match the single-proxy config model by
changing the phrase "one or more stateless proxies" to reflect a single proxy
(e.g., "a stateless proxy" or "the stateless proxy") so it aligns with the
configuration key cluster_proxy_addr; ensure any other references to backends
registering with proxies (and terms like "sled DB", "backend", "proxy")
consistently use singular wording or explicitly note multi-proxy support if you
intend to change the config instead.
---
Duplicate comments:
In `@engine/src/cluster_client.rs`:
- Around line 11-18: The NodeRegistration struct currently derives Debug which
will print the secret cluster_token; replace the automatic derive with a manual
implementation: remove Debug from #[derive(...)] on NodeRegistration and
implement std::fmt::Debug for NodeRegistration that formats all fields the same
but prints cluster_token as "<redacted>" (keep Clone/other derives intact),
ensuring the Debug impl references the struct name NodeRegistration and the
field cluster_token to locate the change.
---
Nitpick comments:
In `@engine/src/bin/server.rs`:
- Around line 39-41: The registration cancellation token
(_registration_shutdown) and task handle (_registration_task) are created but
never awaited or cancelled; update the shutdown logic so when the server future
is terminating you call _registration_shutdown.cancel() and then await the
spawned task (unwrap or propagate the join result) from _registration_task to
ensure the cancellation-aware loop in spawn_registration finishes cleanly; apply
the same change to the other token/task pair created at the later block (lines
51–55) so both spawn_registration-like tasks are explicitly cancelled and
awaited on server shutdown.
In `@engine/tests/proxy_cluster.rs`:
- Around line 124-180: The test currently passes even if owner-prefixed IDs are
broadcast because only one backend holds the prefixed task; make the
owner-routing observable by ensuring both backends contain the same prefixed ID
(or by adding per-backend call counters) before asserting ownership.
Specifically, in proxy_cluster.rs modify the test setup around the
create/aquire/publish sequence (and the similar block at lines 205-237) to seed
the solved_tasks map on both backend_a and backend_b with the ID("dist","work")
(or increment a per-backend call tracker) so that when you check solved_a and
solved_b the non-owner branch will fail if the code incorrectly broadcasts
instead of routing to the owner. Ensure you update both the create/publish test
and the delete test to perform the same seeding/tracking so owner-specific
behavior is enforced.
ℹ️ Review info

⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c1b1ebed-11e3-44e6-9687-3342d9133fd5

⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (16)
- engine/Cargo.toml
- engine/src/bin/proxy.rs
- engine/src/bin/server.rs
- engine/src/cluster_client.rs
- engine/src/lib.rs
- engine/src/routing.rs
- engine/src/service/backend.rs
- engine/src/service/proxy.rs
- engine/src/task_id.rs
- engine/tests/common/mod.rs
- engine/tests/proxy_cluster.rs
- enginelib/macros/src/lib.rs
- enginelib/src/api.rs
- enginelib/src/config.rs
- enginelib/src/events/mod.rs
- rfc/rfc1004.md

✅ Files skipped from review due to trivial changes (1)
- enginelib/src/events/mod.rs

🚧 Files skipped from review as they are similar to previous changes (2)
- engine/src/lib.rs
- engine/src/routing.rs
```rust
let registration = {
    let api_guard = api.read().await;
    registration_from_api(&api_guard)?
};
```
🧩 Analysis chain

🏁 Scripts executed (condensed):

```shell
# Locate and examine the server.rs file
find . -type f -name "server.rs" | grep -E "engine/src/bin" | head -5
# Examine the specific lines and surrounding context
cat -n engine/src/bin/server.rs | head -60
# Find the registration_from_api function definition
rg "fn registration_from_api" -A 5
# Check the main function signature
rg "fn main" engine/src/bin/server.rs -A 3
# Check what error types are in scope/imported
head -30 engine/src/bin/server.rs
```
Fix type mismatch: String error cannot be converted to Box<dyn std::error::Error> with ?.
Line 37 uses ? on Result<Option<NodeRegistration>, String> within main() -> Result<(), Box<dyn std::error::Error>>. The ? operator cannot convert the String error type because String does not implement std::error::Error.
💡 Suggested fix

```diff
     let registration = {
         let api_guard = api.read().await;
-        registration_from_api(&api_guard)?
+        registration_from_api(&api_guard).map_err(|e| Box::new(std::io::Error::other(e)) as Box<dyn std::error::Error>)?
     };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/bin/server.rs` around lines 35 - 38, registration_from_api returns
Result<Option<NodeRegistration>, String> which can't be propagated with ? from
main() -> Result<(), Box<dyn std::error::Error>>; wrap or convert the String
into a boxed error before using ?. Replace the call in main (the registration
assignment) to convert the String error, e.g.
registration_from_api(&api_guard).map_err(|e|
std::io::Error::new(std::io::ErrorKind::Other, e))? or change
registration_from_api to return Result<..., Box<dyn std::error::Error>> (or
anyhow::Error) so the ? operator in main can propagate the error; target
symbols: the registration_from_api(...) call inside main and the main() ->
Result<(), Box<dyn std::error::Error>> signature.
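The conversion the prompt describes can be sketched in isolation. `registration_from_api` below is a hypothetical stand-in with the same `Result<_, String>` shape as the reviewed function; the `map_err` wrapping is the only part that mirrors the suggested fix:

```rust
use std::error::Error;

// Hypothetical stand-in: the real registration_from_api reads the API config;
// only its Result<Option<_>, String> signature matters here.
fn registration_from_api(ok: bool) -> Result<Option<String>, String> {
    if ok {
        Ok(Some("node-1".to_string()))
    } else {
        Err("missing cluster_proxy_addr".to_string())
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    // `?` works once the String is wrapped in a type implementing `Error`.
    let registration = registration_from_api(true)
        .map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) as Box<dyn Error>)?;
    assert_eq!(registration.as_deref(), Some("node-1"));
    Ok(())
}
```

Switching `registration_from_api` itself to `anyhow::Error` (as the prompt also suggests) removes the call-site boilerplate entirely.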
```rust
pub fn normalize_advertise_addr(
    advertise_addr: Option<&str>,
    host: &str,
) -> Result<String, String> {
    let value = advertise_addr.unwrap_or(host).trim();
    if value.is_empty() {
        return Err("advertise address must not be empty".into());
    }

    if value.starts_with("http://") || value.starts_with("https://") {
        Ok(value.to_string())
    } else {
        Ok(format!("http://{value}"))
    }
}
```
Reject wildcard bind addresses as advertised endpoints.
When advertise_addr is omitted, this falls back to host. Values like 0.0.0.0:50051 or [::]:50051 will validate here but are not routable destinations for the proxy, so the node registers successfully and then gets traffic at an unusable address. Require an explicit advertise_addr when the bind host is unspecified.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/cluster_client.rs` around lines 71 - 85, In
normalize_advertise_addr, reject wildcard bind hosts when advertise_addr is not
provided: if advertise_addr is None (you currently fall back to host) and host
represents a wildcard bind (e.g. equals or starts with "0.0.0.0" or "[::]" or
patterns like "0.0.0.0:PORT" / "[::]:PORT"), return an Err requiring an explicit
advertise_addr instead of returning a routable URL; otherwise keep the existing
logic that prepends "http://" when needed and validates non-empty value. Use the
function name normalize_advertise_addr and the parameters advertise_addr/host to
locate and implement this check.
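A minimal sketch of the proposed guard, assuming wildcard bind hosts look like `0.0.0.0[:port]` or `[::][:port]`; the helper name `is_wildcard_bind` is invented for illustration:

```rust
// Hypothetical helper: detects unspecified bind addresses that are not
// routable from the proxy's point of view.
fn is_wildcard_bind(host: &str) -> bool {
    if matches!(host, "0.0.0.0" | "::" | "[::]") {
        return true;
    }
    // Strip a trailing :port, if any, then compare the host part.
    let without_port = host.rsplit_once(':').map(|(h, _)| h).unwrap_or(host);
    matches!(without_port, "0.0.0.0" | "[::]")
}

fn normalize_advertise_addr(advertise_addr: Option<&str>, host: &str) -> Result<String, String> {
    let value = advertise_addr.unwrap_or(host).trim();
    if value.is_empty() {
        return Err("advertise address must not be empty".into());
    }
    // Only reject wildcards on the fallback path: an explicit advertise_addr
    // is taken at face value.
    if advertise_addr.is_none() && is_wildcard_bind(value) {
        return Err(format!("bind host {value} is a wildcard; set an explicit advertise_addr"));
    }
    if value.starts_with("http://") || value.starts_with("https://") {
        Ok(value.to_string())
    } else {
        Ok(format!("http://{value}"))
    }
}

fn main() {
    assert!(normalize_advertise_addr(None, "0.0.0.0:50051").is_err());
    assert_eq!(
        normalize_advertise_addr(Some("10.0.0.5:50051"), "0.0.0.0:50051").unwrap(),
        "http://10.0.0.5:50051"
    );
}
```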
engine/src/service/backend.rs
Outdated
```rust
let result = match data.state() {
    TaskState::Processing => delete_task_from_collection(
        &mut api.executing_tasks.tasks,
        &id,
        &data.id,
        "Processing",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
    TaskState::Solved => delete_task_from_collection(
        &mut api.solved_tasks.tasks,
        &id,
        &data.id,
        "Solved",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
    TaskState::Queued => delete_task_from_collection(
        &mut api.task_queue.tasks,
        &id,
        &data.id,
        "Queued",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
};
```
🧩 Analysis chain

🏁 Scripts executed (condensed):

```shell
# Locate the backend service implementation
find . -type f -name "backend.rs" | grep -E "(engine|service)"
# Inspect the match on data.state() and the surrounding delete logic
rg "match data\.state\(\)" engine/src/service/backend.rs -A 30
sed -n '115,155p' engine/src/service/backend.rs
# Find the TaskState definition in the proto sources
rg "TaskState" --type-add 'proto:*.proto' -A 5
cat -n ./engine/proto/engine.proto | grep -A 10 "enum TaskState"
# Check how the accessor is used elsewhere and the prost version in use
rg "\.state\(\)" engine/src/service/backend.rs -B 2 -A 2
cat engine/Cargo.toml | grep -A 2 "prost"
cat engine/build.rs
```

🌐 Web query: prost generated enum accessor method behavior on unknown discriminant

💡 Result:

In prost, protobuf enum fields are stored as `i32` in the generated message so that unknown/forward-compatible enum values can be decoded and preserved (protobuf enums are "open"). The generated typed getter/setter ("accessor") methods are just convenience wrappers over that `i32`.

- The message stores the raw value (`pub state: i32`); unknown values are preserved there.
- The generated getter attempts `TryFrom<i32>`; on `Err(UnknownEnumValue(_))` it falls back to `YourEnum::default()`.
- `#[derive(::prost::Enumeration)]` generates `impl Default` such that `default()` is the first variant declared in the `.proto` file.

Implication: to distinguish "unknown enum value on the wire" from a real default value, you must inspect the raw `i32` field (or call `YourEnum::try_from(raw_i32)` yourself), not the accessor.
Reject unknown TaskState values instead of coercing them.
prost's generated state() accessor returns the default variant (TaskState::Queued) for unknown discriminants, so a malformed request can silently delete from the wrong bucket. Use TaskState::try_from(data.state) on the raw i32 field and return invalid_argument on failure. Apply the same fix to the get_tasks method below.
💡 Suggested fix

```diff
-        let result = match data.state() {
+        let state = TaskState::try_from(data.state)
+            .map_err(|_| Status::invalid_argument("unknown task state"))?;
+        let result = match state {
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let state = TaskState::try_from(data.state)
    .map_err(|_| Status::invalid_argument("unknown task state"))?;
let result = match state {
    TaskState::Processing => delete_task_from_collection(
        &mut api.executing_tasks.tasks,
        &id,
        &data.id,
        "Processing",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
    TaskState::Solved => delete_task_from_collection(
        &mut api.solved_tasks.tasks,
        &id,
        &data.id,
        "Solved",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
    TaskState::Queued => delete_task_from_collection(
        &mut api.task_queue.tasks,
        &id,
        &data.id,
        "Queued",
        &data.namespace,
        &data.task,
        |f| &f.id,
    ),
};
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/backend.rs` around lines 125 - 153, Replace the current
match on data.state() (which coerces unknown discriminants to TaskState::Queued)
with a fallible conversion using TaskState::try_from(data.state) and, on Err,
return an invalid_argument gRPC error; then match the Ok(TaskState::...) to call
delete_task_from_collection for Processing/Solved/Queued as before. Apply the
same change in get_tasks so it also uses TaskState::try_from(data.state) and
returns invalid_argument on conversion failure. Ensure you reference
TaskState::try_from, the raw data.state field, delete_task_from_collection, and
get_tasks when making the edits.
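The open-enum behavior the finding relies on can be reproduced without prost. `TaskState` below is a hand-rolled stand-in for the generated type, and `accessor_style` mimics what the generated getter effectively does; the accessor-vs-`try_from` contrast is the point:

```rust
// Stand-in for the prost-generated enum: first variant is the Default.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TaskState {
    Queued = 0,
    Processing = 1,
    Solved = 2,
}

impl TryFrom<i32> for TaskState {
    type Error = i32;
    fn try_from(v: i32) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(TaskState::Queued),
            1 => Ok(TaskState::Processing),
            2 => Ok(TaskState::Solved),
            other => Err(other),
        }
    }
}

// What the generated accessor effectively does: unknown discriminants
// silently collapse to the default (first) variant.
fn accessor_style(raw: i32) -> TaskState {
    TaskState::try_from(raw).unwrap_or(TaskState::Queued)
}

fn main() {
    // 99 is not a valid discriminant: the accessor yields Queued...
    assert_eq!(accessor_style(99), TaskState::Queued);
    // ...while try_from surfaces it so the handler can return invalid_argument.
    assert_eq!(TaskState::try_from(99), Err(99));
}
```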
engine/src/service/backend.rs
Outdated
```rust
info!("Task registry request received from user: {}", uid);
let mut api = self.EngineAPI.write().await;
let db = api.db.clone();

debug!("Validating authentication for task registry request");
if !Events::CheckAuth(&mut api, uid.clone(), challenge, db) {
    info!(
        "Task registry request denied - invalid authentication for user: {}",
        uid
    );
    return Err(Status::permission_denied("Invalid authentication"));
```
Avoid emitting raw uid values in info logs.
These new log lines persist the caller's identifier on both successful and failed auth paths. If uid is a user identifier, that's durable PII in logs; prefer omitting it or logging a one-way hash/correlation ID instead. The same pattern repeats in the later acquire/publish paths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/backend.rs` around lines 308 - 318, The info logs
currently emit raw uid (e.g., in the task registry auth path around
EngineAPI/Events::CheckAuth and the later acquire/publish paths); replace those
direct uid exposures by either omitting the uid from logs or logging a one-way
derived identifier (compute a secure hash like SHA-256 or HMAC of uid and log
the hex value) so the logs no longer contain raw PII, and apply the same change
consistently for the success and failure branches that reference uid (search for
occurrences around Events::CheckAuth, and the acquire/publish handlers) to
ensure all user identifiers in logs are redacted or replaced with the
hashed/correlation id.
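One possible shape for the redaction, sketched with std's `DefaultHasher` so it runs without extra crates. Note this hasher is not cryptographic; production code should swap in SHA-256 or an HMAC (e.g. the `sha2` crate) as the comment suggests. The helper name `log_id` is invented:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: DefaultHasher is NOT cryptographic; use SHA-256/HMAC in
// production so the uid cannot be brute-forced from logs.
fn log_id(uid: &str) -> String {
    let mut h = DefaultHasher::new();
    uid.hash(&mut h);
    format!("uid#{:016x}", h.finish())
}

fn main() {
    let a = log_id("alice");
    // Stable per uid, so it still correlates log lines...
    assert_eq!(a, log_id("alice"));
    // ...without exposing the raw identifier.
    assert!(!a.contains("alice"));
    println!("would log: task registry request from {a}");
}
```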
engine/src/service/backend.rs
Outdated
```rust
let ttask = map.remove(0);
let task_payload = ttask.bytes.clone();
api.task_queue.tasks.insert(key.clone(), map);
match postcard::to_allocvec(&api.task_queue.clone()) {
    Ok(store) => {
        if let Err(e) = api.db.insert("tasks", store) {
            return Err(Status::internal(format!("DB insert error: {}", e)));
        }
    }
    Err(e) => {
        return Err(Status::internal(format!("Serialization error: {}", e)));
    }
}

let mut exec_tsks = api
    .executing_tasks
    .tasks
    .get(&key)
    .cloned()
    .unwrap_or_default();
exec_tsks.push(StoredExecutingTask {
    bytes: task_payload.clone(),
    user_id: uid,
    given_at: Utc::now(),
    id: ttask.id.clone(),
});
api.executing_tasks.tasks.insert(key.clone(), exec_tsks);
match postcard::to_allocvec(&api.executing_tasks.clone()) {
    Ok(store) => {
        if let Err(e) = api.db.insert("executing_tasks", store) {
            return Err(Status::internal(format!("DB insert error: {}", e)));
        }
    }
    Err(e) => {
        return Err(Status::internal(format!("Serialization error: {}", e)));
    }
}
```
Persist the queue→processing move atomically.
This transition is split across two DB writes. If the second insert fails, the RPC returns an error after the task has already been removed from tasks, and the task can disappear from persistent state on restart. Please commit both keys together, or persist a single snapshot, before returning success. The same partial-update risk exists in publish_task.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/backend.rs` around lines 372 - 408, The current move from
task_queue to executing_tasks performs two separate DB inserts
(api.db.insert("tasks", ...) and api.db.insert("executing_tasks", ...)) so a
failure on the second leaves persistent state inconsistent; change this to
perform an atomic persistence for both states together by either (a) serializing
and inserting a single snapshot that contains both task_queue and
executing_tasks (e.g., create a combined struct or wrapper and call
postcard::to_allocvec once and api.db.insert once) or (b) use a DB
transaction/batch API if available to commit both keys in one operation; apply
the same change pattern to publish_task to avoid partial-update risk. Ensure you
update the code paths that reference task_queue, executing_tasks, and
api.db.insert so only the atomic write is used and proper error handling returns
before in-memory state is mutated if persistence fails.
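The single-snapshot option can be sketched with an in-memory stand-in for the sled handle. `Db`, `Snapshot`, and `persist_transition` are all hypothetical names; the real code would derive serde traits and encode with postcard rather than the `Debug` placeholder used here:

```rust
use std::collections::HashMap;

// Stand-in for the sled handle: one `insert` is the atomicity unit, so both
// collections must land in a single write.
#[derive(Default)]
struct Db {
    kv: HashMap<String, Vec<u8>>,
}

impl Db {
    fn insert(&mut self, key: &str, value: Vec<u8>) -> Result<(), String> {
        self.kv.insert(key.to_string(), value);
        Ok(())
    }
}

// Combined snapshot of both task collections.
#[derive(Debug)]
struct Snapshot<'a> {
    task_queue: &'a [&'a str],
    executing_tasks: &'a [&'a str],
}

fn persist_transition(db: &mut Db, queued: &[&str], executing: &[&str]) -> Result<(), String> {
    let snapshot = Snapshot { task_queue: queued, executing_tasks: executing };
    // One write: either both collections persist or neither does, so a crash
    // between the two old writes can no longer drop a task.
    db.insert("task_state", format!("{snapshot:?}").into_bytes())
}

fn main() {
    let mut db = Db::default();
    persist_transition(&mut db, &["t2"], &["t1"]).unwrap();
    assert!(db.kv.contains_key("task_state"));
}
```

If sled's batch/transaction API is preferred instead, the same invariant holds: commit both keys in one operation before mutating the in-memory maps' persisted view.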
```rust
async fn broadcast_publish(
    nodes: Vec<Arc<NodeState>>,
    request: &Request<proto::Task>,
) -> Result<Response<proto::Empty>, Status> {
    let mut last_not_found = None;
    for node in nodes {
        match forward_publish(node, request).await {
            Ok(response) => return Ok(response),
            Err(status) if status.code() == Code::NotFound => {
                last_not_found = Some(status);
            }
            Err(status) => return Err(status),
        }
    }

    Err(last_not_found.unwrap_or_else(|| Status::not_found("Task not found")))
}

async fn broadcast_delete(
    nodes: Vec<Arc<NodeState>>,
    request: &Request<proto::TaskSelector>,
) -> Result<Response<proto::Empty>, Status> {
    let mut last_not_found = None;
    for node in nodes {
        match forward_delete(node, request).await {
            Ok(response) => return Ok(response),
            Err(status) if status.code() == Code::NotFound => {
                last_not_found = Some(status);
            }
            Err(status) => return Err(status),
        }
    }

    Err(last_not_found.unwrap_or_else(|| Status::not_found("Task not found")))
}
```
Keep legacy broadcast fallback searching past stale nodes.
Membership is only pruned periodically, so these loops can still see an unreachable candidate. Returning the first non-NotFound status makes legacy PublishTask and DeleteTask fail even when a later backend actually owns the task. Treat retryable transport failures as “keep searching” here too.
Suggested direction

```diff
-    let mut last_not_found = None;
+    let mut last_retryable = None;
     for node in nodes {
         match forward_publish(node, request).await {
             Ok(response) => return Ok(response),
-            Err(status) if status.code() == Code::NotFound => {
-                last_not_found = Some(status);
+            Err(status) if matches!(status.code(), Code::NotFound | Code::Unavailable) => {
+                last_retryable = Some(status);
             }
             Err(status) => return Err(status),
         }
     }
-    Err(last_not_found.unwrap_or_else(|| Status::not_found("Task not found")))
+    Err(last_retryable.unwrap_or_else(|| Status::not_found("Task not found")))
```

Apply the same change to broadcast_delete.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/proxy.rs` around lines 155 - 189, The loops in
broadcast_publish and broadcast_delete currently return immediately on any
non-NotFound Err(status), which causes failures when encountering transient
transport/unreachable nodes; update the match handling in both broadcast_publish
and broadcast_delete so that Err(status) with transient codes (e.g.,
Code::Unavailable, Code::Unknown, Code::DeadlineExceeded) are treated as
retryable — continue iterating to search other nodes — while still returning
immediately for truly non-retryable errors; keep the existing tracking of
last_not_found and the final unwrap_or_else not_found return if nothing
succeeds.
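The keep-searching behavior is easiest to see with the transport stripped away. `Code`, `keep_searching`, and `broadcast` below are local stand-ins for tonic's types and the `forward_*` helpers, so the classification logic can run and be tested without the crate:

```rust
// Local stand-in for tonic::Code.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Code {
    NotFound,
    Unavailable,
    Unknown,
    DeadlineExceeded,
    PermissionDenied,
}

// Retryable statuses: keep probing the remaining candidates.
fn keep_searching(code: Code) -> bool {
    matches!(
        code,
        Code::NotFound | Code::Unavailable | Code::Unknown | Code::DeadlineExceeded
    )
}

// Broadcast loop skeleton; `forward` stands in for forward_publish/forward_delete.
fn broadcast<F>(nodes: &[&str], mut forward: F) -> Result<&'static str, Code>
where
    F: FnMut(&str) -> Result<&'static str, Code>,
{
    let mut last_retryable = None;
    for node in nodes {
        match forward(node) {
            Ok(resp) => return Ok(resp),
            Err(code) if keep_searching(code) => last_retryable = Some(code),
            Err(code) => return Err(code), // truly non-retryable
        }
    }
    Err(last_retryable.unwrap_or(Code::NotFound))
}

fn main() {
    // An unreachable first node no longer masks the owner behind it.
    let result = broadcast(&["a", "b"], |node| {
        if node == "a" { Err(Code::Unavailable) } else { Ok("published") }
    });
    assert_eq!(result, Ok("published"));
}
```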
```rust
fn select_create_candidate(mut candidates: Vec<Arc<NodeState>>) -> Option<Arc<NodeState>> {
    if candidates.is_empty() {
        return None;
    }
    let mut rng = thread_rng();
    candidates.shuffle(&mut rng);
    if candidates.len() == 1 {
        return candidates.into_iter().next();
    }

    let left = candidates[0].clone();
    let right = candidates[1].clone();
    let left_load = left.in_flight_create.load(Ordering::SeqCst);
    let right_load = right.in_flight_create.load(Ordering::SeqCst);
    if left_load < right_load {
        Some(left)
    } else if right_load < left_load {
        Some(right)
    } else if left.node_id <= right.node_id {
        Some(left)
    } else {
        Some(right)
    }
}
```
Break equal-load create ties randomly.
When both sampled nodes have the same in_flight_create count, this always biases toward the lexicographically smaller node_id. Low-concurrency create traffic will skew hard toward one backend instead of distributing.
Suggested direction

```diff
-    } else if left.node_id <= right.node_id {
-        Some(left)
     } else {
-        Some(right)
+        // `candidates` was shuffled above, so `left` is already a random tie-breaker.
+        Some(left)
     }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
fn select_create_candidate(mut candidates: Vec<Arc<NodeState>>) -> Option<Arc<NodeState>> {
    if candidates.is_empty() {
        return None;
    }
    let mut rng = thread_rng();
    candidates.shuffle(&mut rng);
    if candidates.len() == 1 {
        return candidates.into_iter().next();
    }

    let left = candidates[0].clone();
    let right = candidates[1].clone();
    let left_load = left.in_flight_create.load(Ordering::SeqCst);
    let right_load = right.in_flight_create.load(Ordering::SeqCst);
    if left_load < right_load {
        Some(left)
    } else if right_load < left_load {
        Some(right)
    } else {
        // `candidates` was shuffled above, so `left` is already a random tie-breaker.
        Some(left)
    }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/proxy.rs` around lines 195 - 217, select_create_candidate
currently breaks ties by picking the lexicographically smaller node_id when
left.in_flight_create == right.in_flight_create, biasing traffic; change the
tie-breaker to pick randomly between left and right instead (use the existing
thread_rng() instance or generate a random boolean) so that when left_load ==
right_load you return Some(left) or Some(right) with 50/50 probability; keep
references to the same symbols (select_create_candidate, left, right,
in_flight_create, node_id) when making the change.
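The tie-break argument boiled down: once the candidate list is shuffled, returning `left` on equal load is already an unbiased 50/50 choice, so no extra rng call is needed. A pure-logic sketch with node ids and loads as tuples (names invented for illustration):

```rust
// Two-choice selection after a shuffle: lower in-flight load wins; on a tie,
// `left` wins, which is unbiased because the shuffle already randomized order.
fn pick<'a>(left: (&'a str, u32), right: (&'a str, u32)) -> &'a str {
    let (left_id, left_load) = left;
    let (right_id, right_load) = right;
    if left_load < right_load {
        left_id
    } else if right_load < left_load {
        right_id
    } else {
        // Tie: no node_id comparison, so neither backend is structurally favored.
        left_id
    }
}

fn main() {
    assert_eq!(pick(("a", 1), ("b", 2)), "a");
    assert_eq!(pick(("a", 2), ("b", 1)), "b");
    // Equal load: whichever node landed first after the shuffle wins, so over
    // many shuffles each node is picked ~50% of the time.
    assert_eq!(pick(("b", 3), ("a", 3)), "b");
}
```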
```rust
candidates.shuffle(&mut thread_rng());
let hops = min(
    self.state.config.max_acquire_hops as usize,
    candidates.len(),
);
for node in candidates.into_iter().take(hops) {
    match forward_acquire(node, &request).await {
        Ok(response) => return Ok(response),
        Err(status) if status.code() == Code::NotFound => continue,
        Err(status) => return Err(status),
    }
}

Err(Status::not_found("No queued tasks available"))
```
Retry AquireTask past transient backend failures.
With reap-only liveness, a just-dead node can still be selected for one more interval. Returning its transport error immediately makes workers miss queued work on later candidates.
Suggested direction

```diff
-        Err(status) if status.code() == Code::NotFound => continue,
+        Err(status) if matches!(status.code(), Code::NotFound | Code::Unavailable) => {
+            continue;
+        }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
candidates.shuffle(&mut thread_rng());
let hops = min(
    self.state.config.max_acquire_hops as usize,
    candidates.len(),
);
for node in candidates.into_iter().take(hops) {
    match forward_acquire(node, &request).await {
        Ok(response) => return Ok(response),
        Err(status) if matches!(status.code(), Code::NotFound | Code::Unavailable) => {
            continue;
        }
        Err(status) => return Err(status),
    }
}

Err(Status::not_found("No queued tasks available"))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/proxy.rs` around lines 232 - 245, The loop currently
returns immediately on any Err(status) (except NotFound), which causes transient
transport failures (e.g., a just-dead node) to abort exploring other candidates;
change the logic in the for loop around forward_acquire(node, &request).await so
that: 1) you treat NotFound as before (continue), 2) treat transient transport
errors (e.g., Code::Unavailable, Code::Unknown, Code::DeadlineExceeded, etc.) as
transient and continue to the next candidate instead of returning immediately,
and 3) record the first non-NotFound, non-transient error (or the last one) and
after the loop return that error if present, otherwise return
Status::not_found("No queued tasks available"); reference forward_acquire,
request, Code::NotFound, and Status::not_found to locate and update the matching
logic.
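The three-step direction in the prompt above (skip empty queues, skip transient failures, remember the first hard error) can be sketched in plain Rust. This is a hypothetical mock: the `Code` enum stands in for a subset of gRPC status codes, `acquire_from` and the `forward` closure stand in for the real loop over `forward_acquire`, and the exact set of codes treated as transient is a judgment call.

```rust
// Illustrative sketch only; names and the transient-code set are assumptions.
#[derive(Clone, Copy, Debug, PartialEq)]
#[allow(dead_code)]
enum Code {
    NotFound,
    Unavailable,
    DeadlineExceeded,
    Internal,
}

// Which failures are worth skipping past to try the next candidate.
fn is_transient(code: Code) -> bool {
    matches!(code, Code::Unavailable | Code::DeadlineExceeded)
}

// Try each candidate in order; NotFound means "no queued work here",
// transient errors mean "node may be dying, try the next one", and the
// first hard error is recorded so it is not silently swallowed.
fn acquire_from<F>(candidates: &[&str], mut forward: F) -> Result<String, Code>
where
    F: FnMut(&str) -> Result<String, Code>,
{
    let mut first_hard_error: Option<Code> = None;
    for &node in candidates {
        match forward(node) {
            Ok(task) => return Ok(task),
            Err(Code::NotFound) => continue,
            Err(code) if is_transient(code) => continue,
            Err(code) => {
                // Keep exploring remaining candidates, but remember this.
                first_hard_error.get_or_insert(code);
            }
        }
    }
    Err(first_hard_error.unwrap_or(Code::NotFound))
}

fn main() {
    // A just-dead node (Unavailable) followed by a healthy one: work is still found.
    let found = acquire_from(&["n1", "n2"], |node| match node {
        "n1" => Err(Code::Unavailable),
        _ => Ok("task-42".to_string()),
    });
    assert_eq!(found, Ok("task-42".to_string()));

    // All queues empty: NotFound bubbles up unchanged.
    assert_eq!(acquire_from(&["n1"], |_| Err(Code::NotFound)), Err(Code::NotFound));

    // A hard error is surfaced after the loop instead of being lost.
    let hard = acquire_from(&["n1", "n2"], |node| match node {
        "n1" => Err(Code::Internal),
        _ => Err(Code::NotFound),
    });
    assert_eq!(hard, Err(Code::Internal));
    println!("ok");
}
```

Whether a hard error should abort immediately or only be reported after the remaining candidates are tried is a policy choice; the sketch follows the prompt's "record and return after the loop" reading.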
```rust
let requested_end = request
    .get_ref()
    .page
    .saturating_add(1)
    .saturating_mul(request.get_ref().page_size as u64);
let fanout_limit = min(requested_end, self.state.config.admin_fanout_limit as u64) as u32;
```
Don't silently truncate pages beyond admin_fanout_limit.
Once (page + 1) * page_size exceeds the limit, the proxy only fetches the first admin_fanout_limit tasks per node but still applies the original page offset. Later pages can therefore come back empty or partial while more data exists.
Suggested direction

```diff
 let requested_end = request
     .get_ref()
     .page
     .saturating_add(1)
     .saturating_mul(request.get_ref().page_size as u64);
- let fanout_limit = min(requested_end, self.state.config.admin_fanout_limit as u64) as u32;
+ if requested_end > self.state.config.admin_fanout_limit as u64 {
+     return Err(Status::out_of_range("requested page exceeds admin_fanout_limit"));
+ }
+ let fanout_limit = requested_end
+     .try_into()
+     .map_err(|_| Status::invalid_argument("requested page is too large"))?;
```

Also applies to: 371-381
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/service/proxy.rs` around lines 343 - 348, The code computes
requested_end and then silently clamps fanout_limit to admin_fanout_limit which
causes later pages to be offset but fetched only up to the clamp, yielding
empty/partial pages; modify the logic in proxy.rs so that you either (A) cap the
requested page number before computing offsets (compute max_pages =
admin_fanout_limit / page_size and reject or adjust request.page if request.page
>= max_pages), or (B) apply the admin_fanout_limit to the offset computation
(compute effective_start = min(request.page * page_size, admin_fanout_limit) and
effective_end = min((request.page+1)*page_size, admin_fanout_limit)) and use
those bounds when setting fanout_limit and when slicing results; update the code
around requested_end and fanout_limit (and the analogous block at 371-381) to
use one of these approaches so pages beyond admin_fanout_limit are handled
deterministically.
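Option (B) in the prompt above amounts to clamping both ends of the page window to the fan-out limit, so pages past the limit come back deterministically empty rather than silently partial. A minimal sketch, assuming a hypothetical helper name `page_bounds` (not from the PR):

```rust
// Clamp the [start, end) window of a page request to the fan-out limit.
// When the whole window lies past the limit, start == end, which callers
// can treat as a deterministic empty page.
fn page_bounds(page: u64, page_size: u64, admin_fanout_limit: u64) -> (u64, u64) {
    let start = page.saturating_mul(page_size).min(admin_fanout_limit);
    let end = page
        .saturating_add(1)
        .saturating_mul(page_size)
        .min(admin_fanout_limit);
    (start, end)
}

fn main() {
    // Within the limit: normal page windows.
    assert_eq!(page_bounds(0, 10, 100), (0, 10));
    assert_eq!(page_bounds(9, 10, 100), (90, 100));
    // Straddling the limit: the tail is truncated at the limit.
    assert_eq!(page_bounds(9, 15, 100), (100, 100));
    // Fully past the limit: an empty window instead of a partial one.
    assert_eq!(page_bounds(12, 10, 100), (100, 100));
    println!("ok");
}
```

`saturating_mul`/`saturating_add` mirror the overflow handling already used in the snippet under review; the choice between this clamping and option (A)'s explicit `out_of_range` rejection is an API-behavior decision.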
- task instances can be distributed across multiple nodes,
- `PublishTask` and `DeleteTask` can route back to the owning node without sticky proxy state.

The design keeps backend storage shard-local. Each backend keeps its own sled DB and dynamically registers itself with one or more stateless proxies.
Align the summary with the single-proxy config model.
This says a backend registers with “one or more” proxies, but the config section below only exposes singular cluster_proxy_addr. As written, the RFC overstates the shipped behavior.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rfc/rfc1004.md` at line 12, Update the RFC summary to match the single-proxy
config model by changing the phrase "one or more stateless proxies" to reflect a
single proxy (e.g., "a stateless proxy" or "the stateless proxy") so it aligns
with the configuration key cluster_proxy_addr; ensure any other references to
backends registering with proxies (and terms like "sled DB", "backend", "proxy")
consistently use singular wording or explicitly note multi-proxy support if you
intend to change the config instead.
- Move `BackendEngineService` implementation from `service/backend.rs` into `bin/server.rs`
- Switch task creation to `task_id::mint_task_instance_id` and keep task key parsing centralized
- Update service module/proxy/tests to match the new backend service layout