fix(ha): PR #101 HA implementation bug fixes #154

Open
poyrazK wants to merge 30 commits into main from pr-101-ha

Conversation

@poyrazK poyrazK commented Apr 16, 2026

Summary

Bug fixes for PR #101 HA implementation:

Critical Fixes

  • Timer leak in retry.go: Add defer timer.Stop() to prevent timer leak on ctx cancellation
  • Inverted reclaim interval: Change provisionReclaimMs from 20min to 1min (was incorrectly > stale threshold)

High Fixes

  • Goroutine deadlock in leader_elector.go: Add recover() to heartbeat goroutine to prevent deadlock on panic
  • Orphaned container in pipeline_worker.go: Fail step if collectTaskLogs fails instead of just logging

Medium Fixes

  • Unbounded Redis key growth: Add 24h TTL to retry keys in durable_task_queue
  • DLQ message loss: Reorder deadLetter: XAdd to DLQ first, only ack/del if successful
  • Reclaim race in execution_ledger.go: Use RETURNING TRUE pattern to avoid race
  • Silent Update failures: Add warning logs when repo.Update fails in cluster handlers

Features

  • Resilience metrics: Add Prometheus metrics for circuit breaker transitions/blocked requests and bulkhead concurrency/rejections
  • Retry integration: Integrate retry wrapper into all resilient wrappers with configurable attempts and backoff

Test Plan

  • go test ./internal/platform/...
  • go test ./internal/workers/...
  • go test ./internal/repositories/redis/...
  • go test ./internal/repositories/postgres/...

jackthepunished and others added 19 commits March 10, 2026 05:39
…rkers

- Add LeaderElector port interface (internal/core/ports/leader.go)
- Implement PgLeaderElector using pg_try_advisory_lock with 5s heartbeat
- Create LeaderGuard wrapper to ensure singleton workers run on exactly one node
- Wrap 11 singleton workers: LB, AutoScaling, Cron, Container, Accounting,
  Lifecycle, ReplicaMonitor, ClusterReconciler, Healing, DatabaseFailover, Log
- Add unit tests for leader election and guard behavior
- Extend TaskQueue port with DurableTaskQueue interface using Redis Streams
  with consumer groups for exactly-once delivery
- Implement Redis Streams durable queue with EnsureGroup, Receive, Ack, Nack,
  ReclaimStale methods
- Add ExecutionLedger port interface for idempotent job processing
- Implement PgExecutionLedger using job_executions table with ON CONFLICT DO NOTHING
- Integrate durable queue + ledger into ProvisionWorker, ClusterWorker,
  PipelineWorker with bounded concurrency
- Add migration 100_create_job_executions
- Add noop implementations for testing
…ry utilities

Circuit Breaker:
- Add half-open single-flight: only one probe request allowed at a time
- Add OnStateChange callback (synchronous) for observability
- Add SuccessRequired for multi-success half-open→closed transition
- Add Name and State.String() methods
- Backward compatible with existing NewCircuitBreaker(threshold, timeout)

Bulkhead:
- Add semaphore-based concurrency limiter with configurable wait timeout
- Returns ErrBulkheadFull when limit reached and timeout expires

Retry:
- Add exponential backoff with full jitter
- Configurable ShouldRetry predicate for selective retry
- Context-aware cancellation
…ackends

Add decorator wrappers implementing ports interfaces with resilience patterns:

- ResilientCompute: CB (5 fails/30s) + Bulkhead (20 conc) + Timeouts
- ResilientNetwork: CB (5 fails/30s) + Bulkhead (15 conc) + Timeout (30s)
- ResilientStorage: CB (5 fails/30s) + Bulkhead (10 conc) + Timeouts
- ResilientDNS: CB (5 fails/30s) + Timeout (10s) - no bulkhead needed
- ResilientLB: CB (5 fails/30s) + Timeouts (30s normal, 2m deploy)

Design:
- Ping() bypasses bulkhead (cheap health check) but uses CB
- Type() delegates directly (pure metadata)
- Retry NOT applied at adapter level (dangerous for provisioning)
- All wrappers have configurable options with sensible defaults
- SuccessRequired: 2 for half-open→closed (extra safety)

Add comprehensive tests for ResilientCompute (passthrough, circuit trip,
bulkhead limits, timeout, unwrap, ping bypass).
- Wrap all backends with resilient decorators in main.go:
  NewResilientCompute, NewResilientStorage, NewResilientNetwork, NewResilientLB
- Wrap DNS backend with resilient decorator in dependencies.go
- Create PgLeaderElector and wire into ServiceConfig
- Update ProvisionWorker, ClusterWorker, PipelineWorker to use:
  * DurableTaskQueue (Redis Streams with consumer groups)
  * ExecutionLedger for idempotent job processing
  * Bounded concurrency via semaphore (provision=20, cluster=10, pipeline=5)
- Update workers to use Receive/Ack/Nack pattern for exactly-once delivery
- Add role validation tests
HA Drills (ha_drills_test.go):
1. Circuit breaker trip and recovery (validates 3 state transitions)
2. Bulkhead saturation and graceful rejection
3. Resilient adapter end-to-end (CB + bulkhead + timeout compose)
4. Retry backoff and context cancellation
5. Half-open single-flight validation

Release Gates (release_gates_test.go) - validate SLOs:
1. Fail-fast latency <1ms when circuit is open
2. Bulkhead isolation (saturated compute doesn't affect network)
3. Circuit recovery within resetTimeout window
4. Retry idempotency (exactly MaxAttempts executions)
5. Independent circuit breakers don't interfere

Total: 13 new tests validating HA invariants.
- Add defer timer.Stop() to prevent timer leak on ctx cancellation
- Change provisionReclaimMs from 20min to 1min (was > stale threshold, inverted)
Add recover() to heartbeat goroutine to prevent deadlock if panic occurs.
Also log panic details for observability.
Previously logsErr was only logged as warning but the step was marked
as succeeded. Now returns error so step is properly marked as failed.
- Add 24h TTL to retry keys to prevent unbounded memory growth
- Reorder deadLetter: XAdd to DLQ first, only ack/del if successful
- Add logger to durableTaskQueue for warning messages
- Add warning logs when repo.Update fails in cluster handlers
- Fix reclaim race: use RETURNING TRUE pattern instead of RowsAffected
Add Prometheus metrics for:
- circuit_breaker_transitions_total (state changes)
- circuit_breaker_blocked_total (rejected requests when open)
- bulkhead_concurrent_requests (current concurrency gauge)
- bulkhead_rejected_total (rejections due to saturation)
Copilot AI review requested due to automatic review settings April 16, 2026 11:27

coderabbitai bot commented Apr 16, 2026

Warning

Rate limit exceeded

@poyrazK has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 6 minutes and 32 seconds before requesting another review.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1296886c-2e47-45dd-9da6-3c934422e47b

📥 Commits

Reviewing files that changed from the base of the PR and between 1632050 and 8f40729.

📒 Files selected for processing (42)
  • .github/workflows/benchmarks.yml
  • cmd/api/main.go
  • cmd/api/main_test.go
  • internal/api/setup/dependencies.go
  • internal/core/ports/execution_ledger.go
  • internal/core/ports/leader.go
  • internal/core/ports/task_queue.go
  • internal/core/services/vpc.go
  • internal/drills/ha_drills_test.go
  • internal/drills/release_gates_test.go
  • internal/platform/bulkhead.go
  • internal/platform/bulkhead_test.go
  • internal/platform/circuit_breaker.go
  • internal/platform/circuit_breaker_test.go
  • internal/platform/resilience_metrics.go
  • internal/platform/resilient_compute.go
  • internal/platform/resilient_compute_test.go
  • internal/platform/resilient_dns.go
  • internal/platform/resilient_lb.go
  • internal/platform/resilient_network.go
  • internal/platform/resilient_storage.go
  • internal/platform/retry.go
  • internal/platform/retry_test.go
  • internal/repositories/noop/adapters.go
  • internal/repositories/postgres/execution_ledger.go
  • internal/repositories/postgres/leader_elector.go
  • internal/repositories/postgres/leader_elector_test.go
  • internal/repositories/postgres/migrations/100_create_job_executions.down.sql
  • internal/repositories/postgres/migrations/100_create_job_executions.up.sql
  • internal/repositories/redis/durable_task_queue.go
  • internal/repositories/redis/durable_task_queue_test.go
  • internal/workers/cluster_worker.go
  • internal/workers/cluster_worker_test.go
  • internal/workers/leader_guard.go
  • internal/workers/leader_guard_test.go
  • internal/workers/pipeline_worker.go
  • internal/workers/pipeline_worker_test.go
  • internal/workers/provision_worker.go
  • internal/workers/provision_worker_test.go
  • tests/autoscaling_e2e_test.go
  • tests/database_e2e_test.go
  • tests/networking_e2e_test.go


Copilot AI left a comment


Pull request overview

This PR fixes several HA-related correctness issues introduced in PR #101 by moving workers to a durable queue + execution ledger model, tightening leader-election behavior, and adding resilience primitives (circuit breaker / bulkhead / retry) with metrics and drills.

Changes:

  • Migrate Provision / Pipeline / Cluster workers to ports.DurableTaskQueue semantics (Receive/Ack/Nack/ReclaimStale) and add Postgres-backed ExecutionLedger for idempotency.
  • Add Postgres advisory-lock leader election (PgLeaderElector) and a LeaderGuard wrapper to run singleton workers only on the elected leader.
  • Introduce resilience primitives + metrics (circuit breaker, bulkhead, retry) and add drills/release-gate tests.

Reviewed changes

Copilot reviewed 38 out of 38 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
internal/workers/provision_worker_test.go Updates tests to DurableQueue semantics (Ack/Nack expectations).
internal/workers/provision_worker.go Migrates provision worker to durable queue + ledger + reclaim loop.
internal/workers/pipeline_worker_test.go Updates pipeline worker test to pass durable message + Ack expectation.
internal/workers/pipeline_worker.go Migrates pipeline worker to durable queue + ledger + reclaim + step failure on log-collection error.
internal/workers/leader_guard_test.go Adds unit tests for leader-guard behavior (leader/not leader/restart/shutdown).
internal/workers/leader_guard.go Adds LeaderGuard wrapper around workers using LeaderElector.
internal/workers/cluster_worker_test.go Updates cluster worker tests for durable queue Ack/Nack + signature changes.
internal/workers/cluster_worker.go Migrates cluster worker to durable queue + ledger + reclaim + improved update warnings.
internal/repositories/redis/durable_task_queue_test.go Adds miniredis tests for durable queue group/receive/ack/nack/reclaim + legacy dequeue.
internal/repositories/redis/durable_task_queue.go Implements Redis Streams durable queue (consumer groups, reclaim, DLQ, retry counters w/ TTL).
internal/repositories/postgres/migrations/100_create_job_executions.up.sql Adds job_executions table for execution ledger.
internal/repositories/postgres/migrations/100_create_job_executions.down.sql Drops job_executions table.
internal/repositories/postgres/leader_elector_test.go Adds tests for deterministic/positive/unique lock ID hashing.
internal/repositories/postgres/leader_elector.go Adds Postgres advisory-lock leader elector + heartbeat + panic-safe goroutine.
internal/repositories/postgres/execution_ledger.go Adds Postgres execution ledger implementation (TryAcquire/MarkComplete/MarkFailed/GetStatus).
internal/repositories/noop/adapters.go Extends no-op task queue to DurableTaskQueue + adds NoopExecutionLedger.
internal/platform/retry_test.go Adds unit tests for retry behavior and backoff.
internal/platform/retry.go Adds retry helper with exponential backoff + jitter + metrics.
internal/platform/resilient_storage.go Adds resilient StorageBackend wrapper (CB/BH/Retry/Timeout).
internal/platform/resilient_network.go Adds resilient NetworkBackend wrapper (CB/BH/Retry/Timeout).
internal/platform/resilient_lb.go Adds resilient LB proxy wrapper (CB/Retry/Timeout).
internal/platform/resilient_dns.go Adds resilient DNS backend wrapper (CB/Retry/Timeout).
internal/platform/resilient_compute_test.go Adds tests validating compute wrapper behavior (CB/BH/timeout/passthrough).
internal/platform/resilient_compute.go Adds resilient ComputeBackend wrapper (CB/BH/Retry/Timeout).
internal/platform/resilience_metrics.go Adds Prometheus metrics for CB/BH/Retry.
internal/platform/circuit_breaker_test.go Expands circuit breaker test coverage (single-flight, callbacks, successRequired, string).
internal/platform/circuit_breaker.go Adds named/configurable CB with half-open single-flight + transition metrics.
internal/platform/bulkhead_test.go Adds bulkhead tests (limits, rejection, context, errors, available).
internal/platform/bulkhead.go Adds bulkhead implementation + metrics (concurrency/rejections).
internal/drills/release_gates_test.go Adds CI-style release gates validating resilience SLO invariants.
internal/drills/ha_drills_test.go Adds HA drill tests for CB/BH/Retry properties and transitions.
internal/core/ports/task_queue.go Introduces DurableTaskQueue + DurableMessage; deprecates Dequeue for parallel consumers.
internal/core/ports/leader.go Adds LeaderElector port.
internal/core/ports/execution_ledger.go Adds ExecutionLedger port.
internal/api/setup/dependencies.go Wires DurableQueue/Ledger into services/workers; adds LeaderGuard wrapping for singleton workers.
cmd/api/main_test.go Adds role-mode tests; updates runApplication to return error.
cmd/api/main.go Wraps backends with resilient decorators; introduces ROLE env; wires PgLeaderElector; worker-only mode skips HTTP.
.github/workflows/benchmarks.yml Splits benchmark reporting for PR vs main (no auto-push on PR).


Comment on lines +69 to +71
// Reclaim the stale entry using FOR UPDATE SKIP LOCKED.
// This prevents two reclaimers from racing to reclaim the same stale entry.
// If another reclaimer holds the lock (already reclaimed), we skip it.

Copilot AI Apr 16, 2026


The TryAcquire comment says the stale-entry reclaim uses "FOR UPDATE SKIP LOCKED", but the implementation actually uses an optimistic compare-and-swap update on (job_key, status, started_at) with RETURNING TRUE. Adjust the comment to match the real concurrency control mechanism so future readers don't rely on locking semantics that aren't present.

Suggested change:

- // Reclaim the stale entry using FOR UPDATE SKIP LOCKED.
- // This prevents two reclaimers from racing to reclaim the same stale entry.
- // If another reclaimer holds the lock (already reclaimed), we skip it.
+ // Reclaim the stale entry with an optimistic compare-and-swap update.
+ // The UPDATE only succeeds if the row is still in the previously observed
+ // (job_key, status='running', started_at) state; otherwise another worker
+ // updated it first and RETURNING yields no row.

Comment on lines +120 to +133
// Idempotency check.
if w.ledger != nil {
acquired, err := w.ledger.TryAcquire(workerCtx, jobKey, clusterStaleThreshold)
if err != nil {
w.logger.Error("execution ledger error",
"cluster_id", job.ClusterID, "msg_id", msg.ID, "error", err)
w.nackWithLog(workerCtx, msg.ID, "ledger try_acquire failed")
return
}
if !acquired {
w.logger.Info("skipping duplicate cluster job",
"cluster_id", job.ClusterID, "type", job.Type, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "duplicate cluster job")
return

Copilot AI Apr 16, 2026


When TryAcquire returns false, ClusterWorker immediately acks the message as a "duplicate cluster job". This is unsafe if the existing ledger row is status='running' but not yet past clusterStaleThreshold (e.g., the original worker crashed soon after acquiring the ledger). In that case, acking here drops the only durable copy of the job, so it can't be retried once the ledger entry becomes stale. Mirror the Provision/Pipeline pattern: call GetStatus and only Ack when status is 'completed' (or 'failed' if you want to drop), otherwise leave the message unacked/pending so it can be reclaimed and retried later.

Comment on lines +20 to +56
// PgLeaderElector implements ports.LeaderElector using Postgres session-level advisory locks.
// Each leader key is hashed to a 64-bit integer used as the advisory lock ID.
// The lock is session-scoped: held as long as the DB connection is alive.
type PgLeaderElector struct {
db DB
logger *slog.Logger
mu sync.Mutex
held map[string]bool // tracks which keys this instance holds
}

// NewPgLeaderElector creates a leader elector backed by Postgres advisory locks.
func NewPgLeaderElector(db DB, logger *slog.Logger) *PgLeaderElector {
return &PgLeaderElector{
db: db,
logger: logger,
held: make(map[string]bool),
}
}

// keyToLockID deterministically maps a string key to a 64-bit advisory lock ID.
func keyToLockID(key string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(key))
// Ensure positive value for pg advisory lock (avoids negative lock IDs).
return int64(h.Sum64() & 0x7FFFFFFFFFFFFFFF)
}

// Acquire attempts to acquire the advisory lock for the given key.
// Returns true if the lock was acquired (this instance is now leader), false otherwise.
// Uses pg_try_advisory_lock which is non-blocking.
func (e *PgLeaderElector) Acquire(ctx context.Context, key string) (bool, error) {
lockID := keyToLockID(key)
var acquired bool
err := e.db.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("leader election acquire failed for key %q: %w", key, err)
}

Copilot AI Apr 16, 2026


PgLeaderElector uses Postgres session-level advisory locks, but all Acquire/Release/heartbeat calls go through the generic DB interface (compatible with *pgxpool.Pool). With a pool, each QueryRow/Exec may run on a different connection, so the lock may not actually be held (or released) by the same session; the heartbeat check can also run on a different session and produce incorrect results. To make session-scoped advisory locks correct, RunAsLeader should pin a single connection for the full leadership term (e.g., pool.Acquire → use that conn for pg_try_advisory_lock/pg_advisory_unlock/heartbeat, then Close the conn on release), or otherwise ensure all lock operations use the same underlying connection.

Comment on lines +210 to +213
pipe.HIncrBy(ctx, retryKey, "count", 1)
pipe.Expire(ctx, retryKey, 24*time.Hour)
pipeResults, _ := pipe.Exec(ctx)
attempts := pipeResults[0].(*redis.IntCmd).Val()

Copilot AI Apr 16, 2026


ReclaimStale ignores the error from pipe.Exec and then blindly indexes/type-asserts pipeResults[0]. If Exec returns an error (network issue, Redis down, ctx cancelled), this can panic or record an incorrect attempts count. Capture and handle the Exec error (and validate the returned cmds) before using the results; if it fails, return the error or at least skip processing the claimed ID so the reclaim loop can retry safely.

Suggested change:

- pipe.HIncrBy(ctx, retryKey, "count", 1)
- pipe.Expire(ctx, retryKey, 24*time.Hour)
- pipeResults, _ := pipe.Exec(ctx)
- attempts := pipeResults[0].(*redis.IntCmd).Val()
+ attemptsCmd := pipe.HIncrBy(ctx, retryKey, "count", 1)
+ pipe.Expire(ctx, retryKey, 24*time.Hour)
+ if _, err := pipe.Exec(ctx); err != nil {
+ 	return nil, fmt.Errorf("increment retry counter for %s/%s/%s: %w", queueName, groupName, msgID, err)
+ }
+ attempts := attemptsCmd.Val()

Comment on lines +265 to +272
pipe.XAck(ctx, queueName, groupName, msgID)
pipe.XDel(ctx, queueName, msgID)
if _, err := pipe.Exec(ctx); err != nil {
// DLQ entry was added but original ack/del failed.
// Log but don't return error — message is in DLQ and original is still in PEL.
q.logger.Warn("dead-letter: failed to ack/del original message",
"msg_id", msgID, "dlq_id", added, "error", err)
}

Copilot AI Apr 16, 2026


deadLetter logs and returns nil when the post-DLQ XAck/XDel pipeline fails. That leaves the original message pending in the PEL while also having a DLQ copy, which can lead to repeated DLQ entries on subsequent reclaim cycles and unbounded pending growth. Consider making the DLQ move idempotent/atomic (e.g., Lua script or MULTI/EXEC transaction for XADD+XACK+XDEL), or at minimum return an error when ack/del fails so the caller can retry cleanup deterministically.

Suggested change:

- pipe.XAck(ctx, queueName, groupName, msgID)
- pipe.XDel(ctx, queueName, msgID)
- if _, err := pipe.Exec(ctx); err != nil {
- 	// DLQ entry was added but original ack/del failed.
- 	// Log but don't return error — message is in DLQ and original is still in PEL.
- 	q.logger.Warn("dead-letter: failed to ack/del original message",
- 		"msg_id", msgID, "dlq_id", added, "error", err)
- }
+ ackCmd := pipe.XAck(ctx, queueName, groupName, msgID)
+ delCmd := pipe.XDel(ctx, queueName, msgID)
+ if _, err := pipe.Exec(ctx); err != nil {
+ 	return fmt.Errorf("dead-letter cleanup failed after dlq add (msg_id=%s, dlq_id=%s): %w", msgID, added, err)
+ }
+ acked, err := ackCmd.Result()
+ if err != nil {
+ 	return fmt.Errorf("dead-letter xack result failed after dlq add (msg_id=%s, dlq_id=%s): %w", msgID, added, err)
+ }
+ if acked == 0 {
+ 	return fmt.Errorf("dead-letter xack affected 0 messages after dlq add (msg_id=%s, dlq_id=%s)", msgID, added)
+ }
+ deleted, err := delCmd.Result()
+ if err != nil {
+ 	return fmt.Errorf("dead-letter xdel result failed after dlq add (msg_id=%s, dlq_id=%s): %w", msgID, added, err)
+ }
+ if deleted == 0 {
+ 	return fmt.Errorf("dead-letter xdel affected 0 messages after dlq add (msg_id=%s, dlq_id=%s)", msgID, added)
+ }

Comment on lines +79 to +81
defer timer.Stop()
select {
case <-ctx.Done():

Copilot AI Apr 16, 2026


Retry creates a new timer on each backoff iteration but uses defer timer.Stop() inside the retry loop. Defers in a loop accumulate until Retry returns, which can retain timers longer than necessary under high MaxAttempts and defeats the intent of promptly releasing timer resources. Prefer stopping the timer explicitly after the select (and draining the channel when needed) instead of deferring inside the loop.

Suggested change:

- defer timer.Stop()
- select {
- case <-ctx.Done():
+ select {
+ case <-ctx.Done():
+ 	if !timer.Stop() {
+ 		select {
+ 		case <-timer.C:
+ 		default:
+ 		}
+ 	}

Comment on lines +24 to +26
// How long a message can sit in PEL before another consumer reclaims it.
// Must be longer than provisionStaleThreshold (15m) to avoid premature reclaim.
provisionReclaimMs = 60 * 1000 // 1 minute

Copilot AI Apr 16, 2026


The comment above provisionReclaimMs says the reclaim idle time "must be longer than provisionStaleThreshold (15m)", but the value is 1 minute. Given the surrounding logic, this looks like the comment is inverted/outdated and may mislead future tuning. Please update the comment to reflect the intended relationship between PEL reclaim idle time and the execution-ledger stale threshold (and why).

- Add RetryMaxAttempts and RetryBaseDelay options to all resilient wrappers
- Add retry call in callProtected: bulkhead -> circuit breaker -> retry -> timeout
- Add OperationName to RetryOpts for metrics labels
- Emit retry_attempts_total and retry_backoff_seconds metrics
origin/main changed AttachVolume to return (string, string, error) and
DetachVolume to return (string, error) to support container ID changes
for Docker bind mounts after stop->recreate->start cycles.

Updated all implementations (libvirt, firecracker, docker, noop),
wrappers (resilient_compute), and test mocks to match the new
interface signatures.

Issue: CI failure due to interface mismatch
Copilot AI review requested due to automatic review settings April 16, 2026 12:43

Copilot AI left a comment


Pull request overview

Copilot reviewed 49 out of 49 changed files in this pull request and generated 8 comments.



Comment on lines +418 to +423
m := m
sem <- struct{}{}
go func() {
defer func() { <-sem }()
w.processJob(ctx, &m, job)
}()

Copilot AI Apr 16, 2026


In reclaimLoop, sem <- struct{}{} can block forever under saturation and ignores ctx.Done(), which can deadlock shutdown. Consider acquiring the semaphore with select { case sem<-...: case <-ctx.Done(): return } (or skipping reclaimed work when shutting down).

Comment on lines +305 to +310
m := m
sem <- struct{}{}
go func() {
defer func() { <-sem }()
w.processJob(ctx, &m, job)
}()

Copilot AI Apr 16, 2026


In reclaimLoop, sem <- struct{}{} can block forever under high load and ignores ctx.Done(), which can deadlock shutdown. Acquire the semaphore with a select on ctx.Done() or skip scheduling reclaimed jobs once shutdown has started.

Comment on lines +69 to +71
// Reclaim the stale entry using FOR UPDATE SKIP LOCKED.
// This prevents two reclaimers from racing to reclaim the same stale entry.
// If another reclaimer holds the lock (already reclaimed), we skip it.

Copilot AI Apr 16, 2026


The comment says this reclaim uses “FOR UPDATE SKIP LOCKED”, but the implementation is an UPDATE ... WHERE started_at = $2 RETURNING TRUE compare-and-swap. Please update the comment to match the actual mechanism (or change the query) so future readers don’t assume row locking semantics that aren’t present.

Comment on lines +209 to +214
pipe := q.client.Pipeline()
pipe.HIncrBy(ctx, retryKey, "count", 1)
pipe.Expire(ctx, retryKey, 24*time.Hour)
pipeResults, _ := pipe.Exec(ctx)
attempts := pipeResults[0].(*redis.IntCmd).Val()


Copilot AI Apr 16, 2026


pipe.Exec(ctx) errors are ignored here. If Exec fails, pipeResults can be nil/short and pipeResults[0].(*redis.IntCmd) can panic, potentially breaking the reclaim loop and leaving messages stuck. Handle the Exec error and validate result length/types before reading attempts (or use the returned *IntCmd from HIncrBy directly).

Comment on lines +113 to +117
sem <- struct{}{} // acquire concurrency slot
go func(m *ports.DurableMessage, j domain.ProvisionJob) {
defer func() { <-sem }()
w.processJob(ctx, m, j)
}(msg, job)

Copilot AI Apr 16, 2026


sem <- struct{}{} can block indefinitely when the semaphore is full, and that send does not respect ctx.Done(). This can prevent the worker from shutting down cleanly (Run goroutine stuck on semaphore send even after context cancellation). Acquire the semaphore with a select that also listens for ctx.Done() (or move acquisition into the goroutine with a cancellable select).

Comment on lines +221 to +226
m := m // capture loop variable
sem <- struct{}{}
go func() {
defer func() { <-sem }()
w.processJob(ctx, &m, job)
}()

Copilot AI Apr 16, 2026


In reclaimLoop, sem <- struct{}{} can also block forever when the worker is saturated, and it similarly ignores ctx.Done(). On shutdown this can deadlock the reclaim loop and keep the process alive. Use a select on ctx.Done() when acquiring the semaphore (and consider skipping reclaimed work when shutting down).

Comment on lines +101 to +105
sem <- struct{}{}
go func(m *ports.DurableMessage, j domain.BuildJob) {
defer func() { <-sem }()
w.processJob(ctx, m, j)
}(msg, job)

Copilot AI Apr 16, 2026


sem <- struct{}{} can block indefinitely when the worker is at max concurrency, and that send doesn’t observe ctx.Done(). This can prevent graceful shutdown (Run loop stuck trying to acquire a slot). Acquire the semaphore with a select including ctx.Done() or restructure so shutdown can always progress.

Comment on lines 108 to +112
 sem <- struct{}{}
-go func() {
+go func(m *ports.DurableMessage, j domain.ClusterJob) {
 	defer func() { <-sem }()
-	w.processJob(job)
-}()
+	w.processJob(ctx, m, j)
+}(msg, job)

Copilot AI Apr 16, 2026


sem <- struct{}{} can block indefinitely when the worker is saturated, and the send doesn’t respect ctx.Done(). That can prevent the cluster worker from stopping cleanly (Run loop stuck on semaphore send). Acquire the semaphore with a select that also listens for ctx.Done() (or move acquisition into a cancellable goroutine).

Copilot AI review requested due to automatic review settings April 17, 2026 17:11

Copilot AI left a comment


Pull request overview

Copilot reviewed 38 out of 38 changed files in this pull request and generated 4 comments.



Comment thread cmd/api/main.go
Comment on lines +133 to +143
rawCompute, rawStorage, rawNetwork, rawLBProxy, err := initBackends(deps, cfg, logger, db, rdb)
if err != nil {
logger.Error("backend initialization failed", "error", err)
return err
}

// Wrap raw backends with resilience decorators (circuit breaker, bulkhead, timeouts).
compute := platform.NewResilientCompute(rawCompute, logger, platform.ResilientComputeOpts{})
storage := platform.NewResilientStorage(rawStorage, logger, platform.ResilientStorageOpts{})
network := platform.NewResilientNetwork(rawNetwork, logger, platform.ResilientNetworkOpts{})
lbProxy := platform.NewResilientLB(rawLBProxy, logger, platform.ResilientLBOpts{})

Copilot AI Apr 17, 2026


initBackends builds rawLBProxy using rawCompute (see setup.InitLBProxy for libvirt path). Since compute is wrapped after initBackends, the LB proxy’s internal compute calls won’t benefit from the compute bulkhead/timeouts in the libvirt case, even though ResilientLB’s comment assumes compute bulkhead already limits container/VM creation. Consider wrapping compute before calling InitLBProxy (or re-initializing LBProxy using the resilient compute backend) so LB proxy operations are subject to the same resilience limits.

Comment on lines +54 to +57
if avgLatency > 1*time.Millisecond {
t.Fatalf("average fail-fast latency %v exceeds 1ms SLO", avgLatency)
}
t.Logf("PASS: avg fail-fast latency = %v (SLO: <1ms)", avgLatency)

Copilot AI Apr 17, 2026


TestReleaseGate_CircuitBreakerFailFast asserts an average latency <1ms across 100 iterations. This is very sensitive to CI host load/scheduling and can be flaky even when fail-fast behavior is correct. Consider relaxing the threshold, using a higher-latency budget/percentile, gating via env/build tag, or converting this to a benchmark instead of a default-running test.

Suggested change
if avgLatency > 1*time.Millisecond {
t.Fatalf("average fail-fast latency %v exceeds 1ms SLO", avgLatency)
}
t.Logf("PASS: avg fail-fast latency = %v (SLO: <1ms)", avgLatency)
if avgLatency > 5*time.Millisecond {
t.Fatalf("average fail-fast latency %v exceeds 5ms SLO", avgLatency)
}
t.Logf("PASS: avg fail-fast latency = %v (SLO: <5ms)", avgLatency)

Comment on lines +20 to +27
const (
provisionQueue = "provision_queue"
provisionGroup = "provision_workers"
provisionMaxWorkers = 20
// How long a message can sit in PEL before another consumer reclaims it.
// Must be longer than provisionStaleThreshold (15m) to avoid premature reclaim.
provisionReclaimMs = 60 * 1000 // 1 minute
provisionReclaimN = 10

Copilot AI Apr 17, 2026


provisionReclaimMs is set to 1 minute, but provisioning work can run up to 10 minutes (processJob timeout) and the ledger stale threshold is 15 minutes. Reclaiming after 1 minute will repeatedly XAUTOCLAIM in-flight messages, increment retry counters, and can eventually dead-letter work that is still actively being processed. Increase the reclaim idle threshold to exceed the maximum expected job runtime (or at least align it with the ledger stale threshold) and update the comment accordingly.

Comment on lines +246 to +248
w.logger.Info("deprovisioning succeeded", "cluster_id", cluster.ID)
_ = w.repo.Delete(ctx, cluster.ID)
return nil

Copilot AI Apr 17, 2026


Cluster deprovision success path ignores errors from repo.Delete. If Delete fails, the worker will Ack the queue message (in processJob) and MarkComplete in the ledger, leaving a cluster record that will never be retried/cleaned up. Consider handling the delete error (return it so the message is nacked/retried, or at least log+leave the job in a state that can be reconciled).

poyrazK added 2 commits April 17, 2026 20:49
- Replace math/rand/v2 with crypto/rand in retry backoff to address G404
- Add errors.Is for pgx.ErrNoRows comparisons in execution_ledger
- Use uintptr cast to silence G115 integer overflow in leader_elector
- Rewrite if-else chain to switch in provision_worker_test for gocritic
- Database: increase HTTP client timeout from 10s to 30s
- Autoscaling: increase scaling group cleanup wait from 15s to 45s
- Autoscaling/Networking: increase VPC deletion retry from 30s to 60s

HA retry logic can cause operations to take longer, especially when
instance termination involves retries or leadership elections cause
slight delays in worker processing.
Copilot AI review requested due to automatic review settings April 18, 2026 10:58

Copilot AI left a comment


Pull request overview

Copilot reviewed 41 out of 41 changed files in this pull request and generated 5 comments.



Comment on lines +20 to +67
// PgLeaderElector implements ports.LeaderElector using Postgres session-level advisory locks.
// Each leader key is hashed to a 64-bit integer used as the advisory lock ID.
// The lock is session-scoped: held as long as the DB connection is alive.
type PgLeaderElector struct {
db DB
logger *slog.Logger
mu sync.Mutex
held map[string]bool // tracks which keys this instance holds
}

// NewPgLeaderElector creates a leader elector backed by Postgres advisory locks.
func NewPgLeaderElector(db DB, logger *slog.Logger) *PgLeaderElector {
return &PgLeaderElector{
db: db,
logger: logger,
held: make(map[string]bool),
}
}

// keyToLockID deterministically maps a string key to a 64-bit advisory lock ID.
func keyToLockID(key string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(key))
// Sum64 returns uint64. Mask to lower 63 bits (positive int64 range) and
// cast via uintptr to silence G115 integer overflow warning.
v := uintptr(h.Sum64() & 0x7FFFFFFFFFFFFFFF)
return int64(v)
}

// Acquire attempts to acquire the advisory lock for the given key.
// Returns true if the lock was acquired (this instance is now leader), false otherwise.
// Uses pg_try_advisory_lock which is non-blocking.
func (e *PgLeaderElector) Acquire(ctx context.Context, key string) (bool, error) {
lockID := keyToLockID(key)
var acquired bool
err := e.db.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired)
if err != nil {
return false, fmt.Errorf("leader election acquire failed for key %q: %w", key, err)
}

e.mu.Lock()
if acquired {
e.held[key] = true
}
e.mu.Unlock()

return acquired, nil
}

Copilot AI Apr 18, 2026


This leader elector uses session-level advisory locks, but it calls pg_try_advisory_lock/pg_advisory_unlock via the generic DB interface (typically backed by *pgxpool.Pool). With a pool, each call may use a different underlying connection, which breaks session-scoped lock semantics (locks can remain held on an idle pooled connection, Release may run on a different session and not unlock, and heartbeat may check the wrong session). Consider having PgLeaderElector hold a dedicated *pgxpool.Conn (or pgx.Conn) for the lifetime of leadership and run Acquire/heartbeat/Release on that same connection.

Comment on lines +80 to +82
defer timer.Stop()
select {
case <-ctx.Done():

Copilot AI Apr 18, 2026


time.NewTimer is created inside the retry loop, but defer timer.Stop() is also inside the loop. This defers all Stop() calls until Retry returns, which can accumulate many deferred calls and keep timer resources alive longer than necessary. Stop (and if needed drain) the timer per-iteration instead of deferring inside the loop (or use time.After / a reusable timer).

Suggested change
defer timer.Stop()
select {
case <-ctx.Done():
select {
case <-ctx.Done():
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}

Comment on lines +24 to +30
// How long a message can sit in PEL before another consumer reclaims it.
// Must be longer than provisionStaleThreshold (15m) to avoid premature reclaim.
provisionReclaimMs = 60 * 1000 // 1 minute
provisionReclaimN = 10
// Stale threshold for idempotency ledger: if a "running" entry is older
// than this, it is considered abandoned and can be reclaimed.
provisionStaleThreshold = 15 * time.Minute

Copilot AI Apr 18, 2026


The comment says the PEL reclaim idle time "must be longer than provisionStaleThreshold (15m)", but provisionReclaimMs is set to 1 minute. More importantly, with ReclaimStale incrementing retry attempts on every reclaim and maxRetries defaulting to 5, reclaiming every 1 minute can dead-letter a legitimately long-running provision (worker timeout is 10m) before it finishes. Consider setting provisionReclaimMs to a value comfortably larger than the expected max processing time (or change retry counting so it only increments on actual failures) and update the comment accordingly.

Comment on lines 103 to 118
@@ -58,53 +118,135 @@ func (cb *CircuitBreaker) Execute(fn func() error) error {
}

Copilot AI Apr 18, 2026


CircuitBreaker.Execute sets halfOpenInFlight in allowRequest(), but if fn() panics, recordSuccess/recordFailure won't run and halfOpenInFlight can remain true indefinitely, causing the breaker to reject all future requests in half-open. Consider wrapping fn() with a defer that clears halfOpenInFlight (likely via recordFailure) on panic, and decide whether to re-panic or convert it into an error.

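A hedged sketch of the panic-safe half-open pattern the comment describes (field and method names here are illustrative, not the PR's actual `CircuitBreaker`):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// breaker is a stripped-down half-open gate: at most one probe request
// may be in flight at a time.
type breaker struct {
	mu               sync.Mutex
	halfOpenInFlight bool
}

var errPanic = errors.New("breaker: fn panicked")

// execute clears halfOpenInFlight even when fn panics, converting the
// panic into an error so the breaker cannot wedge in half-open state.
func (b *breaker) execute(fn func() error) (err error) {
	b.mu.Lock()
	if b.halfOpenInFlight {
		b.mu.Unlock()
		return errors.New("breaker: request blocked")
	}
	b.halfOpenInFlight = true
	b.mu.Unlock()

	defer func() {
		b.mu.Lock()
		b.halfOpenInFlight = false // always cleared, panic or not
		b.mu.Unlock()
		if r := recover(); r != nil {
			err = fmt.Errorf("%w: %v", errPanic, r)
		}
	}()
	return fn()
}

func main() {
	b := &breaker{}
	fmt.Println(b.execute(func() error { panic("boom") })) // panic surfaced as error
	fmt.Println(b.execute(func() error { return nil }))    // breaker not wedged
}
```

Whether to convert the panic into an error (as here) or re-panic after clearing state is a policy choice; either way the in-flight flag must be released.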
Comment on lines +120 to +134
// Idempotency check.
if w.ledger != nil {
acquired, err := w.ledger.TryAcquire(workerCtx, jobKey, clusterStaleThreshold)
if err != nil {
w.logger.Error("execution ledger error",
"cluster_id", job.ClusterID, "msg_id", msg.ID, "error", err)
w.nackWithLog(workerCtx, msg.ID, "ledger try_acquire failed")
return
}
if !acquired {
w.logger.Info("skipping duplicate cluster job",
"cluster_id", job.ClusterID, "type", job.Type, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "duplicate cluster job")
return
}

Copilot AI Apr 18, 2026


TryAcquire can return false not only for already-completed jobs but also when another worker currently owns a non-stale running execution. In that case, immediately Acking here can delete the only queue message while the other worker is still processing; if that worker crashes before MarkComplete, the job will be stuck as running in the ledger with no message left to retry. Mirror the provision/pipeline pattern: only ack when ledger status is completed (or otherwise terminal), and leave the message unacked when the job is actively running elsewhere.

poyrazK added 2 commits April 18, 2026 14:14
When deleting a VPC, the check for load balancers passes if they are
in DELETED status. However, the LB rows still exist in the database
because the LB worker processes deletions asynchronously.

This caused VPC deletion to fail with FK constraint violations.
The fix adds a cleanup step that removes LB rows marked as DELETED
before attempting to delete the VPC.
The autoscaling group cleanup involves terminating instances, which with
HA retries can take significant time. Increase timeouts to account for:
- HA retry delays on instance termination
- Worker tick interval (10 seconds)
- Multiple instances needing termination
Copilot AI review requested due to automatic review settings April 18, 2026 11:27

Copilot AI left a comment


Pull request overview

Copilot reviewed 42 out of 42 changed files in this pull request and generated 6 comments.



Comment on lines +21 to +30
const (
pipelineQueueName = "pipeline_build_queue"
pipelineGroup = "pipeline_workers"
pipelineMaxWorkers = 5
pipelineReclaimMs = 10 * 60 * 1000 // 10 minutes (builds are longer)
pipelineReclaimN = 5
// Stale threshold for idempotency ledger: builds can take up to 30 min,
// so a "running" entry older than this is considered abandoned.
pipelineStaleThreshold = 35 * time.Minute
)

Copilot AI Apr 18, 2026


pipelineReclaimMs (10 minutes) is shorter than the 30-minute job timeout; long-running builds will be auto-claimed while still processing, which can lead to duplicate execution attempts or message churn. Consider setting pipelineReclaimMs >= worst-case build duration (or >= pipelineStaleThreshold) or implementing a periodic "lease renewal" (XCLAIM-to-self) while a build is running.

Comment on lines +19 to +26
const (
clusterQueue = "k8s_jobs"
clusterGroup = "cluster_workers"
clusterMaxWorkers = 10
clusterReclaimMs = 5 * 60 * 1000 // 5 minutes
clusterReclaimN = 10
clusterStaleThreshold = 15 * time.Minute
clusterReceiveBackoff = 1 * time.Second

Copilot AI Apr 18, 2026


clusterReclaimMs (5 minutes) is shorter than clusterStaleThreshold (15 minutes) and likely shorter than real cluster provision/upgrade runtimes. That means in-progress jobs can be auto-claimed and treated as stale while still running. Consider aligning clusterReclaimMs with the expected max job duration (or with clusterStaleThreshold), or add a lease-renewal mechanism while work is in flight.

Comment on lines +80 to +85
defer timer.Stop()
select {
case <-ctx.Done():
return lastErr
case <-timer.C:
}

Copilot AI Apr 18, 2026


defer timer.Stop() is inside the retry loop. Because defer runs only when Retry returns, this accumulates one deferred call per attempt and can retain timers longer than needed. Prefer stopping the timer explicitly at the end of each iteration (and draining timer.C if Stop() returns false) rather than deferring inside the loop.

Suggested change
defer timer.Stop()
select {
case <-ctx.Done():
return lastErr
case <-timer.C:
}
select {
case <-ctx.Done():
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
return lastErr
case <-timer.C:
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}

Comment on lines +135 to +147
if !acquired {
// Check if it's already finished or just being processed by someone else.
status, _, _, getErr := w.ledger.GetStatus(workerCtx, jobKey)
if getErr == nil && status == "completed" {
w.logger.Info("skipping already completed provision job",
"instance_id", job.InstanceID, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "provision already completed")
return
}
w.logger.Info("provision job is currently being processed by another worker",
"instance_id", job.InstanceID, "msg_id", msg.ID)
return // Leave unacked for redelivery/wait.
}

Copilot AI Apr 18, 2026


When TryAcquire returns false and the ledger status is not completed, the worker returns without ack/nack. This leaves the message pending under the current consumer; with reclaim enabled it can cause repeated reclaims/processing attempts and an ever-growing PEL if the job never becomes acked. Consider explicitly handling the running vs failed cases (e.g., re-check later, or only leave pending when you can guarantee it won't be reclaimed prematurely; otherwise ack duplicates safely).

Comment on lines +113 to +134
// Idempotency check: skip if already completed or actively being processed.
if w.ledger != nil {
acquired, err := w.ledger.TryAcquire(workerCtx, jobKey, pipelineStaleThreshold)
if err != nil {
w.logger.Error("execution ledger error",
"build_id", job.BuildID, "msg_id", msg.ID, "error", err)
w.nackWithLog(workerCtx, msg.ID, "ledger try_acquire failed")
return
}
if !acquired {
// Check if it's already finished or just being processed by someone else.
status, _, _, getErr := w.ledger.GetStatus(workerCtx, jobKey)
if getErr == nil && status == "completed" {
w.logger.Info("skipping already completed pipeline job",
"build_id", job.BuildID, "msg_id", msg.ID)
w.ackWithLog(workerCtx, msg.ID, "pipeline already completed")
return
}
w.logger.Info("pipeline job is currently being processed by another worker",
"build_id", job.BuildID, "msg_id", msg.ID)
return // Leave unacked for redelivery/wait.
}

Copilot AI Apr 18, 2026


When TryAcquire returns false and the ledger status isn't completed, the code returns without ack/nack. This leaves the message stuck pending for this consumer and relies entirely on the reclaim loop to make progress, which can cause repeated reclaim churn. Consider explicitly deciding what to do for running vs failed (e.g., defer processing but ensure the message won't be reclaimed prematurely, or ack known-duplicate messages once it's safe).

Comment on lines +70 to +79
// Reclaim the stale entry using FOR UPDATE SKIP LOCKED.
// This prevents two reclaimers from racing to reclaim the same stale entry.
// If another reclaimer holds the lock (already reclaimed), we skip it.
var reclaimed bool
err := l.db.QueryRow(ctx, `
UPDATE job_executions
SET started_at = NOW(), status = 'running'
WHERE job_key = $1 AND status = 'running' AND started_at = $2
RETURNING TRUE
`, jobKey, startedAt).Scan(&reclaimed)

Copilot AI Apr 18, 2026


The comment says the stale-running reclaim uses "FOR UPDATE SKIP LOCKED", but the implementation is an UPDATE ... WHERE ... started_at = $2 RETURNING TRUE without any FOR UPDATE. Either update the comment to match the actual approach or switch to a locking strategy if that was the intent.

When deleting a VPC, scaling groups marked as DELETING may still have
their DB rows present due to async autoscaling worker cleanup. This
causes FK constraint violations when VPC deletion is attempted.

The fix adds cleanup of DELETING scaling groups before VPC deletion,
similar to the existing cleanup for DELETED load balancers.

This ensures VPC deletion succeeds even when the autoscaling worker
hasn't finished cleaning up (e.g., due to leadership election delays).

3 participants