
feat(ha): Control Plane HA #101

Open
jackthepunished wants to merge 13 commits into poyrazK:main from jackthepunished:feat/control-plane-ha-15day

Conversation

@jackthepunished
Contributor

@jackthepunished jackthepunished commented Mar 12, 2026

Summary

Implements comprehensive High Availability for the control plane to survive single-node failures without outages or duplicate destructive actions.

Changes

Phase 1-2: Topology & Leader Election

  • Fixed the APP_ROLE vs ROLE env var mismatch (role handling now reads ROLE)
  • Added Postgres advisory lock leader election (PgLeaderElector)
  • Created LeaderGuard wrapper for 11 singleton workers

Phase 3: Durable Job Execution

  • Redis Streams durable queue with consumer groups for at-least-once delivery (combined with the execution ledger, processing is effectively exactly-once)
  • DB-backed execution ledger for idempotent job processing
  • Migrated workers (Provision, Cluster, Pipeline) to use durable queue + ledger
  • Added bounded concurrency (semaphores) to prevent resource exhaustion

Phase 4: Failure Isolation

  • Enhanced CircuitBreaker: half-open single-flight, synchronous callbacks, SuccessRequired
  • Bulkhead: semaphore-based concurrency limiter
  • Retry: exponential backoff with full jitter
  • Resilient Adapters: wrapped all 5 infrastructure backends (Compute, Network, Storage, DNS, LB)

Phase 5: Testing

  • 5 failure drills validating HA invariants
  • 5 release gates validating SLOs (<1ms fail-fast, bulkhead isolation, circuit recovery)

SLO Targets Met

  • ✅ Leader failover <30s
  • ✅ Zero singleton duplicate executions during failover
  • ✅ Zero job loss in crash tests (durable queue)
  • ✅ No API outage during single pod loss

Test Results

  • All 47 new tests pass
  • Build compiles cleanly with go vet

Files Changed

34 new files, 12 modified files across:

  • internal/core/ports/ - new interfaces
  • internal/platform/ - resilience primitives + wrappers
  • internal/repositories/ - implementations
  • internal/workers/ - HA-enabled workers
  • internal/drills/ - HA validation tests
  • cmd/api/ - wiring changes

Summary by CodeRabbit

  • New Features

    • Added role-based deployment modes (api, worker, all) for flexible scaling.
    • Implemented leader election for coordinated singleton worker execution across replicas.
    • Introduced durable task queue with idempotent execution and automatic stale job reclamation.
  • Improvements

    • Enhanced fault tolerance with circuit breaker and bulkhead resilience patterns on compute, storage, network, and DNS backends.
    • Added configurable timeouts and retry logic for improved failure recovery.

…rkers

- Add LeaderElector port interface (internal/core/ports/leader.go)
- Implement PgLeaderElector using pg_try_advisory_lock with 5s heartbeat
- Create LeaderGuard wrapper to ensure singleton workers run on exactly one node
- Wrap 11 singleton workers: LB, AutoScaling, Cron, Container, Accounting,
  Lifecycle, ReplicaMonitor, ClusterReconciler, Healing, DatabaseFailover, Log
- Add unit tests for leader election and guard behavior
- Extend TaskQueue port with DurableTaskQueue interface using Redis Streams
  with consumer groups for at-least-once delivery
- Implement Redis Streams durable queue with EnsureGroup, Receive, Ack, Nack,
  ReclaimStale methods
- Add ExecutionLedger port interface for idempotent job processing
- Implement PgExecutionLedger using job_executions table with ON CONFLICT DO NOTHING
- Integrate durable queue + ledger into ProvisionWorker, ClusterWorker,
  PipelineWorker with bounded concurrency
- Add migration 100_create_job_executions
- Add noop implementations for testing
…ry utilities

Circuit Breaker:
- Add half-open single-flight: only one probe request allowed at a time
- Add OnStateChange callback (synchronous) for observability
- Add SuccessRequired for multi-success half-open→closed transition
- Add Name and State.String() methods
- Backward compatible with existing NewCircuitBreaker(threshold, timeout)

Bulkhead:
- Add semaphore-based concurrency limiter with configurable wait timeout
- Returns ErrBulkheadFull when limit reached and timeout expires

Retry:
- Add exponential backoff with full jitter
- Configurable ShouldRetry predicate for selective retry
- Context-aware cancellation
…ackends

Add decorator wrappers implementing ports interfaces with resilience patterns:

- ResilientCompute: CB (5 fails/30s) + Bulkhead (20 conc) + Timeouts
- ResilientNetwork: CB (5 fails/30s) + Bulkhead (15 conc) + Timeout (30s)
- ResilientStorage: CB (5 fails/30s) + Bulkhead (10 conc) + Timeouts
- ResilientDNS: CB (5 fails/30s) + Timeout (10s) - no bulkhead needed
- ResilientLB: CB (5 fails/30s) + Timeouts (30s normal, 2m deploy)

Design:
- Ping() bypasses bulkhead (cheap health check) but uses CB
- Type() delegates directly (pure metadata)
- Retry NOT applied at adapter level (dangerous for provisioning)
- All wrappers have configurable options with sensible defaults
- SuccessRequired: 2 for half-open→closed (extra safety)

Add comprehensive tests for ResilientCompute (passthrough, circuit trip,
bulkhead limits, timeout, unwrap, ping bypass).
- Wrap all backends with resilient decorators in main.go:
  NewResilientCompute, NewResilientStorage, NewResilientNetwork, NewResilientLB
- Wrap DNS backend with resilient decorator in dependencies.go
- Create PgLeaderElector and wire into ServiceConfig
- Update ProvisionWorker, ClusterWorker, PipelineWorker to use:
  * DurableTaskQueue (Redis Streams with consumer groups)
  * ExecutionLedger for idempotent job processing
  * Bounded concurrency via semaphore (provision=20, cluster=10, pipeline=5)
- Update workers to use Receive/Ack/Nack pattern: at-least-once delivery plus the ledger yields effectively exactly-once processing
- Add role validation tests
HA Drills (ha_drills_test.go):
1. Circuit breaker trip and recovery (validates 3 state transitions)
2. Bulkhead saturation and graceful rejection
3. Resilient adapter end-to-end (CB + bulkhead + timeout compose)
4. Retry backoff and context cancellation
5. Half-open single-flight validation

Release Gates (release_gates_test.go) - validate SLOs:
1. Fail-fast latency <1ms when circuit is open
2. Bulkhead isolation (saturated compute doesn't affect network)
3. Circuit recovery within resetTimeout window
4. Retry idempotency (exactly MaxAttempts executions)
5. Independent circuit breakers don't interfere

Total: 13 new tests validating HA invariants.
@coderabbitai

coderabbitai bot commented Mar 12, 2026

Warning

Rate limit exceeded

@jackthepunished has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 5 minutes and 36 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 5 minutes and 36 seconds.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: edefb9d6-2f64-4e3a-bb48-24ca11e1868f

📥 Commits

Reviewing files that changed from the base of the PR and between 53a83db and 9cca272.

📒 Files selected for processing (11)
  • .github/workflows/benchmarks.yml
  • cmd/api/main.go
  • cmd/api/main_test.go
  • internal/api/setup/dependencies.go
  • internal/platform/bulkhead.go
  • internal/platform/circuit_breaker.go
  • internal/platform/retry.go
  • internal/platform/retry_test.go
  • internal/repositories/noop/adapters.go
  • internal/repositories/postgres/execution_ledger.go
  • internal/repositories/postgres/leader_elector.go
📝 Walkthrough

Walkthrough

This PR introduces a comprehensive resilience and distributed coordination architecture. It wraps backends (compute, storage, network, DNS, LB) with circuit breakers and bulkheads; adds PostgreSQL-backed leader election enabling singleton coordination; introduces Redis Streams-based durable task queues with at-least-once delivery; implements execution ledgers for idempotent job processing; refactors workers to integrate new patterns; and adds integration tests validating HA properties.

Changes

Cohort / File(s) Summary
Core Resilience Primitives
internal/platform/circuit_breaker.go, internal/platform/circuit_breaker_test.go, internal/platform/bulkhead.go, internal/platform/bulkhead_test.go, internal/platform/retry.go, internal/platform/retry_test.go
Implementation of resilience patterns: circuit breaker with configurable thresholds, state transitions (closed → open → half-open), and optional callbacks; bulkhead for concurrency limiting with slots and timeouts; retry with exponential backoff and jitter. Includes comprehensive unit tests validating state transitions, error handling, and timing constraints.
Resilient Backend Adapters
internal/platform/resilient_compute.go, internal/platform/resilient_compute_test.go, internal/platform/resilient_storage.go, internal/platform/resilient_network.go, internal/platform/resilient_dns.go, internal/platform/resilient_lb.go
Decorator pattern wrapping raw backends with bulkhead, circuit breaker, and per-call timeouts. Each adapter applies resilience layers (bulkhead → circuit breaker → timeout) to operations. Compute, storage, and network include both short and long operation timeouts; DNS and LB focus on circuit breaker protection. Includes mock-based tests validating passthrough behavior, failure handling, and state transitions.
Distributed Coordination & Leadership
internal/workers/leader_guard.go, internal/workers/leader_guard_test.go, internal/repositories/postgres/leader_elector.go, internal/repositories/postgres/leader_elector_test.go
Leader election mechanism using PostgreSQL advisory locks for singleton worker coordination. LeaderGuard wrapper gates inner worker execution behind leadership acquisition and heartbeat verification. Includes restart logic on leadership loss. Tests validate single-flight behavior, blocking on non-leader state, and graceful shutdown.
Durable Task Queue & Execution Ledger Interfaces
internal/core/ports/task_queue.go, internal/core/ports/execution_ledger.go, internal/core/ports/leader.go
New port interfaces for distributed patterns: DurableTaskQueue extending TaskQueue with group management, receive/ack/nack/reclaim semantics; ExecutionLedger for idempotent job tracking with acquire, complete, failed, and status queries; LeaderElector for distributed leader election.
PostgreSQL Implementations
internal/repositories/postgres/execution_ledger.go, internal/repositories/postgres/migrations/100_create_job_executions.up.sql, internal/repositories/postgres/migrations/100_create_job_executions.down.sql
PostgreSQL-backed execution ledger implementing idempotency via job_executions table with status tracking (running/completed/failed), stale work reclamation, and optimistic locking. Includes up/down migrations creating schema with status constraints and running-job index.
Redis Durable Queue Implementation
internal/repositories/redis/durable_task_queue.go, internal/repositories/redis/durable_task_queue_test.go
Redis Streams-based durable queue with at-least-once delivery semantics. Implements consumer groups, receive with group tracking, ack/nack, and ReclaimStale for idle messages. Configurable dead-letter queue routing for exhausted messages. Includes comprehensive tests verifying group management, message receipt, acknowledgment flows, and multi-consumer distribution.
Worker Refactoring
internal/workers/cluster_worker.go, internal/workers/cluster_worker_test.go, internal/workers/pipeline_worker.go, internal/workers/provision_worker.go, internal/workers/provision_worker_test.go
Major refactoring replacing simple queue dequeue with durable queue receive/ack/nack flow. All workers now accept ExecutionLedger for idempotency, initialize consumer groups, spawn reclaimLoop for stale message recovery, and enforce concurrency limits via semaphores. Job processing now wraps around TryAcquire checks and ledger state marking (MarkComplete/MarkFailed). Updated constructors and test doubles to support new signatures.
No-Op Repository Implementations
internal/repositories/noop/adapters.go
Extended noop repository with NoopExecutionLedger (always grants ownership, returns empty status) and expanded NoopTaskQueue with DurableTaskQueue methods (group/receive/ack/nack/reclaim). Used for configurations where durability is not required.
Service Setup & Configuration
internal/api/setup/dependencies.go, cmd/api/main.go, cmd/api/main_test.go
Application initialization now wraps raw backends with resilience decorators (circuit breaker, bulkhead, timeout); creates PostgreSQL-backed LeaderElector and passes via ServiceConfig; updates role handling from APP_ROLE to ROLE with validation (api/worker/all); gates HTTP server creation by role; adds nil-safety in shutdown path. Introduces Workers struct typing with Runner interface for singleton workers. Includes three new tests validating role-based startup behavior and HTTP server lifecycle.
Testing & Drills
internal/drills/ha_drills_test.go, internal/drills/release_gates_test.go
Integration-like resilience drills validating circuit breaker trip/recovery, bulkhead saturation/rejection, combined resilience adapter behavior, retry with backoff, and half-open single-flight. Release gate tests verify fail-fast latency, isolation, recovery, idempotent retries, and concurrent breaker behavior. Exercises state transitions, error types, timing constraints, and backoff semantics.

Sequence Diagram(s)

sequenceDiagram
    participant Worker as Worker<br/>(Pipeline/Cluster/Provision)
    participant Queue as DurableTaskQueue<br/>(Redis)
    participant Ledger as ExecutionLedger<br/>(PostgreSQL)
    participant Backend as Compute/Storage<br/>Backend

    Note over Worker,Backend: Worker Startup
    Worker->>Queue: EnsureGroup(queueName, groupName)
    Queue->>Queue: Create consumer group if not exists

    Note over Worker,Backend: Main Processing Loop
    Worker->>Queue: Receive(queueName, groupName, consumerName)
    Queue-->>Worker: DurableMessage {ID, Payload}
    
    Worker->>Ledger: TryAcquire(jobKey, staleThreshold)
    alt Already Running or Completed
        Ledger-->>Worker: false (duplicate/in-progress)
        Worker->>Queue: Nack(messageID)
    else New Job
        Ledger-->>Worker: true (ownership granted)
        Worker->>Backend: Execute job
        alt Backend Success
            Backend-->>Worker: success
            Worker->>Ledger: MarkComplete(jobKey, result)
            Worker->>Queue: Ack(messageID)
        else Backend Failure
            Backend-->>Worker: error
            Worker->>Ledger: MarkFailed(jobKey, reason)
            Worker->>Queue: Nack(messageID)
        end
    end

    Note over Worker,Backend: Reclaim Stale Messages
    Worker->>Queue: ReclaimStale(groupName, minIdleMs)
    Queue-->>Worker: [DurableMessage...] (idle messages)
    loop For each reclaimed message
        Worker->>Ledger: TryAcquire(jobKey, staleThreshold)
        alt Can Reclaim
            Worker->>Backend: Execute job
            Worker->>Ledger: MarkComplete/MarkFailed
            Worker->>Queue: Ack(messageID)
        else Cannot Reclaim
            Worker->>Queue: Nack(messageID)
        end
    end
sequenceDiagram
    participant Client as Caller
    participant ResilientBackend as ResilientAdapter<br/>(Compute/Storage/etc)
    participant Bulkhead as Bulkhead<br/>(Concurrency)
    participant CircuitBreaker as CircuitBreaker<br/>(State Machine)
    participant InnerBackend as Raw Backend

    Client->>ResilientBackend: Operation(ctx)
    activate ResilientBackend
    
    ResilientBackend->>Bulkhead: Execute(fn)
    activate Bulkhead
    alt Slot Available
        Bulkhead->>CircuitBreaker: Execute(fn)
        activate CircuitBreaker
        
        alt Circuit Closed
            CircuitBreaker->>InnerBackend: call(ctx with timeout)
            activate InnerBackend
            alt Success
                InnerBackend-->>CircuitBreaker: result
                CircuitBreaker->>CircuitBreaker: recordSuccess()
            else Failure
                InnerBackend-->>CircuitBreaker: error
                CircuitBreaker->>CircuitBreaker: recordFailure()
                alt Threshold Exceeded
                    CircuitBreaker->>CircuitBreaker: transition to Open
                end
            end
            deactivate InnerBackend
        else Circuit Open
            CircuitBreaker-->>CircuitBreaker: ErrCircuitOpen
            CircuitBreaker->>CircuitBreaker: fail-fast
        else Circuit Half-Open
            CircuitBreaker->>InnerBackend: probe call
            alt Probe Success
                CircuitBreaker->>CircuitBreaker: transition to Closed
            else Probe Failure
                CircuitBreaker->>CircuitBreaker: transition to Open
            end
        end
        
        CircuitBreaker-->>Bulkhead: result/error
        deactivate CircuitBreaker
        Bulkhead->>Bulkhead: release slot
    else No Slots
        Bulkhead-->>Bulkhead: ErrBulkheadFull
    end
    deactivate Bulkhead
    
    ResilientBackend-->>Client: result/error
    deactivate ResilientBackend

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 A resilient warren springs to life,
With locks and bulkheads, circuits bright,
Leadership shared through the night,
Durable messages survive the strife,
No job lost, no task forgotten—
The system hops on, forever determined! 🌟

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 38.89%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'feat(ha): Control Plane HA' clearly describes the main objective (adding High Availability to the control plane), uses conventional commit format (feat), and is concise.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 20

🧹 Nitpick comments (12)
cmd/api/main_test.go (1)

170-259: These role permutations should be one table-driven test.

The setup and expectations are repeated across three near-identical cases, which makes future role changes easy to update inconsistently. Collapsing them into a table would keep the role matrix readable and cheaper to extend.

As per coding guidelines, **/*_test.go: Use table-driven tests in test files.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/api/main_test.go` around lines 170 - 259, Three nearly identical tests
(TestRunApplicationWorkerRoleDoesNotStartHTTP,
TestRunApplicationDefaultsToAllRole, TestRunApplicationInvalidRoleReturnsEarly)
should be collapsed into one table-driven test that iterates role cases and
expectations; create TestRunApplicationRoles that defines a slice of cases with
fields like name, role, expectStartCalled (or expectNoHTTP), and
expectedSignalBehavior, then for each case use t.Run to set t.Setenv("ROLE",
tc.role), create deps := DefaultDeps(), override deps.NewHTTPServer,
deps.StartHTTPServer, deps.ShutdownHTTPServer, and deps.NotifySignals according
to the case expectations (closing channels or calling t.Fatalf when unexpected),
call runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(),
&setup.Workers{}), and assert the expected outcome (e.g., server shutdown
channel closed or no HTTP functions invoked); remove the three separate tests
after migrating their logic into the table-driven cases so future role
permutations are easy to extend and maintain.
internal/platform/resilient_compute_test.go (1)

19-113: Prefer testify/mock over a hand-written ComputeBackend fake.

This helper now mirrors almost the whole ports.ComputeBackend surface, so every interface change will force broad mechanical edits in test code. Using testify/mock keeps each case focused on the methods it actually exercises.

As per coding guidelines, **/*_test.go: Use testify/mock for creating mock objects in tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/resilient_compute_test.go` around lines 19 - 113, The test
defines a large hand-written fake type mockCompute (and helper wait/delay logic
and callCount) that mirrors ports.ComputeBackend; change it to use testify/mock
by creating a MockCompute struct embedding mock.Mock, replace implementations
like LaunchInstanceWithOptions, StartInstance, StopInstance, GetInstanceLogs,
Ping, etc., with simple wrappers that call m.Called(...) and return values from
that call, remove the atomic callCount and wait/delay functions, and in tests
set expectations with mock.On(...).Return(...) (or use .Run(...) to simulate
delays when needed) so only the methods exercised in each test are specified and
maintained via testify/mock.
internal/workers/pipeline_worker.go (1)

144-148: Silent ledger failures reduce observability.

Same pattern as provision_worker - ledger errors are discarded. Consider logging these failures.

As per coding guidelines: "Do not use silent failures - avoid blank identifier assignment like _ = someFunc()"

Proposed fix
 		if w.ledger != nil {
-			_ = w.ledger.MarkFailed(workerCtx, jobKey, "transient load error")
+			if ledgerErr := w.ledger.MarkFailed(workerCtx, jobKey, "transient load error"); ledgerErr != nil {
+				w.logger.Warn("failed to mark job failed in ledger",
+					"build_id", job.BuildID, "error", ledgerErr)
+			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/pipeline_worker.go` around lines 144 - 148, The code
silently ignores errors returned from w.ledger.MarkFailed and w.taskQueue.Nack
in the pipeline worker; change this to log failures instead of assigning to the
blank identifier: capture the returned error from w.ledger.MarkFailed(workerCtx,
jobKey, "transient load error") and, if non-nil, call the appropriate logger
(e.g., w.log or processLogger) with context including jobKey and the error; do
the same for w.taskQueue.Nack(workerCtx, pipelineQueueName, pipelineGroup,
msg.ID) so both failures are surfaced for observability.
internal/workers/provision_worker.go (2)

177-180: Same silent failure pattern for MarkComplete.

Apply the same logging pattern here.

Proposed fix
 	// Mark completed in ledger (prevents duplicate execution).
 	if w.ledger != nil {
-		_ = w.ledger.MarkComplete(workerCtx, jobKey, "ok")
+		if ledgerErr := w.ledger.MarkComplete(workerCtx, jobKey, "ok"); ledgerErr != nil {
+			w.logger.Warn("failed to mark job complete in ledger",
+				"instance_id", job.InstanceID, "error", ledgerErr)
+		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/provision_worker.go` around lines 177 - 180, The call to
w.ledger.MarkComplete in provision_worker.go currently ignores errors; update
the block in the function where MarkComplete is invoked (the call using
w.ledger.MarkComplete(workerCtx, jobKey, "ok")) to capture its returned error
and log it using the existing logger (e.g., w.logger or processLogger) instead
of discarding it; ensure you check w.ledger != nil first, call err :=
w.ledger.MarkComplete(...), and on err != nil call the logger's error method
with a clear message and the err, preserving the original "ok" status value.

163-169: Silent ledger failures reduce observability.

When MarkFailed or MarkComplete fail (lines 165, 179), the errors are discarded. This could mask ledger connectivity issues. Consider at least logging these errors.

As per coding guidelines: "Do not use silent failures - avoid blank identifier assignment like _ = someFunc()"

Proposed fix
 		// Mark failed in the ledger so it can be retried.
 		if w.ledger != nil {
-			_ = w.ledger.MarkFailed(workerCtx, jobKey, err.Error())
+			if ledgerErr := w.ledger.MarkFailed(workerCtx, jobKey, err.Error()); ledgerErr != nil {
+				w.logger.Warn("failed to mark job failed in ledger",
+					"instance_id", job.InstanceID, "error", ledgerErr)
+			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/provision_worker.go` around lines 163 - 169, The code
currently discards errors returned by w.ledger.MarkFailed and
w.ledger.MarkComplete; change these to check the returned error and log it
(e.g., using the worker's logger) so ledger connectivity/operation failures are
visible. Specifically, replace the blank-identifier assignments around
w.ledger.MarkFailed(workerCtx, jobKey, err.Error()) and
w.ledger.MarkComplete(workerCtx, jobKey) with error checks (if err :=
w.ledger.MarkFailed(...); err != nil { w.logger.Errorf("ledger.MarkFailed
job=%s: %v", jobKey, err) }) and analogous logging for MarkComplete, keeping the
existing semantics for Nack/ack behavior. Ensure you reference w.ledger,
MarkFailed, MarkComplete, workerCtx and jobKey in the fix.
internal/drills/ha_drills_test.go (2)

131-136: Consider checking errors in test assertions instead of discarding.

Using _ = bh.Execute(...) silently discards the error. While this is test code deliberately saturating the bulkhead, the coding guidelines discourage blank identifier assignments. Consider asserting the return value or adding a brief comment explaining the intentional discard.

This pattern repeats at lines 224 and 355.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/drills/ha_drills_test.go` around lines 131 - 136, The test currently
discards errors from bh.Execute by assigning to the blank identifier (e.g., `_ =
bh.Execute(ctx, func() error { ... })`), which hides failures; update the test
assertions to capture and assert the returned error from bh.Execute (or
explicitly require it to be nil/expected error) at each occurrence (references:
bh.Execute, inFlight, blockCh) instead of discarding, or if the error is
intentionally ignored, add a short comment explaining why and replacing `_ =`
with a clear comment and/or a deliberate assertion like
`require.NoError`/`assert.NoError` to make the intent explicit; apply the same
change to the repeated occurrences noted around the other lines.

207-207: Remove unused logger variable.

The logger is assigned but immediately discarded with a comment placeholder. Either use it for actual logging in the test or remove it entirely.

Proposed fix
-	_ = logger // Would be used for real logging in production.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/drills/ha_drills_test.go` at line 207, The test contains an unused
variable named "logger" that is immediately discarded via "_ = logger"; remove
that no-op assignment or use "logger" in the test (e.g., pass it to the function
under test or emit a test log) so the variable is not unused. Locate the
declaration of "logger" in the ha_drills_test code and either delete the "_ =
logger" line or replace it with meaningful usage of the "logger" instance.
internal/platform/resilient_storage.go (1)

151-157: Hardcoded 5-second timeout for Ping.

Consider making this configurable via ResilientStorageOpts or extracting to a named constant for consistency with the coding guidelines regarding magic numbers.

Also note that Ping bypasses the bulkhead (goes directly through CB), which is appropriate for lightweight health checks that shouldn't compete for capacity.

Proposed fix
+const pingTimeout = 5 * time.Second
+
 func (r *ResilientStorage) Ping(ctx context.Context) error {
 	return r.cb.Execute(func() error {
-		ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+		ctx2, cancel := context.WithTimeout(ctx, pingTimeout)
 		defer cancel()
 		return r.inner.Ping(ctx2)
 	})
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/resilient_storage.go` around lines 151 - 157, The Ping
method in ResilientStorage uses a hardcoded 5-second timeout; add a configurable
timeout instead (either extract a named constant like DefaultPingTimeout =
5*time.Second or add a PingTimeout time.Duration field to ResilientStorageOpts
and initialize it when constructing ResilientStorage) and use that value in
Ping's context.WithTimeout call; keep the call path through r.cb.Execute and
r.inner.Ping unchanged so health checks still bypass bulkhead contention.
internal/workers/cluster_worker.go (2)

38-42: Handle os.Hostname() error instead of silently discarding it.

Using _ = os.Hostname() violates the coding guideline against silent failures. While the fallback is reasonable, the error should be logged for observability.

Proposed fix
 func NewClusterWorker(repo ports.ClusterRepository, provisioner ports.ClusterProvisioner, taskQueue ports.DurableTaskQueue, ledger ports.ExecutionLedger, logger *slog.Logger) *ClusterWorker {
-	hostname, _ := os.Hostname()
+	hostname, err := os.Hostname()
+	if err != nil {
+		logger.Warn("failed to get hostname, using fallback", "error", err)
+		hostname = "cluster-worker"
+	}
 	if hostname == "" {
 		hostname = "cluster-worker"
 	}

As per coding guidelines: "Do not use silent failures - avoid blank identifier assignment like _ = someFunc()"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/cluster_worker.go` around lines 38 - 42, In
NewClusterWorker, don’t ignore the error returned by os.Hostname(); capture both
hostname and err, and if err is non-nil log it using the provided logger (logger
or passed-in *slog.Logger) before falling back to the default "cluster-worker"
value so you preserve observability while keeping the existing fallback
behavior; update the hostname assignment in NewClusterWorker accordingly and
reference os.Hostname(), hostname, err and logger in your change.

76-79: Extract retry backoff duration as a named constant.

The hardcoded 1 * time.Second sleep duration should be a named constant for consistency with the other timing constants defined at the top of the file.

Proposed fix

Add to constants block:

 const (
 	clusterQueue          = "k8s_jobs"
 	clusterGroup          = "cluster_workers"
 	clusterMaxWorkers     = 10
 	clusterReclaimMs      = 5 * 60 * 1000 // 5 minutes
 	clusterReclaimN       = 10
 	clusterStaleThreshold = 15 * time.Minute
+	clusterReceiveBackoff = 1 * time.Second
 )

Then use it:

-			time.Sleep(1 * time.Second)
+			time.Sleep(clusterReceiveBackoff)

As per coding guidelines: "Do not use magic numbers - use named constants instead"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/cluster_worker.go` around lines 76 - 79, The hardcoded sleep
duration in the receive error path (time.Sleep(1 * time.Second)) is a magic
number; add a named constant (e.g., clusterReceiveBackoff or
receiveRetryBackoff) to the top constants block in this file and replace the
literal with that constant where the worker logs the error and calls time.Sleep
(locate the code by the w.logger.Error("failed to receive cluster job", ...) /
time.Sleep call). Ensure the new constant follows existing naming and units
convention and is used consistently for this retry backoff.
internal/platform/resilient_compute.go (1)

263-269: Extract Ping timeout as a configurable option or named constant.

The hardcoded 5*time.Second timeout for Ping is inconsistent with the configurable timeouts for other operations. Consider adding a PingTimeout field to ResilientComputeOpts or defining a package-level constant.

Proposed fix (option A: add to opts)

Add to ResilientComputeOpts:

 type ResilientComputeOpts struct {
 	// ...existing fields...
+	// PingTimeout is the timeout for health check operations. Default: 5s.
+	PingTimeout time.Duration
 }

Add to withDefaults:

+	if o.PingTimeout <= 0 {
+		o.PingTimeout = 5 * time.Second
+	}

Then use:

 func (r *ResilientCompute) Ping(ctx context.Context) error {
 	return r.cb.Execute(func() error {
-		ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+		ctx2, cancel := context.WithTimeout(ctx, r.opts.PingTimeout)
 		defer cancel()
 		return r.inner.Ping(ctx2)
 	})
 }

As per coding guidelines: "Do not use magic numbers - use named constants instead"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/resilient_compute.go` around lines 263 - 269, The Ping
method currently uses a hardcoded 5*time.Second timeout; add a configurable Ping
timeout to avoid magic numbers by updating ResilientComputeOpts to include a
PingTimeout (time.Duration) and set a default in withDefaults (or alternatively
declare a package-level constant PingTimeout = 5*time.Second), then change
ResilientCompute.Ping to use that configured value (e.g., ctx2, cancel :=
context.WithTimeout(ctx, opts.PingTimeout) or the package constant) so Ping
respects configuration and removes the magic number.
internal/platform/resilient_network.go (1)

196-202: Extract Ping timeout as configurable or use a constant (same as resilient_compute.go).

For consistency across resilient wrappers, the Ping timeout should be configurable. Consider adding PingTimeout to ResilientNetworkOpts.

Proposed fix

Add to ResilientNetworkOpts:

 type ResilientNetworkOpts struct {
 	CallTimeout     time.Duration // Per-call timeout. Default: 30s.
 	CBThreshold     int           // Failures to trip. Default: 5.
 	CBResetTimeout  time.Duration // Open→half-open wait. Default: 30s.
 	BulkheadMaxConc int           // Max concurrent calls. Default: 15.
 	BulkheadWait    time.Duration // Bulkhead slot wait. Default: 10s.
+	PingTimeout     time.Duration // Health check timeout. Default: 5s.
 }

Add to withDefaults:

+	if o.PingTimeout <= 0 {
+		o.PingTimeout = 5 * time.Second
+	}

Then use:

 func (r *ResilientNetwork) Ping(ctx context.Context) error {
 	return r.cb.Execute(func() error {
-		ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+		ctx2, cancel := context.WithTimeout(ctx, r.opts.PingTimeout)
 		defer cancel()
 		return r.inner.Ping(ctx2)
 	})
 }

As per coding guidelines: "Do not use magic numbers - use named constants instead"

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/resilient_network.go` around lines 196 - 202, The Ping
method currently uses a hard-coded 5s timeout; make this configurable like in
resilient_compute.go by adding a PingTimeout field to ResilientNetworkOpts,
defaulting it in withDefaults (use a named constant for the default), and then
replace the literal 5*time.Second in ResilientNetwork.Ping with the configured
opts.PingTimeout (or the default constant) when creating ctx2 via
context.WithTimeout; reference ResilientNetwork, ResilientNetworkOpts,
withDefaults, and Ping in your changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/api/main_test.go`:
- Around line 170-198: The test currently can pass if runApplication returns
immediately; change TestRunApplicationWorkerRoleDoesNotStartHTTP so it launches
runApplication(deps, ...) in a goroutine and use a done channel (or
sync.WaitGroup) to detect completion, then have NotifySignals send SIGTERM after
a short sleep and assert that runApplication does not return before the SIGTERM
(fail the test if the done channel is closed too early) and that it does return
after the SIGTERM; keep the existing
NewHTTPServer/StartHTTPServer/ShutdownHTTPServer t.Fatalf hooks and reference
runApplication, DefaultDeps, and NotifySignals to implement the blocking/timeout
behavior.

In `@cmd/api/main.go`:
- Around line 174-183: The ROLE validation currently logs and returns from
runApplication but does not surface an error, so run() still returns nil and the
process exits successfully; change the behavior in the ROLE check inside
runApplication (the block that reads os.Getenv("ROLE") and consults validRoles)
to return a non-nil error (e.g., fmt.Errorf or errors.New) when the role is
invalid instead of just logger.Error and return, so that run() receives and
propagates the error and the process exits with non-zero status; update callers
of runApplication/run as needed to propagate that error (ensure run() returns
the error up to main).

In `@internal/platform/bulkhead_test.go`:
- Around line 22-45: The test currently only asserts maxSeen <= 2 which allows a
broken serial implementation to pass; update the test around the goroutine loop
that calls bh.Execute to include a start barrier (e.g., a sync.WaitGroup or
channel named start or startWG) so all worker goroutines wait to begin the
critical section simultaneously, then release the barrier so they contend
concurrently; after wg.Wait() assert that maxSeen.Load() == int32(2) (or at
least >= 2) to ensure the bulkhead actually allows two concurrent slots,
referencing the existing goroutine body that uses running.Add(1)/Add(-1),
maxSeen, and bh.Execute to locate where to insert the barrier and the tightened
assertion.
- Around line 106-117: The test races on slot release by sleeping; instead,
wait for the holder goroutine to finish and check Execute's error rather than
discarding it. Change the anonymous goroutine that calls bh.Execute to send its
returned error on a channel (or use a sync.WaitGroup) and in the main test
receive/Wait for that completion before asserting bh.Available(); reference the
bh.Execute call inside the goroutine and the started/done channels so you
explicitly close done, wait for the goroutine completion result, assert the
error is nil, then assert bh.Available().

In `@internal/platform/bulkhead.go`:
- Around line 52-71: The acquire method can still send to b.sem after ctx is
already canceled because the send case may be chosen; to fix it, first check ctx
by doing a non-blocking select on ctx.Done() (return ErrBulkheadFull if closed)
before attempting to send, then proceed with the existing timed or untimed send
logic (the subsequent select should still include <-ctx.Done() and timer case as
before) so that Bulkhead.acquire (and callers like Execute) never acquires a
slot for an already canceled context.

In `@internal/platform/circuit_breaker_test.go`:
- Around line 110-128: The test currently uses time.Sleep to wait for the probe
goroutine to finish which is racy; instead, wait deterministically for the
probe's Execute to complete and for the circuit's state transition before
asserting. Replace the sleep after closing probeDone by capturing the probe
Execute result (e.g., signal a done channel or use a sync.WaitGroup inside the
goroutine that calls cb.Execute) and block on that signal; similarly replace
other sleeps (lines noted in the comment) by awaiting the Execute completion or
a state-change notification from cb.GetState()/transition counter so the
assertions for ErrCircuitOpen and StateClosed happen only after the probe has
actually finished.

In `@internal/platform/circuit_breaker.go`:
- Around line 188-199: transitionLocked currently invokes cb.onStateChange while
holding cb.mu; change it to capture the current state transition and the
onStateChange callback under the lock, set cb.state under the lock, then release
the lock and invoke the callback (if non-nil) outside the lock. In other words,
inside transitionLocked (on type CircuitBreaker) read from := cb.state, set
cb.state = to, store cb.onStateChange and cb.name into locals while holding
cb.mu, then return early if from==to; finally call the saved onStateChange(name,
from, to) after unlocking so the callback runs without cb.mu held.
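The capture-then-unlock pattern described above can be sketched like this (a simplified stand-in for the real CircuitBreaker, not its full implementation):

```go
package main

import (
	"fmt"
	"sync"
)

type State int

const (
	StateClosed State = iota
	StateOpen
	StateHalfOpen
)

type CircuitBreaker struct {
	mu            sync.Mutex
	name          string
	state         State
	onStateChange func(name string, from, to State)
}

func (cb *CircuitBreaker) GetState() State {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	return cb.state
}

// transition snapshots the from/to pair and the callback under the lock,
// then invokes the callback after unlocking, so a callback that re-enters
// the breaker (e.g. calls GetState) cannot deadlock on mu.
func (cb *CircuitBreaker) transition(to State) {
	cb.mu.Lock()
	from := cb.state
	if from == to {
		cb.mu.Unlock()
		return
	}
	cb.state = to
	notify := cb.onStateChange
	name := cb.name
	cb.mu.Unlock()
	if notify != nil {
		notify(name, from, to)
	}
}

func main() {
	cb := &CircuitBreaker{name: "compute"}
	cb.onStateChange = func(name string, from, to State) {
		// Safe: mu is not held here, so this re-entrant read cannot deadlock.
		fmt.Printf("%s: %d -> %d (now %d)\n", name, from, to, cb.GetState())
	}
	cb.transition(StateOpen)
}
```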

In `@internal/platform/retry.go`:
- Around line 81-95: The backoffDelay implementation currently enforces a lower
bound of base/2 which is not full jitter; change it to return a uniform random
duration in [0, cap] where cap = min(max, base * mult^attempt) (i.e., the
existing calculated value after capping). In function backoffDelay replace the
floor/jitter logic with: compute cap := calculated (after capping), ensure cap >
0, then return time.Duration(rand.Int63n(int64(cap)+1)); keep the existing
math.Pow-based calculated and the cap logic that sets calculated = max when
appropriate and remove the base/2 floor behavior so the function matches the
documented full-jitter strategy.

In `@internal/repositories/postgres/execution_ledger.go`:
- Around line 69-79: The current ledger allows superseded attempts to overwrite
active runs because terminal updates only filter by job_key (and the failed
retry path returns true without verifying a row was updated); propagate an
attempt/fencing token (or the acquired started_at) from TryAcquire into the
subsequent retry and terminal methods (e.g., MarkComplete, MarkFailed) and
include it in the UPDATE WHERE clause so only the matching attempt can update
the row; also ensure the retry/failure paths check tag.RowsAffected() and return
false when zero rows were updated. Apply this change to the identified update
sites covering the ranges around TryAcquire, MarkComplete, MarkFailed (and the
similar UPDATEs at the other noted ranges).

In `@internal/repositories/postgres/leader_elector.go`:
- Around line 23-37: Current implementation uses the pooled DB interface for
session-scoped advisory locks, causing ownership to be lost across
Acquire/Heartbeat/Release; update PgLeaderElector so each held key is pinned to
a dedicated session connection for the full lifecycle: modify Acquire to obtain
and store a dedicated *pgx.Conn (or session-aware connection) per key in the
elector (e.g., extend the held map or add a conn map keyed by lock name), use
that same connection when running Heartbeat and Release, and ensure Release
closes/releases that pinned connection; update Heartbeat and Release to
reference the stored per-key connection instead of using the pooled DB interface
so pg_try_advisory_lock/pg_advisory_unlock execute on the same backend session.
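The session-pinning idea can be sketched with database/sql, whose DB.Conn pins a single backend session. The PR uses pgx, but the principle is identical: acquire, heartbeat, and release must all run on the same connection, because pg advisory locks taken with pg_try_advisory_lock are session-scoped. SessionElector and lockKey are illustrative names, not the repository's actual types.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"hash/fnv"
	"sync"
)

// lockKey maps a lock name to the int64 key advisory locks require.
func lockKey(name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	return int64(h.Sum64())
}

type SessionElector struct {
	db    *sql.DB
	mu    sync.Mutex
	conns map[string]*sql.Conn // one pinned session per held lock
}

func NewSessionElector(db *sql.DB) *SessionElector {
	return &SessionElector{db: db, conns: map[string]*sql.Conn{}}
}

func (e *SessionElector) Acquire(ctx context.Context, name string) (bool, error) {
	conn, err := e.db.Conn(ctx) // dedicated session, not a per-call pool checkout
	if err != nil {
		return false, err
	}
	var got bool
	if err := conn.QueryRowContext(ctx,
		"SELECT pg_try_advisory_lock($1)", lockKey(name)).Scan(&got); err != nil {
		conn.Close()
		return false, err
	}
	if !got {
		conn.Close()
		return false, nil
	}
	e.mu.Lock()
	e.conns[name] = conn // Heartbeat/Release must reuse this exact session
	e.mu.Unlock()
	return true, nil
}

func (e *SessionElector) Release(ctx context.Context, name string) error {
	e.mu.Lock()
	conn := e.conns[name]
	delete(e.conns, name)
	e.mu.Unlock()
	if conn == nil {
		return nil
	}
	var released bool
	err := conn.QueryRowContext(ctx,
		"SELECT pg_advisory_unlock($1)", lockKey(name)).Scan(&released)
	conn.Close() // returning the session to the pool drops any remaining locks
	return err
}

func main() {
	fmt.Printf("lock key for %q: %d\n", "cluster-leader", lockKey("cluster-leader"))
}
```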

In `@internal/repositories/postgres/migrations/100_create_job_executions.up.sql`:
- Around line 2-11: MarkComplete and MarkFailed currently update job_executions
rows using only job_key and can overwrite a job reclaimed by another worker;
update both methods to accept the acquired started_at timestamp (the same
optimistic-lock value used by TryAcquire) and add it to the WHERE clause (i.e.,
WHERE job_key = $1 AND started_at = $acquiredStartedAt) so only the current
lease holder can change status on the job_executions row.

In `@internal/repositories/redis/durable_task_queue.go`:
- Around line 86-90: Dequeue currently calls q.client.XDel(ctx, queueName,
msg.ID) and ignores its result, returning payload even if deletion fails; change
this so you capture the XDel result and error (e.g., res, err :=
q.client.XDel(...).Result()), check the error and the deleted count, and only
return payload when XDel succeeded—otherwise return an appropriate error; update
the Dequeue path that references msg, queueName, q.client.XDel to avoid using
the blank identifier and to correctly propagate XDel failures instead of
silently treating the message as consumed.
- Around line 134-135: The Ack method on durableTaskQueue currently only calls
XAck which marks the message as processed but leaves the stream entry; update
Ack (method durableTaskQueue.Ack) to also remove the entry after successful XAck
by calling XDel with the same queueName and messageID (or use XACKDEL if
go-redis supports it) so acknowledged messages are deleted from the stream;
ensure you handle and return errors from XDel (and only call XDel after XAck
succeeds) similar to how deadLetter and Dequeue perform cleanup.
- Around line 167-169: ReclaimStale currently swallows errors from deadLetter
and deadLetter uses a non-atomic Pipeline() to run XADD + XACK + XDEL, causing
possible partial effects; modify deadLetter to perform the DLQ XADD then
conditionally XACK+XDEL in a single atomic operation (either via a Redis EVAL
Lua script that does XADD then XACK+XDEL only on XADD success or by using
MULTI/EXEC with WATCH as appropriate) so the three ops cannot partially apply,
and update ReclaimStale to return/propagate the error from deadLetter instead of
assigning it to the blank identifier; locate the changes in the deadLetter
implementation and its caller ReclaimStale to ensure errors are handled and
atomicity is enforced for XADD, XACK, XDEL.

In `@internal/workers/cluster_worker.go`:
- Line 128: Replace the new background context with the propagated worker
context so cancellations and timeouts flow through; instead of calling
appcontext.WithUserID(context.Background(), job.UserID) use the existing
workerCtx as the parent (i.e., call appcontext.WithUserID(workerCtx,
job.UserID)) so in-flight operations are cancelled when the worker shuts down
and context propagation is preserved for functions like those invoked after
creating ctx.
- Around line 112-126: The code currently discards errors from ledger and
taskQueue calls (e.g., TryAcquire, MarkFailed, MarkComplete, Ack, Nack) using
the blank identifier; update these call sites throughout processJob,
handleProvision, handleDeprovision, handleUpgrade, and reclaimLoop to check
returned errors and log them via w.logger (include context such as "cluster_id"
job.ClusterID, "msg_id" msg.ID, operation name and the error). For each
operation (ledger.TryAcquire, ledger.MarkFailed/MarkComplete,
taskQueue.Ack/Nack, etc.) replace `_ = ...` with error handling that logs a
descriptive message and the error, and only suppress or handle the error after
logging so failures are visible for debugging. Ensure the logs provide operation
names and relevant identifiers to make tracing across ledger and queue actions
straightforward.
- Around line 152-159: The switch over job.Type (checking
domain.ClusterJobProvision/Deprovision/Upgrade) lacks a default branch so
unknown job types leave processErr nil and ack the job; add a default case in
the switch that sets processErr to a descriptive error (or wraps fmt.Errorf)
indicating an unknown/unsupported job type (include job.Type and job.ID or
cluster.ID in the message) and optionally log it before returning/acknowledging,
so the caller handling processErr (and methods like w.handleProvision,
w.handleDeprovision, w.handleUpgrade) will treat unexpected types as failures.

In `@internal/workers/leader_guard_test.go`:
- Around line 75-76: Replace the brittle time.Sleep calls in
leader_guard_test.go with event-driven synchronization: add ready and stopped
signaling channels to the mock runner used in the tests (e.g., the mock runner
struct used around the code at the lines with time.Sleep), have the mock runner
send on ready when the inner worker goroutine has started and send on stopped
when it exits, then in the test wait on those channels (using a select with a
reasonable timeout to avoid hanging) instead of sleeping; do the same for the
second occurrence around lines 172–173 so tests wait for explicit
readiness/shutdown signals rather than fixed sleeps.

In `@internal/workers/provision_worker_test.go`:
- Around line 161-163: Replace the fragile time.Sleep-based waits in the test
with a deterministic synchronization on the fake queue's acknowledgement
signals: instead of sleeping for 200ms then calling cancel(), wait for the
fakeDurableQueue to emit the expected ack/nack (or a done channel/event exposed
by fakeDurableQueue) before calling cancel(); do the same replacement for the
other occurrence around lines 209-210 so the test waits on the observable
ack/nack from fakeDurableQueue rather than on a wall-clock sleep.

---

Nitpick comments:
In `@cmd/api/main_test.go`:
- Around line 170-259: Three nearly identical tests
(TestRunApplicationWorkerRoleDoesNotStartHTTP,
TestRunApplicationDefaultsToAllRole, TestRunApplicationInvalidRoleReturnsEarly)
should be collapsed into one table-driven test that iterates role cases and
expectations; create TestRunApplicationRoles that defines a slice of cases with
fields like name, role, expectStartCalled (or expectNoHTTP), and
expectedSignalBehavior, then for each case use t.Run to set t.Setenv("ROLE",
tc.role), create deps := DefaultDeps(), override deps.NewHTTPServer,
deps.StartHTTPServer, deps.ShutdownHTTPServer, and deps.NotifySignals according
to the case expectations (closing channels or calling t.Fatalf when unexpected),
call runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(),
&setup.Workers{}), and assert the expected outcome (e.g., server shutdown
channel closed or no HTTP functions invoked); remove the three separate tests
after migrating their logic into the table-driven cases so future role
permutations are easy to extend and maintain.

In `@internal/drills/ha_drills_test.go`:
- Around line 131-136: The test currently discards errors from bh.Execute by
assigning to the blank identifier (e.g., `_ = bh.Execute(ctx, func() error { ...
})`), which hides failures; update the test assertions to capture and assert the
returned error from bh.Execute (or explicitly require it to be nil/expected
error) at each occurrence (references: bh.Execute, inFlight, blockCh) instead of
discarding, or if the error is intentionally ignored, add a short comment
explaining why and replacing `_ =` with a clear comment and/or a deliberate
assertion like `require.NoError`/`assert.NoError` to make the intent explicit;
apply the same change to the repeated occurrences noted around the other lines.
- Line 207: The test contains an unused variable named "logger" that is
immediately discarded via "_ = logger"; remove that no-op assignment or use
"logger" in the test (e.g., pass it to the function under test or emit a test
log) so the variable is not unused. Locate the declaration of "logger" in the
ha_drills_test code and either delete the "_ = logger" line or replace it with
meaningful usage of the "logger" instance.

In `@internal/platform/resilient_compute_test.go`:
- Around line 19-113: The test defines a large hand-written fake type
mockCompute (and helper wait/delay logic and callCount) that mirrors
ports.ComputeBackend; change it to use testify/mock by creating a MockCompute
struct embedding mock.Mock, replace implementations like
LaunchInstanceWithOptions, StartInstance, StopInstance, GetInstanceLogs, Ping,
etc., with simple wrappers that call m.Called(...) and return values from that
call, remove the atomic callCount and wait/delay functions, and in tests set
expectations with mock.On(...).Return(...) (or use .Run(...) to simulate delays
when needed) so only the methods exercised in each test are specified and
maintained via testify/mock.

In `@internal/platform/resilient_compute.go`:
- Around line 263-269: The Ping method currently uses a hardcoded 5*time.Second
timeout; add a configurable Ping timeout to avoid magic numbers by updating
ResilientComputeOpts to include a PingTimeout (time.Duration) and set a default
in withDefaults (or alternatively declare a package-level constant PingTimeout =
5*time.Second), then change ResilientCompute.Ping to use that configured value
(e.g., ctx2, cancel := context.WithTimeout(ctx, opts.PingTimeout) or the package
constant) so Ping respects configuration and removes the magic number.

In `@internal/platform/resilient_network.go`:
- Around line 196-202: The Ping method currently uses a hard-coded 5s timeout;
make this configurable like in resilient_compute.go by adding a PingTimeout
field to ResilientNetworkOpts, defaulting it in withDefaults (use a named
constant for the default), and then replace the literal 5*time.Second in
ResilientNetwork.Ping with the configured opts.PingTimeout (or the default
constant) when creating ctx2 via context.WithTimeout; reference
ResilientNetwork, ResilientNetworkOpts, withDefaults, and Ping in your changes.

In `@internal/platform/resilient_storage.go`:
- Around line 151-157: The Ping method in ResilientStorage uses a hardcoded
5-second timeout; add a configurable timeout instead (either extract a named
constant like DefaultPingTimeout = 5*time.Second or add a PingTimeout
time.Duration field to ResilientStorageOpts and initialize it when constructing
ResilientStorage) and use that value in Ping's context.WithTimeout call; keep
the call path through r.cb.Execute and r.inner.Ping unchanged so health checks
still bypass bulkhead contention.

In `@internal/workers/cluster_worker.go`:
- Around line 38-42: In NewClusterWorker, don’t ignore the error returned by
os.Hostname(); capture both hostname and err, and if err is non-nil log it using
the provided logger (logger or passed-in *slog.Logger) before falling back to
the default "cluster-worker" value so you preserve observability while keeping
the existing fallback behavior; update the hostname assignment in
NewClusterWorker accordingly and reference os.Hostname(), hostname, err and
logger in your change.
- Around line 76-79: The hardcoded sleep duration in the receive error path
(time.Sleep(1 * time.Second)) is a magic number; add a named constant (e.g.,
clusterReceiveBackoff or receiveRetryBackoff) to the top constants block in this
file and replace the literal with that constant where the worker logs the error
and calls time.Sleep (locate the code by the w.logger.Error("failed to receive
cluster job", ...) / time.Sleep call). Ensure the new constant follows existing
naming and units convention and is used consistently for this retry backoff.

In `@internal/workers/pipeline_worker.go`:
- Around line 144-148: The code silently ignores errors returned from
w.ledger.MarkFailed and w.taskQueue.Nack in the pipeline worker; change this to
log failures instead of assigning to the blank identifier: capture the returned
error from w.ledger.MarkFailed(workerCtx, jobKey, "transient load error") and,
if non-nil, call the appropriate logger (e.g., w.log or processLogger) with
context including jobKey and the error; do the same for
w.taskQueue.Nack(workerCtx, pipelineQueueName, pipelineGroup, msg.ID) so both
failures are surfaced for observability.

In `@internal/workers/provision_worker.go`:
- Around line 177-180: The call to w.ledger.MarkComplete in provision_worker.go
currently ignores errors; update the block in the function where MarkComplete is
invoked (the call using w.ledger.MarkComplete(workerCtx, jobKey, "ok")) to
capture its returned error and log it using the existing logger (e.g., w.logger
or processLogger) instead of discarding it; ensure you check w.ledger != nil
first, call err := w.ledger.MarkComplete(...), and on err != nil call the
logger's error method with a clear message and the err, preserving the original
"ok" status value.
- Around line 163-169: The code currently discards errors returned by
w.ledger.MarkFailed and w.ledger.MarkComplete; change these to check the
returned error and log it (e.g., using the worker's logger) so ledger
connectivity/operation failures are visible. Specifically, replace the
blank-identifier assignments around w.ledger.MarkFailed(workerCtx, jobKey,
err.Error()) and w.ledger.MarkComplete(workerCtx, jobKey) with error checks (if
err := w.ledger.MarkFailed(...); err != nil { w.logger.Errorf("ledger.MarkFailed
job=%s: %v", jobKey, err) }) and analogous logging for MarkComplete, keeping the
existing semantics for Nack/ack behavior. Ensure you reference w.ledger,
MarkFailed, MarkComplete, workerCtx and jobKey in the fix.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2ea5fe97-01f5-4ee1-873c-d9f6d3f42ce5

📥 Commits

Reviewing files that changed from the base of the PR and between dc5f132 and 53a83db.

📒 Files selected for processing (35)
  • cmd/api/main.go
  • cmd/api/main_test.go
  • internal/api/setup/dependencies.go
  • internal/core/ports/execution_ledger.go
  • internal/core/ports/leader.go
  • internal/core/ports/task_queue.go
  • internal/drills/ha_drills_test.go
  • internal/drills/release_gates_test.go
  • internal/platform/bulkhead.go
  • internal/platform/bulkhead_test.go
  • internal/platform/circuit_breaker.go
  • internal/platform/circuit_breaker_test.go
  • internal/platform/resilient_compute.go
  • internal/platform/resilient_compute_test.go
  • internal/platform/resilient_dns.go
  • internal/platform/resilient_lb.go
  • internal/platform/resilient_network.go
  • internal/platform/resilient_storage.go
  • internal/platform/retry.go
  • internal/platform/retry_test.go
  • internal/repositories/noop/adapters.go
  • internal/repositories/postgres/execution_ledger.go
  • internal/repositories/postgres/leader_elector.go
  • internal/repositories/postgres/leader_elector_test.go
  • internal/repositories/postgres/migrations/100_create_job_executions.down.sql
  • internal/repositories/postgres/migrations/100_create_job_executions.up.sql
  • internal/repositories/redis/durable_task_queue.go
  • internal/repositories/redis/durable_task_queue_test.go
  • internal/workers/cluster_worker.go
  • internal/workers/cluster_worker_test.go
  • internal/workers/leader_guard.go
  • internal/workers/leader_guard_test.go
  • internal/workers/pipeline_worker.go
  • internal/workers/provision_worker.go
  • internal/workers/provision_worker_test.go

Comment thread cmd/api/main_test.go
Comment on lines +170 to +198
func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")

deps := DefaultDeps()

deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}

runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
// If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
}
⚠️ Potential issue | 🟡 Minor

This worker-role test still passes if the process exits immediately.

If runApplication returns right away in worker mode, this test still passes because the only assertions are that HTTP hooks were not called. Run it in a goroutine and assert it stays blocked until the SIGTERM path is exercised.

🧪 Assert that worker mode stays alive until shutdown
- runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
- // If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
+ returned := make(chan struct{})
+ go func() {
+ 	defer close(returned)
+ 	runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
+ }()
+
+ select {
+ case <-returned:
+ 	t.Fatalf("runApplication returned before SIGTERM in worker mode")
+ case <-time.After(25 * time.Millisecond):
+ }
+
+ select {
+ case <-returned:
+ case <-time.After(time.Second):
+ 	t.Fatalf("expected worker mode to exit after SIGTERM")
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")
deps := DefaultDeps()
deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}
runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
// If we reach here without t.Fatalf, the test passes — no HTTP server was touched.
}
func TestRunApplicationWorkerRoleDoesNotStartHTTP(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
t.Setenv("ROLE", "worker")
deps := DefaultDeps()
deps.NewHTTPServer = func(string, http.Handler) *http.Server {
t.Fatalf("NewHTTPServer should not be called in worker-only mode")
return nil
}
deps.StartHTTPServer = func(*http.Server) error {
t.Fatalf("StartHTTPServer should not be called in worker-only mode")
return nil
}
deps.ShutdownHTTPServer = func(context.Context, *http.Server) error {
t.Fatalf("ShutdownHTTPServer should not be called in worker-only mode")
return nil
}
deps.NotifySignals = func(c chan<- os.Signal, _ ...os.Signal) {
go func() {
// Give workers a moment to start, then signal shutdown
time.Sleep(50 * time.Millisecond)
c <- syscall.SIGTERM
}()
}
returned := make(chan struct{})
go func() {
defer close(returned)
runApplication(deps, &platform.Config{Port: "0"}, logger, gin.New(), &setup.Workers{})
}()
select {
case <-returned:
t.Fatalf("runApplication returned before SIGTERM in worker mode")
case <-time.After(25 * time.Millisecond):
}
select {
case <-returned:
case <-time.After(time.Second):
t.Fatalf("expected worker mode to exit after SIGTERM")
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/api/main_test.go` around lines 170 - 198, The test currently can pass if
runApplication returns immediately; change
TestRunApplicationWorkerRoleDoesNotStartHTTP so it launches runApplication(deps,
...) in a goroutine and use a done channel (or sync.WaitGroup) to detect
completion, then have NotifySignals send SIGTERM after a short sleep and assert
that runApplication does not return before the SIGTERM (fail the test if the
done channel is closed too early) and that it does return after the SIGTERM;
keep the existing NewHTTPServer/StartHTTPServer/ShutdownHTTPServer t.Fatalf
hooks and reference runApplication, DefaultDeps, and NotifySignals to implement
the blocking/timeout behavior.

Comment thread cmd/api/main.go
Comment on lines +22 to +45
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := bh.Execute(context.Background(), func() error {
cur := running.Add(1)
defer running.Add(-1)
// Track the max concurrent.
for {
old := maxSeen.Load()
if cur <= old || maxSeen.CompareAndSwap(old, cur) {
break
}
}
time.Sleep(50 * time.Millisecond)
return nil
})
assert.NoError(t, err)
}()
}

wg.Wait()
assert.LessOrEqual(t, maxSeen.Load(), int32(2))
}

⚠️ Potential issue | 🟡 Minor

This won't catch a one-slot bulkhead.

Line 44 only verifies maxSeen <= 2, so an implementation that accidentally serializes everything to 1 slot still passes. Add a start barrier and assert the observed peak reaches 2.

🧪 Tighten the concurrency assertion
+ start := make(chan struct{})
  for i := 0; i < 4; i++ {
  	wg.Add(1)
  	go func() {
  		defer wg.Done()
+ 		<-start
  		err := bh.Execute(context.Background(), func() error {
  			cur := running.Add(1)
  			defer running.Add(-1)
  			// Track the max concurrent.
  			for {
  				old := maxSeen.Load()
  				if cur <= old || maxSeen.CompareAndSwap(old, cur) {
  					break
  				}
  			}
  			time.Sleep(50 * time.Millisecond)
  			return nil
  		})
  		assert.NoError(t, err)
  	}()
  }
+ close(start)

  wg.Wait()
- assert.LessOrEqual(t, maxSeen.Load(), int32(2))
+ assert.Equal(t, int32(2), maxSeen.Load())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/bulkhead_test.go` around lines 22 - 45, The test currently
only asserts maxSeen <= 2 which allows a broken serial implementation to pass;
update the test around the goroutine loop that calls bh.Execute to include a
start barrier (e.g., a sync.WaitGroup or channel named start or startWG) so all
worker goroutines wait to begin the critical section simultaneously, then
release the barrier so they contend concurrently; after wg.Wait() assert that
maxSeen.Load() == int32(2) (or at least >= 2) to ensure the bulkhead actually
allows two concurrent slots, referencing the existing goroutine body that uses
running.Add(1)/Add(-1), maxSeen, and bh.Execute to locate where to insert the
barrier and the tightened assertion.

Comment on lines +106 to +117
go func() {
_ = bh.Execute(context.Background(), func() error {
close(started)
<-done
return nil
})
}()
<-started
assert.Equal(t, 2, bh.Available())
close(done)
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 3, bh.Available())

⚠️ Potential issue | 🟡 Minor

Don't guess when the slot is released.

Line 116 sleeps for 10ms and then assumes the holder goroutine has exited. That makes the final Available() assertion timing-dependent, and the discarded Execute result can hide an unexpected failure in the goroutine.

🧪 Wait for the holder goroutine explicitly
+ finished := make(chan error, 1)
  go func() {
- 	_ = bh.Execute(context.Background(), func() error {
+ 	finished <- bh.Execute(context.Background(), func() error {
  		close(started)
  		<-done
  		return nil
  	})
  }()
  <-started
  assert.Equal(t, 2, bh.Available())
  close(done)
- time.Sleep(10 * time.Millisecond)
+ require.NoError(t, <-finished)
  assert.Equal(t, 3, bh.Available())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/platform/bulkhead_test.go` around lines 106 - 117, The test is
racing on slot release by sleeping; instead wait for the holder goroutine to
finish and check Execute's error instead of discarding it. Change the anonymous
goroutine that calls bh.Execute to send its returned error on a channel (or use
a sync.WaitGroup) and in the main test receive/Wait for that completion before
asserting bh.Available(); reference the bh.Execute call inside the goroutine and
the started/done channels so you explicitly close done, wait for the goroutine
completion result, assert the error is nil, then assert bh.Available().

Comment thread internal/platform/bulkhead.go
Comment thread internal/workers/cluster_worker.go
Comment thread internal/workers/cluster_worker.go Outdated
Comment thread internal/workers/cluster_worker.go
Comment on lines +75 to +76
// Wait a bit for the inner worker to start
time.Sleep(100 * time.Millisecond)

⚠️ Potential issue | 🟡 Minor

Replace fixed sleeps with event-driven synchronization.

These assertions depend on the goroutines starting and stopping within 50–100ms, which is brittle on slower CI runners. Have the mock runner signal readiness/shutdown over a channel and wait on that instead of wall-clock sleeps.

Also applies to: 172-173

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/leader_guard_test.go` around lines 75 - 76, Replace the
brittle time.Sleep calls in leader_guard_test.go with event-driven
synchronization: add ready and stopped signaling channels to the mock runner
used in the tests (e.g., the mock runner struct used around the code at the
lines with time.Sleep), have the mock runner send on ready when the inner worker
goroutine has started and send on stopped when it exits, then in the test wait
on those channels (using a select with a reasonable timeout to avoid hanging)
instead of sleeping; do the same for the second occurrence around lines 172–173
so tests wait for explicit readiness/shutdown signals rather than fixed sleeps.

Comment on lines +161 to 163
// Give worker time to process
time.Sleep(200 * time.Millisecond)
cancel()

⚠️ Potential issue | 🟡 Minor

Avoid wall-clock sleeps in the worker tests.

These checks assume the queue round-trip finishes inside 50–200ms. On a busy runner that will flake even when the worker is correct. Wait on an observable ack/nack signal from fakeDurableQueue instead of sleeping for a fixed interval.

Also applies to: 209-210

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/workers/provision_worker_test.go` around lines 161 - 163, Replace
the fragile time.Sleep-based waits in the test with a deterministic
synchronization on the fake queue's acknowledgement signals: instead of sleeping
for 200ms then calling cancel(), wait for the fakeDurableQueue to emit the
expected ack/nack (or a done channel/event exposed by fakeDurableQueue) before
calling cancel(); do the same replacement for the other occurrence around lines
209-210 so the test waits on the observable ack/nack from fakeDurableQueue
rather than on a wall-clock sleep.

@poyrazK
Owner

poyrazK commented Mar 12, 2026

Hi Bahadır, thank you for your help. Can you verify that all CI tests are passing for review?

@jackthepunished
Contributor Author

Hi Bahadır, thank you for your help. Can you verify that all CI tests are passing for review?

I'll get on it ASAP.

Copilot AI review requested due to automatic review settings April 14, 2026 11:43

Copilot AI left a comment


Pull request overview

Implements control-plane High Availability primitives across the worker fleet (leader election for singletons, durable queues + idempotency ledger for parallel workers, and resilience wrappers for infrastructure backends), along with role-based startup wiring.

Changes:

  • Added Postgres advisory-lock leader election and a LeaderGuard wrapper for singleton workers.
  • Introduced a Redis Streams–backed durable task queue + Postgres execution ledger, and migrated Provision/Pipeline/Cluster workers to ack/nack + reclaim semantics.
  • Added resilience primitives (circuit breaker, bulkhead, retry) and wrapped infrastructure backends; updated API startup wiring to support ROLE=api|worker|all.

Reviewed changes

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
internal/workers/provision_worker.go Migrates provision worker to durable queue + execution ledger + bounded concurrency + reclaim loop.
internal/workers/provision_worker_test.go Updates provision worker tests for durable queue ack/nack behavior.
internal/workers/pipeline_worker.go Migrates pipeline worker to durable queue + execution ledger + bounded concurrency + reclaim loop.
internal/workers/cluster_worker.go Migrates cluster worker to durable queue + execution ledger + bounded concurrency + reclaim loop.
internal/workers/cluster_worker_test.go Updates cluster worker tests for durable queue ack/nack usage.
internal/workers/leader_guard.go Adds LeaderGuard wrapper to run singleton workers only under leadership.
internal/workers/leader_guard_test.go Tests LeaderGuard leader/non-leader and restart/shutdown behavior.
internal/repositories/redis/durable_task_queue.go Implements ports.DurableTaskQueue via Redis Streams consumer groups + reclaim + DLQ.
internal/repositories/redis/durable_task_queue_test.go Adds tests for enqueue/receive/ack/nack/reclaim and legacy dequeue compatibility.
internal/repositories/postgres/leader_elector.go Adds Postgres advisory-lock based leader elector implementation.
internal/repositories/postgres/leader_elector_test.go Tests deterministic/positive/unique key-to-lock hashing.
internal/repositories/postgres/execution_ledger.go Adds Postgres execution ledger implementing idempotent job execution tracking.
internal/repositories/postgres/migrations/100_create_job_executions.up.sql Adds job_executions table for execution ledger.
internal/repositories/postgres/migrations/100_create_job_executions.down.sql Drops job_executions table on rollback.
internal/core/ports/task_queue.go Introduces DurableTaskQueue and DurableMessage, deprecates TaskQueue.Dequeue for parallel workers.
internal/core/ports/leader.go Adds ports.LeaderElector interface.
internal/core/ports/execution_ledger.go Adds ports.ExecutionLedger interface.
internal/repositories/noop/adapters.go Extends no-op task queue to implement DurableTaskQueue; adds no-op execution ledger.
internal/platform/circuit_breaker.go Enhances circuit breaker with options, half-open single-flight, and state-change hooks.
internal/platform/circuit_breaker_test.go Adds/updates tests for circuit breaker single-flight, transitions, and options.
internal/platform/bulkhead.go Adds semaphore-based bulkhead concurrency limiter.
internal/platform/bulkhead_test.go Adds tests validating bulkhead concurrency, rejection, and context behavior.
internal/platform/retry.go Adds exponential backoff retry with full jitter and optional retry predicate.
internal/platform/retry_test.go Adds tests for retry success/failure/backoff/context/predicate behavior.
internal/platform/resilient_compute.go Adds resilient compute adapter wrapper (CB + bulkhead + timeouts + unwrap).
internal/platform/resilient_compute_test.go Adds tests validating resilient compute passthrough, CB trip, bulkhead, timeout, unwrap, ping behavior.
internal/platform/resilient_storage.go Adds resilient storage adapter wrapper (CB + bulkhead + timeouts + unwrap).
internal/platform/resilient_network.go Adds resilient network adapter wrapper (CB + bulkhead + timeouts + unwrap).
internal/platform/resilient_dns.go Adds resilient DNS adapter wrapper (CB + timeouts).
internal/platform/resilient_lb.go Adds resilient LB proxy adapter wrapper (CB + timeouts).
internal/drills/ha_drills_test.go Adds HA drills validating resilience primitives (CB/bulkhead/retry) invariants.
internal/drills/release_gates_test.go Adds CI-style release gates for latency/isolation/recovery/idempotency invariants.
internal/api/setup/dependencies.go Wires durable queue + ledger into services/workers; wraps singleton workers with LeaderGuard when elector is enabled.
cmd/api/main.go Wraps backends with resilient adapters, instantiates leader elector, switches to ROLE env var, and makes runApplication return errors.
cmd/api/main_test.go Adds tests for ROLE handling and updates existing test for runApplication error return.


Comment on lines +50 to +73
func (e *PgLeaderElector) Acquire(ctx context.Context, key string) (bool, error) {
lockID := keyToLockID(key)
var acquired bool
err := e.db.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("leader election acquire failed for key %q: %w", key, err)
}

e.mu.Lock()
if acquired {
e.held[key] = true
}
e.mu.Unlock()

return acquired, nil
}

// Release explicitly releases the advisory lock for the given key.
func (e *PgLeaderElector) Release(ctx context.Context, key string) error {
lockID := keyToLockID(key)
_, err := e.db.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockID)
if err != nil {
return fmt.Errorf("leader election release failed for key %q: %w", key, err)
}

Copilot AI Apr 14, 2026


Pg advisory locks are session-scoped, but this elector uses the generic DB interface (often backed by pgxpool/DualDB) for Acquire/Release. With a pool, Acquire/Release/heartbeat can run on different connections, which can (a) fail to release the lock, leaving it stuck until the pool closes, and (b) break the guarantee that leadership is tied to a single session. Consider pinning a dedicated connection for the duration of leadership (e.g., acquire a *pgxpool.Conn and store it per key while leader) and run all lock ops on that same connection.

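The pinned-connection approach described above could look roughly like this, assuming pgx/v5's `pgxpool`. This is a sketch under that assumption, not the PR's implementation: `PinnedElector` and its fields are hypothetical names, and a real version would also wire in the heartbeat on the same connection.

```go
// Sketch (assuming github.com/jackc/pgx/v5/pgxpool): pin one pooled
// connection per leadership key so pg_try_advisory_lock, the heartbeat,
// and pg_advisory_unlock all run on the same Postgres session.
package leader

import (
	"context"
	"fmt"
	"sync"

	"github.com/jackc/pgx/v5/pgxpool"
)

type PinnedElector struct {
	pool  *pgxpool.Pool
	mu    sync.Mutex
	conns map[string]*pgxpool.Conn // one dedicated session per held key
}

func NewPinnedElector(pool *pgxpool.Pool) *PinnedElector {
	return &PinnedElector{pool: pool, conns: make(map[string]*pgxpool.Conn)}
}

func (e *PinnedElector) Acquire(ctx context.Context, key string, lockID int64) (bool, error) {
	conn, err := e.pool.Acquire(ctx) // dedicate a session to this key
	if err != nil {
		return false, err
	}
	var got bool
	if err := conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&got); err != nil {
		conn.Release()
		return false, err
	}
	if !got {
		conn.Release() // not leader: return the connection to the pool
		return false, nil
	}
	e.mu.Lock()
	e.conns[key] = conn // leader: keep the session pinned until Release
	e.mu.Unlock()
	return true, nil
}

func (e *PinnedElector) Release(ctx context.Context, key string, lockID int64) error {
	e.mu.Lock()
	conn := e.conns[key]
	delete(e.conns, key)
	e.mu.Unlock()
	if conn == nil {
		return fmt.Errorf("no pinned session for key %q", key)
	}
	_, err := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockID)
	conn.Release() // losing the session would also drop the lock server-side
	return err
}
```

Because the lock lives on the pinned session, a heartbeat reduces to a cheap `SELECT 1` on that same connection: if the session died, the query fails and leadership can be surrendered.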
Comment on lines +149 to +185
// heartbeat periodically checks that we still hold the advisory lock.
// If the lock is lost (e.g., DB connection reset), it cancels the fn context.
func (e *PgLeaderElector) heartbeat(ctx context.Context, key string, cancel context.CancelFunc) {
ticker := time.NewTicker(leaderRenewInterval)
defer ticker.Stop()

lockID := keyToLockID(key)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Check if we still hold the lock by trying to acquire it again.
// pg_try_advisory_lock is re-entrant: if we already hold it, it returns true
// and increments the lock count. We immediately unlock the extra acquisition.
var stillHeld bool
err := e.db.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&stillHeld)
if err != nil {
e.logger.Error("heartbeat check failed, assuming leadership lost", "key", key, "error", err)
cancel()
return
}
if stillHeld {
// We re-acquired (re-entrant), so unlock the extra lock count
if _, unlockErr := e.db.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockID); unlockErr != nil {
e.logger.Error("failed to release re-entrant heartbeat lock",
"key", key, "error", unlockErr)
cancel()
return
}
} else {
// We lost the lock
e.logger.Error("leadership lost", "key", key)
cancel()
return
}

Copilot AI Apr 14, 2026


The heartbeat check uses pg_try_advisory_lock to infer whether leadership is still held, but pg_try_advisory_lock also returns true when the lock is not currently held and happens to be free. In that case this code would acquire+unlock and incorrectly treat leadership as still held, so it won't cancel fnCtx even though the leader lock isn't actually maintained. A more reliable approach is to keep a dedicated connection that holds the lock for the entire leadership period; then heartbeat can simply run a lightweight query/ping on that same connection and cancel on error (or omit heartbeat and rely on the connection lifecycle).

Suggested change
// heartbeat periodically checks database liveness while leadership is active.
// It must not use pg_try_advisory_lock to infer lock ownership, because that can
// succeed when the lock is merely free and incorrectly report leadership as held.
// The robust design is to hold the advisory lock on a dedicated connection for the
// full leadership period and run this check on that same connection.
func (e *PgLeaderElector) heartbeat(ctx context.Context, key string, cancel context.CancelFunc) {
ticker := time.NewTicker(leaderRenewInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Perform a lightweight liveness query instead of attempting to re-acquire
// the advisory lock. Re-acquisition is not a safe ownership check because a
// free lock would also return success.
var ping int
if err := e.db.QueryRow(ctx, "SELECT 1").Scan(&ping); err != nil {
e.logger.Error("heartbeat check failed, assuming leadership lost", "key", key, "error", err)
cancel()
return
}

Comment on lines +125 to +129
if !acquired {
w.logger.Info("skipping duplicate cluster job",
"cluster_id", job.ClusterID, "type", job.Type, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "duplicate cluster job")
return

Copilot AI Apr 14, 2026


When TryAcquire returns false, this worker immediately ACKs the queue message as a duplicate. That can drop work in the crash-recovery path: ReclaimStale is configured to run after 5m, but clusterStaleThreshold is 15m, so a reclaimed message can be seen as "still running" (not yet stale) and will be ACKed here, permanently losing the job while the ledger row remains running. Instead, only ACK when the ledger indicates completed (or other terminal state), and otherwise leave the message pending / NACK (or align clusterReclaimMs to be > clusterStaleThreshold).

Comment on lines +21 to +30
const (
pipelineQueueName = "pipeline_build_queue"
pipelineGroup = "pipeline_workers"
pipelineMaxWorkers = 5
pipelineReclaimMs = 10 * 60 * 1000 // 10 minutes (builds are longer)
pipelineReclaimN = 5
// Stale threshold for idempotency ledger: builds can take up to 30 min,
// so a "running" entry older than this is considered abandoned.
pipelineStaleThreshold = 35 * time.Minute
)

Copilot AI Apr 14, 2026


pipelineReclaimMs (10m) is much shorter than pipelineStaleThreshold (35m). That means reclaimed messages from crashed consumers will be pulled and then skipped by the ledger check (status still "running"), leaving them pending and causing repeated reclaim attempts/log spam without making progress until the stale threshold elapses. Consider aligning reclaim timing with the ledger (e.g., set pipelineReclaimMs >= pipelineStaleThreshold or reduce the stale threshold) so a reclaimed message is actually eligible for takeover.

Copilot AI review requested due to automatic review settings April 14, 2026 13:11

Copilot AI left a comment


Pull request overview

Copilot reviewed 37 out of 37 changed files in this pull request and generated 11 comments.



Comment on lines +132 to +144
if !acquired {
// Check if it's already finished or just being processed by someone else.
status, _, _, getErr := w.ledger.GetStatus(workerCtx, jobKey)
if getErr == nil && status == "completed" {
w.logger.Info("skipping already completed provision job",
"instance_id", job.InstanceID, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "provision already completed")
return
}
w.logger.Info("provision job is currently being processed by another worker",
"instance_id", job.InstanceID, "msg_id", msg.ID)
return // Leave unacked for redelivery/wait.
}

Copilot AI Apr 14, 2026


When TryAcquire returns false and the ledger status isn't "completed", this returns without Ack/Nack. Since the message has already been delivered to this consumer group, leaving it unacked keeps it stuck in the PEL and it will not be delivered again until an idle-claim/reclaim cycle. If the intent is to drop duplicates while another worker is running, consider Acking here (or explicitly rechecking status then Ack) to avoid PEL buildup.

Comment on lines +218 to +223
m := m // capture loop variable
sem <- struct{}{}
go func() {
defer func() { <-sem }()
w.processJob(ctx, &m, job)
}()

Copilot AI Apr 14, 2026


Same shutdown risk as the main loop: sem <- struct{}{} can block forever inside reclaimLoop when the channel is full, preventing reclaimLoop from observing ctx.Done and stopping. Make semaphore acquisition ctx-aware (select with ctx.Done) so shutdown can't deadlock on a full semaphore.

Comment on lines +120 to +132
if !acquired {
// Check if it's already finished or just being processed by someone else.
status, _, _, getErr := w.ledger.GetStatus(workerCtx, jobKey)
if getErr == nil && status == "completed" {
w.logger.Info("skipping already completed pipeline job",
"build_id", job.BuildID, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "pipeline already completed")
return
}
w.logger.Info("pipeline job is currently being processed by another worker",
"build_id", job.BuildID, "msg_id", msg.ID)
return // Leave unacked for redelivery/wait.
}

Copilot AI Apr 14, 2026


When TryAcquire returns false and the ledger status isn't "completed", this returns without Ack/Nack. Because the message was already delivered to this consumer, it stays pending in the PEL and won't be re-delivered until a reclaim/idle timeout, which can create backlog and noisy reclaims. If this is a duplicate while another worker is running, consider Acking it (or polling ledger state briefly then Ack) to avoid stuck pending messages.

Comment on lines +414 to +419
m := m
sem <- struct{}{}
go func() {
defer func() { <-sem }()
w.processJob(ctx, &m, job)
}()

Copilot AI Apr 14, 2026


Same shutdown risk as the main loop: sem <- struct{}{} can block inside reclaimLoop when the semaphore is full, preventing the loop from exiting on ctx cancellation. Consider making semaphore acquisition ctx-aware to avoid shutdown deadlocks.

Comment on lines 104 to +108
sem <- struct{}{}
go func() {
go func(m *ports.DurableMessage, j domain.ClusterJob) {
defer func() { <-sem }()
w.processJob(job)
}()
w.processJob(ctx, m, j)
}(msg, job)

Copilot AI Apr 14, 2026


This semaphore acquisition can block indefinitely when the channel is full, which can prevent clean shutdown if ctx is canceled while blocked on sem <- struct{}{}. Consider making semaphore acquisition ctx-aware (select on ctx.Done) (same pattern also appears in reclaimLoop).

Comment on lines +42 to +56
// Measure fail-fast latency over 100 calls.
const iterations = 100
start := time.Now()
for i := 0; i < iterations; i++ {
err := cb.Execute(func() error { return nil })
if !errors.Is(err, platform.ErrCircuitOpen) {
t.Fatalf("iteration %d: expected ErrCircuitOpen, got %v", i, err)
}
}
elapsed := time.Since(start)

avgLatency := elapsed / iterations
if avgLatency > 1*time.Millisecond {
t.Fatalf("average fail-fast latency %v exceeds 1ms SLO", avgLatency)
}

Copilot AI Apr 14, 2026


This timing gate asserts an average fail-fast latency <1ms on shared CI runners. That threshold is likely to be flaky under load/noisy neighbors and can cause non-deterministic CI failures unrelated to correctness. Consider loosening the threshold (e.g., 5–10ms), measuring percentiles, or gating these strict SLO tests behind an env flag / dedicated performance job rather than running on every CI go test ./internal/....

Comment on lines +20 to +55
// PgLeaderElector implements ports.LeaderElector using Postgres session-level advisory locks.
// Each leader key is hashed to a 64-bit integer used as the advisory lock ID.
// The lock is session-scoped: held as long as the DB connection is alive.
type PgLeaderElector struct {
db DB
logger *slog.Logger
mu sync.Mutex
held map[string]bool // tracks which keys this instance holds
}

// NewPgLeaderElector creates a leader elector backed by Postgres advisory locks.
func NewPgLeaderElector(db DB, logger *slog.Logger) *PgLeaderElector {
return &PgLeaderElector{
db: db,
logger: logger,
held: make(map[string]bool),
}
}

// keyToLockID deterministically maps a string key to a 64-bit advisory lock ID.
func keyToLockID(key string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(key))
// Ensure positive value for pg advisory lock (avoids negative lock IDs).
return int64(h.Sum64() & 0x7FFFFFFFFFFFFFFF)
}

// Acquire attempts to acquire the advisory lock for the given key.
// Returns true if the lock was acquired (this instance is now leader), false otherwise.
// Uses pg_try_advisory_lock which is non-blocking.
func (e *PgLeaderElector) Acquire(ctx context.Context, key string) (bool, error) {
lockID := keyToLockID(key)
var acquired bool
err := e.db.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("leader election acquire failed for key %q: %w", key, err)

Copilot AI Apr 14, 2026


Postgres advisory locks are session-scoped, but this elector uses the generic postgres.DB interface (backed by *pgxpool.Pool / DualDB). Each QueryRow/Exec may run on a different pooled connection, so the lock/heartbeat can be executed on different sessions, allowing multiple instances to think they are leader (and Release may not release the held lock). Consider changing the elector to hold a dedicated connection (e.g., pool.Acquire) for the lifetime of leadership per key and run Acquire/heartbeat/Release on that same connection, releasing it when leadership ends.

Comment on lines +108 to +114
)

// Process job concurrently to handle high throughput in load tests
go w.processJob(job)
sem <- struct{}{} // acquire concurrency slot
go func(m *ports.DurableMessage, j domain.ProvisionJob) {
defer func() { <-sem }()
w.processJob(ctx, m, j)
}(msg, job)

Copilot AI Apr 14, 2026


This semaphore acquisition can block indefinitely when the channel is full. If shutdown (ctx.Done) happens while blocked on sem <- struct{}{}, the worker can't exit cleanly and may hang shutdown. Use a ctx-aware acquire (select on ctx.Done) or a helper that returns early on cancellation (same pattern also appears in reclaimLoop).

Comment on lines +99 to +103
sem <- struct{}{}
go func(m *ports.DurableMessage, j domain.BuildJob) {
defer func() { <-sem }()
w.processJob(ctx, m, j)
}(msg, job)

Copilot AI Apr 14, 2026


This semaphore acquisition can block indefinitely when the channel is full. If ctx is canceled while blocked on sem <- struct{}{}, the worker may hang during shutdown. Use a ctx-aware acquire (select on ctx.Done) (same pattern also appears in reclaimLoop).

Comment on lines +141 to +155
```go
acked, err := q.client.XAck(ctx, queueName, groupName, messageID).Result()
if err != nil {
	return fmt.Errorf("ack %s/%s/%s: %w", queueName, groupName, messageID, err)
}
if acked == 0 {
	return fmt.Errorf("ack %s/%s/%s: message not pending", queueName, groupName, messageID)
}

deleted, delErr := q.client.XDel(ctx, queueName, messageID).Result()
if delErr != nil {
	return fmt.Errorf("ack xdel %s/%s: %w", queueName, messageID, delErr)
}
if deleted == 0 {
	return fmt.Errorf("ack xdel %s/%s: no message deleted", queueName, messageID)
}
```
Copilot AI Apr 14, 2026

Ack currently returns an error when XACK reports 0 (not pending) or when XDEL reports 0 (already deleted/trimmed). In practice, Ack is often safer as an idempotent operation (especially with retries/reclaims), otherwise workers will log warnings for already-acked messages even though the desired state is achieved. Consider treating acked==0 and/or deleted==0 as success (or at least not as an error) to reduce noise and avoid surprising callers that treat Ack errors as fatal.

Suggested change

```diff
-acked, err := q.client.XAck(ctx, queueName, groupName, messageID).Result()
-if err != nil {
-	return fmt.Errorf("ack %s/%s/%s: %w", queueName, groupName, messageID, err)
-}
-if acked == 0 {
-	return fmt.Errorf("ack %s/%s/%s: message not pending", queueName, groupName, messageID)
-}
-deleted, delErr := q.client.XDel(ctx, queueName, messageID).Result()
-if delErr != nil {
-	return fmt.Errorf("ack xdel %s/%s: %w", queueName, messageID, delErr)
-}
-if deleted == 0 {
-	return fmt.Errorf("ack xdel %s/%s: no message deleted", queueName, messageID)
-}
+_, err := q.client.XAck(ctx, queueName, groupName, messageID).Result()
+if err != nil {
+	return fmt.Errorf("ack %s/%s/%s: %w", queueName, groupName, messageID, err)
+}
+_, delErr := q.client.XDel(ctx, queueName, messageID).Result()
+if delErr != nil {
+	return fmt.Errorf("ack xdel %s/%s: %w", queueName, messageID, delErr)
+}
```