From 0a6141f67fb97c9e16db76f97765c863cadd3af7 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 12 Mar 2026 14:42:19 +0100 Subject: [PATCH 01/14] Moving failed jobs to the archive table --- verifier/jobqueue/README.md | 275 +++++++++++++++++++++++ verifier/jobqueue/interface.go | 11 +- verifier/jobqueue/postgres_queue.go | 79 +++++-- verifier/jobqueue/postgres_queue_test.go | 94 +++++--- 4 files changed, 405 insertions(+), 54 deletions(-) create mode 100644 verifier/jobqueue/README.md diff --git a/verifier/jobqueue/README.md b/verifier/jobqueue/README.md new file mode 100644 index 000000000..f2e9bf5f9 --- /dev/null +++ b/verifier/jobqueue/README.md @@ -0,0 +1,275 @@ +# Job Queue System + +## Overview + +The Job Queue provides a durable, PostgreSQL-backed task queue with automatic retry, failure handling, and job archiving. It's designed for reliable message processing in the CCV verifier pipeline. + +## Architecture + +The queue consists of two main tables: +- **Active Table** (`ccv_task_verifier_jobs` / `ccv_storage_writer_jobs`): Contains jobs that are pending or currently being processed +- **Archive Table** (`*_archive`): Contains completed and failed jobs for audit purposes + +## Job States + +Jobs transition through the following states: + +| State | Description | +|-------|-------------| +| `pending` | Job is waiting to be consumed by a worker | +| `processing` | Job is currently being processed by a worker | +| `completed` | Job finished successfully (exists only in archive) | +| `failed` | Job permanently failed (exists only in archive) | + +**Important**: Only `pending` and `processing` jobs exist in the active table. Once a job is `completed` or `failed`, it is immediately moved to the archive table. + +## State Machine Diagram + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ ACTIVE TABLE │ +│ │ +│ ┌─────────┐ │ +│ │ Publish │ │ +│ └────┬────┘ │ +│ │ │ +│ v │ +│ ┌─────────┐ ┌────────────┐ │ +│ │ pending │──Consume────>│ processing │ │ +│ └─────────┘ └─────┬──────┘ │ +│ ^ │ │ +│ │ │ │ +│ │ ┌────┼────┬──────────┐ │ +│ │ │ │ │ │ │ +│ │ │ │ │ │ │ +│ Retry (within │ │ │ │ │ +│ deadline) v v v v │ +│ │ Complete Fail Retry Retry │ +│ └───────────────────┘ │ (deadline (deadline │ +│ │ exceeded) exceeded) │ +│ │ │ │ │ +└─────────────────────────────────┼──────┼──────────┼─────────────────┘ + │ │ │ + │ v v + ┌───────────┴──┐ ┌──────────────┐ + │ Archive │ │ Archive │ + │ (completed) │ │ (failed) │ + └──────────────┘ └──────────────┘ + + ARCHIVE TABLE (completed or failed) +``` + +## State Transitions + +### 1. Publish → Pending + +```go +err := queue.Publish(ctx, job1, job2, job3) +``` + +- Creates new jobs in `pending` state +- Sets `available_at` timestamp (default: immediate) +- Sets `retry_deadline` based on `RetryDuration` config +- Jobs become immediately available for consumption + +### 2. Consume: Pending → Processing + +```go +jobs, err := queue.Consume(ctx, batchSize) +``` + +**Selection Criteria:** +- Jobs in `pending` state where `available_at <= NOW()` +- Jobs in `processing` state where `started_at + LockDuration < NOW()` (stale locks) + +**Effects:** +- Updates status to `processing` +- Sets `started_at` timestamp +- Increments `attempt_count` +- Locks the job using `FOR UPDATE SKIP LOCKED` (prevents duplicate consumption) + +**Note**: Failed jobs are **NOT** consumed - they are archived and cannot be retried. + +### 3. 
Complete: Processing → Archived (Completed) + +```go +err := queue.Complete(ctx, jobID1, jobID2) +``` + +- Deletes jobs from active table +- Inserts into archive table with `completed` status +- Sets `completed_at` timestamp +- Creates permanent audit trail + +### 4. Retry: Processing → Pending (or Archived if deadline exceeded) + +```go +err := queue.Retry(ctx, delay, errorMap, jobID1, jobID2) +``` + +**If `NOW() < retry_deadline`:** +- Updates status to `pending` +- Sets `available_at = NOW() + delay` +- Records error message in `last_error` +- Job becomes available for retry after delay + +**If `NOW() >= retry_deadline`:** +- Updates status to `failed` +- Records error message +- **Archives the job** (moves to archive table) +- Job will NOT be retried again + +### 5. Fail: Processing → Archived (Failed) + +```go +err := queue.Fail(ctx, errorMap, jobID1, jobID2) +``` + +- Updates status to `failed` +- Records error message in `last_error` +- **Immediately archives the job** (moves to archive table) +- Job will NOT be retried + +## Key Invariant + +**Every job consumed from the queue must eventually be:** +- **Completed** → archived with `completed` status +- **Failed** → archived with `failed` status +- **Retried** → returned to `pending` OR archived if retry deadline exceeded + +This design ensures: +- ✅ Active table only contains pending/processing jobs +- ✅ No unbounded table growth +- ✅ Complete audit trail in archive +- ✅ Predictable resource usage + +## Stale Lock Recovery + +If a worker crashes while processing a job, the job remains in `processing` state. The queue automatically reclaims these "stale" jobs: + +``` +Worker A: Consume job → started_at = 10:00 AM → [CRASH] +Worker B: Consume (at 10:15 AM) → detects stale lock (10:15 - 10:00 > LockDuration) +Worker B: Reclaims job → attempt_count++ +``` + +**Configuration:** +- `LockDuration`: How long a job can stay in `processing` before being reclaimed (default: 1 minute) + +## Configuration + +```go +type QueueConfig struct { + Name string // Queue name for logging and table naming + OwnerID string // Scopes jobs so multiple verifiers can share tables + RetryDuration time.Duration // How long jobs can be retried before permanent failure + LockDuration time.Duration // How long before a processing job is considered stale +} +``` + +## Usage Example + +### Basic Flow + +```go +// 1. Create queue +queue, err := jobqueue.NewPostgresJobQueue[MyJob]( + db, + jobqueue.QueueConfig{ + Name: "my_jobs", + OwnerID: "my-verifier", + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + logger, +) + +// 2. Publish jobs +err = queue.Publish(ctx, job1, job2, job3) + +// 3. Worker: Consume and process +jobs, err := queue.Consume(ctx, 10) // batch of up to 10 jobs +for _, job := range jobs { + err := processJob(job) + if err == nil { + // Success - archive as completed + queue.Complete(ctx, job.ID) + } else if isTransientError(err) { + // Transient error - retry after delay + queue.Retry(ctx, 10*time.Second, map[string]error{job.ID: err}, job.ID) + } else { + // Permanent error - archive as failed + queue.Fail(ctx, map[string]error{job.ID: err}, job.ID) + } +} + +// 4. 
Periodic cleanup of old archive entries +deleted, err := queue.Cleanup(ctx, 30*24*time.Hour) // delete after 30 days +``` + +### Delayed Publishing + +```go +// Publish job that becomes available in 1 hour +err := queue.PublishWithDelay(ctx, time.Hour, job) +``` + +### Monitoring + +```go +// Get count of pending + processing jobs +size, err := queue.Size(ctx) +log.Printf("Queue size: %d", size) +``` + +## Concurrency Guarantees + +- **Multiple Publishers**: Safe - concurrent `Publish()` calls are isolated +- **Multiple Consumers**: Safe - `SELECT FOR UPDATE SKIP LOCKED` ensures no duplicate consumption +- **Concurrent Operations**: Safe - all operations use database transactions + +## Error Handling Best Practices + +### Transient Errors (Should Retry) +- Network timeouts +- Temporary database unavailability +- Rate limit errors +- 503 Service Unavailable + +**Action**: Call `Retry()` with appropriate delay + +### Permanent Errors (Should Fail) +- Invalid data format +- Authorization failures +- 404 Not Found +- Business logic violations + +**Action**: Call `Fail()` to archive immediately + +### Retry Strategy Example + +```go +func handleResult(queue JobQueue, job Job, err error) { + if err == nil { + queue.Complete(ctx, job.ID) + return + } + + // Check retry deadline + if time.Now().After(job.RetryDeadline) { + // Too late to retry - fail permanently + queue.Fail(ctx, map[string]error{job.ID: err}, job.ID) + return + } + + // Classify error + if isTransient(err) { + // Exponential backoff based on attempt count + delay := time.Duration(math.Pow(2, float64(job.AttemptCount))) * time.Second + queue.Retry(ctx, delay, map[string]error{job.ID: err}, job.ID) + } else { + // Permanent error + queue.Fail(ctx, map[string]error{job.ID: err}, job.ID) + } +} +``` \ No newline at end of file diff --git a/verifier/jobqueue/interface.go b/verifier/jobqueue/interface.go index 9bc057f8a..edd8ccb8f 100644 --- a/verifier/jobqueue/interface.go +++ b/verifier/jobqueue/interface.go @@ -56,11 +56,14 @@ type JobQueue[T Jobable] interface { // Useful for implementing retry backoff strategies. PublishWithDelay(ctx context.Context, delay time.Duration, jobs ...T) error // Consume retrieves and locks up to batchSize jobs for processing. - // Jobs in 'pending' or 'failed' status that are past their available_at time are eligible. + // Jobs in 'pending' status that are past their available_at time are eligible. // Additionally, jobs stuck in 'processing' for longer than the configured LockDuration // are considered stale (e.g. from a crashed worker) and are automatically reclaimed. // Returns empty slice if no jobs are available. // + // Note: Jobs in 'failed' status are NOT consumed. They remain in the active table + // until archived by Cleanup or manually investigated. + // // The implementation should use SELECT FOR UPDATE SKIP LOCKED to ensure // concurrent consumers don't compete for the same jobs. Consume(ctx context.Context, batchSize int) ([]Job[T], error) @@ -69,10 +72,10 @@ type JobQueue[T Jobable] interface { Complete(ctx context.Context, jobIDs ...string) error // Retry schedules jobs for retry after the specified delay. // Increments attempt count and records the error message. - // If max attempts is exceeded, jobs are moved to failed status. + // If the retry deadline has been exceeded, jobs are marked as failed and archived. Retry(ctx context.Context, delay time.Duration, errors map[string]error, jobIDs ...string) error - // Fail marks jobs as permanently failed. 
- // These jobs will not be retried and should be investigated. + // Fail marks jobs as permanently failed and moves them to the archive. + // These jobs will not be retried and should be investigated via the archive table. Fail(ctx context.Context, errors map[string]error, jobIDs ...string) error // Cleanup archives or deletes jobs older than the retention period. // Should be called periodically to prevent unbounded table growth. diff --git a/verifier/jobqueue/postgres_queue.go b/verifier/jobqueue/postgres_queue.go index b86303853..c6b3e6967 100644 --- a/verifier/jobqueue/postgres_queue.go +++ b/verifier/jobqueue/postgres_queue.go @@ -133,8 +133,9 @@ func (q *PostgresJobQueue[T]) Consume(ctx context.Context, batchSize int) ([]Job staleBefore := now.Add(-q.config.LockDuration) // Select jobs that are: - // 1. pending/failed and past their available_at, OR + // 1. pending and past their available_at, OR // 2. processing but started_at is older than lockDuration (stale lock from crashed worker) + // Note: Failed jobs are NOT consumed - they remain in the table until archived or cleaned up. query := fmt.Sprintf(` UPDATE %s SET status = $1, @@ -144,12 +145,12 @@ func (q *PostgresJobQueue[T]) Consume(ctx context.Context, batchSize int) ([]Job SELECT id FROM %s WHERE owner_id = $3 AND ( - (status IN ($4, $5) AND available_at <= $6) + (status = $4 AND available_at <= $5) OR - (status = $7 AND started_at IS NOT NULL AND started_at <= $8) + (status = $6 AND started_at IS NOT NULL AND started_at <= $7) ) ORDER BY available_at ASC, id ASC - LIMIT $9 + LIMIT $8 FOR UPDATE SKIP LOCKED ) RETURNING id, job_id, task_data, attempt_count, retry_deadline, created_at, started_at, chain_selector, message_id @@ -160,11 +161,10 @@ func (q *PostgresJobQueue[T]) Consume(ctx context.Context, batchSize int) ([]Job now, // $2 started_at q.ownerID, // $3 JobStatusPending, // $4 - JobStatusFailed, // $5 - now, // $6 available_at <= - JobStatusProcessing, // $7 stale processing - staleBefore, // $8 started_at <= - batchSize, // $9 + now, // $5 available_at <= + JobStatusProcessing, // $6 stale processing + staleBefore, // $7 started_at <= + batchSize, // $8 ) if err != nil { return nil, fmt.Errorf("failed to consume jobs: %w", err) @@ -279,6 +279,7 @@ func (q *PostgresJobQueue[T]) Complete(ctx context.Context, jobIDs ...string) er } // Retry schedules jobs for retry after delay. +// If the retry deadline has been exceeded, jobs are marked as failed and archived. 
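+// Illustrative caller-side usage (a sketch, assuming a previously consumed
+// job and a transient error; the 10-second delay is an example value only):
+//
+//	errs := map[string]error{job.ID: err}
+//	_ = q.Retry(ctx, 10*time.Second, errs, job.ID)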
func (q *PostgresJobQueue[T]) Retry(ctx context.Context, delay time.Duration, errors map[string]error, jobIDs ...string) error { if len(jobIDs) == 0 { return nil @@ -352,6 +353,34 @@ func (q *PostgresJobQueue[T]) Retry(ctx context.Context, delay time.Duration, er return err } + // Archive jobs that exceeded the retry deadline + if len(failed) > 0 { + archiveQuery := fmt.Sprintf(` + WITH failed AS ( + DELETE FROM %s + WHERE job_id = ANY($1) + AND owner_id = $2 + AND status = $3 + RETURNING * + ) + INSERT INTO %s + SELECT *, NOW() as completed_at + FROM failed + `, q.tableName, q.archiveName) + + result, err := q.ds.ExecContext(ctx, archiveQuery, pq.Array(failed), q.ownerID, JobStatusFailed) + if err != nil { + q.logger.Errorw("Failed to archive jobs that exceeded retry deadline", + "error", err, + "count", len(failed)) + } else { + affected, _ := result.RowsAffected() + q.logger.Infow("Archived jobs that exceeded retry deadline", + "queue", q.config.Name, + "count", affected) + } + } + q.logger.Infow("Retried jobs", "queue", q.config.Name, "retried", len(retried), @@ -362,13 +391,15 @@ func (q *PostgresJobQueue[T]) Retry(ctx context.Context, delay time.Duration, er return nil } -// Fail marks jobs as permanently failed. +// Fail marks jobs as permanently failed and moves them to the archive. +// This ensures failed jobs don't remain in the active queue indefinitely. func (q *PostgresJobQueue[T]) Fail(ctx context.Context, errors map[string]error, jobIDs ...string) error { if len(jobIDs) == 0 { return nil } - query := fmt.Sprintf(` + // First, update the last_error for each job + updateQuery := fmt.Sprintf(` UPDATE %s SET status = $1, last_error = $2 @@ -377,7 +408,7 @@ func (q *PostgresJobQueue[T]) Fail(ctx context.Context, errors map[string]error, `, q.tableName) err := sqlutil.TransactDataSource(ctx, q.ds, nil, func(tx sqlutil.DataSource) error { - stmt, err := tx.PrepareContext(ctx, query) + stmt, err := tx.PrepareContext(ctx, updateQuery) if err != nil { return fmt.Errorf("failed to prepare fail statement: %w", err) } @@ -406,9 +437,29 @@ func (q *PostgresJobQueue[T]) Fail(ctx context.Context, errors map[string]error, return err } - q.logger.Infow("Failed jobs", + // Now move failed jobs to archive + archiveQuery := fmt.Sprintf(` + WITH failed AS ( + DELETE FROM %s + WHERE job_id = ANY($1) + AND owner_id = $2 + AND status = $3 + RETURNING * + ) + INSERT INTO %s + SELECT *, NOW() as completed_at + FROM failed + `, q.tableName, q.archiveName) + + result, err := q.ds.ExecContext(ctx, archiveQuery, pq.Array(jobIDs), q.ownerID, JobStatusFailed) + if err != nil { + return fmt.Errorf("failed to archive failed jobs: %w", err) + } + + affected, _ := result.RowsAffected() + q.logger.Infow("Failed and archived jobs", "queue", q.config.Name, - "count", len(jobIDs), + "count", affected, ) return nil diff --git a/verifier/jobqueue/postgres_queue_test.go b/verifier/jobqueue/postgres_queue_test.go index 2621b3c85..50fe38fe7 100644 --- a/verifier/jobqueue/postgres_queue_test.go +++ b/verifier/jobqueue/postgres_queue_test.go @@ -401,9 +401,12 @@ func TestRetryExceedsDeadline(t *testing.T) { errs := map[string]error{jobID: errors.New("fatal")} require.NoError(t, q.Retry(ctx, 0, errs, jobID)) - // Job should be marked as failed because retry deadline has passed. 
- assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + // Job should be archived (not in active table) because retry deadline has passed + assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending)) + + // Job should be in the archive + assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) } func TestRetryWithDelay(t *testing.T) { @@ -440,18 +443,22 @@ func TestFail(t *testing.T) { errs := map[string]error{jobID: errors.New("permanent")} require.NoError(t, q.Fail(ctx, errs, jobID)) - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + // Failed job should NOT be in the active table + assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + + // Failed job should be in the archive + assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) - // Verify error is stored + // Verify error is stored in the archive var lastErr string - err = db.QueryRowxContext(ctx, "SELECT last_error FROM ccv_task_verifier_jobs WHERE job_id = $1", jobID).Scan(&lastErr) + err = db.QueryRowxContext(ctx, "SELECT last_error FROM ccv_task_verifier_jobs_archive WHERE job_id = $1", jobID).Scan(&lastErr) require.NoError(t, err) assert.Equal(t, "permanent", lastErr) } -// Failed jobs are re-consumable by the Consume query (status IN ('pending','failed')). -func TestFailedJobsAreReconsumed(t *testing.T) { - q, _ := newTestQueue(t) +// Failed jobs are NOT re-consumable and should be archived. +func TestFailedJobsAreNotReconsumed(t *testing.T) { + q, db := newTestQueue(t) ctx := context.Background() require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) @@ -459,15 +466,21 @@ func TestFailedJobsAreReconsumed(t *testing.T) { consumed, err := q.Consume(ctx, 1) require.NoError(t, err) require.Len(t, consumed, 1) + jobID := consumed[0].ID // Mark as failed - require.NoError(t, q.Fail(ctx, map[string]error{consumed[0].ID: errors.New("err")}, consumed[0].ID)) + require.NoError(t, q.Fail(ctx, map[string]error{jobID: errors.New("err")}, jobID)) - // Should be consumable again - consumed2, err := q.Consume(ctx, 1) + // Failed job should NOT be in the active table + assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + + // Failed job should be in the archive + assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) + + // Should NOT be consumable again + consumed2, err := q.Consume(ctx, 10) require.NoError(t, err) - require.Len(t, consumed2, 1) - assert.Equal(t, consumed[0].ID, consumed2[0].ID) + assert.Empty(t, consumed2, "failed jobs should not be consumed") } func TestCleanup(t *testing.T) { @@ -551,27 +564,27 @@ func TestSize(t *testing.T) { require.NoError(t, err) assert.Equal(t, 3, size) - // Fail the last consumed job (it stays in failed status) + // Fail the last consumed job (it is archived immediately) require.NoError(t, q.Fail(ctx, map[string]error{consumed[2].ID: errors.New("test error")}, consumed[2].ID)) - // Size should be 2 (2 pending only - failed jobs are excluded from Size) + // Size should be 2 (2 pending only - failed jobs are archived, not in active table) size, err = q.Size(ctx) require.NoError(t, err) assert.Equal(t, 2, size) - // Consume remaining jobs - Consume WILL pick up failed jobs for 
retry - // So this returns: 2 pending + 1 failed (that becomes processing) + // Consume remaining jobs - should only get the 2 pending jobs + // Failed jobs are archived and NOT consumed consumed2, err := q.Consume(ctx, 10) require.NoError(t, err) - require.Len(t, consumed2, 3) // 2 pending + 1 previously failed + require.Len(t, consumed2, 2) // Only 2 pending jobs (failed job was archived) - // Size should be 3 (all are now processing, and processing IS counted in Size) + // Size should be 2 (2 processing) size, err = q.Size(ctx) require.NoError(t, err) - assert.Equal(t, 3, size) + assert.Equal(t, 2, size) // Complete all remaining - require.NoError(t, q.Complete(ctx, consumed2[0].ID, consumed2[1].ID, consumed2[2].ID)) + require.NoError(t, q.Complete(ctx, consumed2[0].ID, consumed2[1].ID)) // Size should be 0 size, err = q.Size(ctx) @@ -828,15 +841,17 @@ func TestConcurrentRetryAndFail(t *testing.T) { t.Logf("completed=%d, failed=%d", completed.Load(), failed.Load()) - // All jobs should have been either completed (moved to archive) or remain in the main table as failed + // All jobs should be archived (either completed or failed) + // Some jobs might still be pending if they were retried but workers exited before consuming them again archivedCount := countAllRows(t, db, "ccv_task_verifier_jobs_archive") - failedCount := countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed) - - // Every job should be accounted for in one of these two places - // Some jobs might still be pending if they were retried but workers exited before consuming them again. pendingCount := countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending) - totalAccountedFor := archivedCount + failedCount + pendingCount - assert.Equal(t, numJobs, totalAccountedFor, "all jobs should be accounted for") + + // Failed jobs should be in archive, not in active table + failedInActiveTable := countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed) + assert.Equal(t, 0, failedInActiveTable, "failed jobs should be archived, not in active table") + + totalAccountedFor := archivedCount + pendingCount + assert.Equal(t, numJobs, totalAccountedFor, "all jobs should be either archived or pending") } func TestRetryDeadlineExhaustionCycle(t *testing.T) { @@ -867,19 +882,23 @@ func TestRetryDeadlineExhaustionCycle(t *testing.T) { // Wait for the retry deadline to expire time.Sleep(60 * time.Millisecond) - // Attempt 3: consume → retry (deadline has now passed, should fail permanently) + // Attempt 3: consume → retry (deadline has now passed, should fail permanently and archive) consumed, err = q.Consume(ctx, 1) require.NoError(t, err) require.Len(t, consumed, 1) assert.Equal(t, 3, consumed[0].AttemptCount) require.NoError(t, q.Retry(ctx, 0, map[string]error{jobID: errors.New("err3")}, jobID)) - // Job should now be in failed status (retry deadline passed) - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + // Job should now be archived (not in active table) because retry deadline passed + assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) + assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending)) + + // Job should be in the archive with failed status + assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) - // Verify last_error was recorded + // Verify last_error was recorded in the archive var lastErr string - err = db.QueryRowxContext(ctx, "SELECT 
last_error FROM ccv_task_verifier_jobs WHERE job_id = $1", jobID).Scan(&lastErr) + err = db.QueryRowxContext(ctx, "SELECT last_error FROM ccv_task_verifier_jobs_archive WHERE job_id = $1", jobID).Scan(&lastErr) require.NoError(t, err) assert.Equal(t, "err3", lastErr) } @@ -1046,8 +1065,11 @@ func TestEndToEndConcurrentWithRandomWork(t *testing.T) { remainingFailed := countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed) remainingPending := countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending) - totalAccounted := archived + remainingFailed + remainingPending + // Failed jobs should be archived, not in active table + assert.Equal(t, 0, remainingFailed, "failed jobs should be archived, not in active table") + + totalAccounted := archived + remainingPending assert.Equal(t, totalJobs, totalAccounted, - "all %d jobs should be archived (%d) + failed (%d) + pending (%d)", - totalJobs, archived, remainingFailed, remainingPending) + "all %d jobs should be archived (%d) + pending (%d)", + totalJobs, archived, remainingPending) } From 22cf928258421be5e38700ffb6fc0a32c16126a0 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 12 Mar 2026 15:51:32 +0100 Subject: [PATCH 02/14] Adding readme with deails --- verifier/jobqueue/README.md | 63 +++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/verifier/jobqueue/README.md b/verifier/jobqueue/README.md index f2e9bf5f9..24556457e 100644 --- a/verifier/jobqueue/README.md +++ b/verifier/jobqueue/README.md @@ -26,37 +26,38 @@ Jobs transition through the following states: ## State Machine Diagram ``` -┌─────────────────────────────────────────────────────────────────────┐ -│ ACTIVE TABLE │ -│ │ -│ ┌─────────┐ │ -│ │ Publish │ │ -│ └────┬────┘ │ -│ │ │ -│ v │ -│ ┌─────────┐ ┌────────────┐ │ -│ │ pending │──Consume────>│ processing │ │ -│ └─────────┘ └─────┬──────┘ │ -│ ^ │ │ -│ │ │ │ -│ │ ┌────┼────┬──────────┐ │ -│ │ │ │ │ │ │ -│ │ │ │ │ │ │ -│ Retry (within │ │ │ │ │ -│ deadline) v v v v │ -│ │ Complete Fail Retry Retry │ -│ └───────────────────┘ │ (deadline (deadline │ -│ │ exceeded) exceeded) │ -│ │ │ │ │ -└─────────────────────────────────┼──────┼──────────┼─────────────────┘ - │ │ │ - │ v v - ┌───────────┴──┐ ┌──────────────┐ - │ Archive │ │ Archive │ - │ (completed) │ │ (failed) │ - └──────────────┘ └──────────────┘ - - ARCHIVE TABLE (completed or failed) +┌────────────────────────────────────────────────────────────────────────┐ +│ ACTIVE TABLE │ +│ │ +│ ┌─────────┐ │ +│ │ Publish │ │ +│ └────┬────┘ │ +│ │ │ +│ v │ +│ ┌─────────┐ ┌────────────┐ │ +│ │ pending │───Consume─────>│ processing │ │ +│ └─────────┘ └──────┬─────┘ │ +│ ^ │ │ +│ │ │ │ +│ │ Retry │ │ +│ │ (within ┌──────┼────────┬──────────┐ │ +│ │ deadline) │ │ │ │ │ +│ │ v v v v │ +│ └──────────────── Retry Complete Fail Retry │ +│ │ (exceeded deadline) │ +│ │ │ │ +│ │ │ │ +│ │ │ │ +└──────────────────────────────────────┼─────────────┼───────────────────┘ + │ │ + │ │ + v v + ┌─────────────┐ ┌─────────────┐ + │ Archive │ │ Archive │ + │ (completed) │ │ (failed) │ + └─────────────┘ └─────────────┘ + + ARCHIVE TABLE ``` ## State Transitions From 844d79dbaa8a1dd0d2b56b45c05944b6c89626a1 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 12 Mar 2026 16:36:17 +0100 Subject: [PATCH 03/14] Fixes --- verifier/const.go | 5 + verifier/source_reader_service.go | 4 + verifier/storage_writer.go | 30 ++- verifier/task_verifier.go | 82 ++++++-- verifier/task_verifier_test.go | 322 +++++++++++++++++++++++++++++- 5 files 
changed, 414 insertions(+), 29 deletions(-) diff --git a/verifier/const.go b/verifier/const.go index 69ae06977..e48ea3b8c 100644 --- a/verifier/const.go +++ b/verifier/const.go @@ -1,9 +1,14 @@ package verifier +import "time" + const ( DefaultConfigFile = "/etc/config.toml" // ConfirmationDepth is the number of blocks to wait before considering a block finalized. // This is used for calculating finalized blocks as: (latest - ConfirmationDepth) // when running standalone mode. In CL node it's HeadTracker configuration. ConfirmationDepth = 15 + // DefaultJobQueueOperationTimeout is the timeout for job queue operations across verifier components. + // Used by task_verifier and storage_writer for Consume, Complete, Retry, Fail, Cleanup operations. + DefaultJobQueueOperationTimeout = 10 * time.Second ) diff --git a/verifier/source_reader_service.go b/verifier/source_reader_service.go index dc3dc62ca..95ff4de08 100644 --- a/verifier/source_reader_service.go +++ b/verifier/source_reader_service.go @@ -541,7 +541,11 @@ func (r *SourceReaderService) sendReadyMessages(ctx context.Context, latest, fin } // Delete cursed tasks immediately (these are being dropped, not queued) + // Also remove from writingTracker so checkpoints can advance for _, msgID := range toBeDeleted { + if task, exists := r.pendingTasks[msgID]; exists { + r.writingTracker.Remove(task.Message.SourceChainSelector, msgID) + } delete(r.pendingTasks, msgID) } diff --git a/verifier/storage_writer.go b/verifier/storage_writer.go index 6208da641..08c1eda73 100644 --- a/verifier/storage_writer.go +++ b/verifier/storage_writer.go @@ -147,7 +147,10 @@ func (s *StorageWriterProcessor) run() { func (s *StorageWriterProcessor) processBatch(ctx context.Context) error { // Consume batch of results from queue - jobs, err := s.resultQueue.Consume(ctx, s.batchSize) + consumeCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + jobs, err := s.resultQueue.Consume(consumeCtx, s.batchSize) if err != nil { return fmt.Errorf("failed to consume from result queue: %w", err) } @@ -184,7 +187,10 @@ func (s *StorageWriterProcessor) processBatch(ctx context.Context) error { errorMap[job.ID] = err } - if retryErr := s.resultQueue.Retry(ctx, s.retryDelay, errorMap, jobIDs...); retryErr != nil { + retryCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if retryErr := s.resultQueue.Retry(retryCtx, s.retryDelay, errorMap, jobIDs...); retryErr != nil { s.lggr.Errorw("Failed to schedule retry for CCV data batch", "error", retryErr, "batchSize", len(jobs), @@ -253,7 +259,10 @@ func (s *StorageWriterProcessor) processBatch(ctx context.Context) error { "retryDelay", s.retryDelay, ) - if retryErr := s.resultQueue.Retry(ctx, s.retryDelay, failedErrorMap, retriableFailedJobs...); retryErr != nil { + retryCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if retryErr := s.resultQueue.Retry(retryCtx, s.retryDelay, failedErrorMap, retriableFailedJobs...); retryErr != nil { s.lggr.Errorw("Failed to schedule retry for failed writes", "error", retryErr, "retriableFailedCount", len(retriableFailedJobs), @@ -267,7 +276,10 @@ func (s *StorageWriterProcessor) processBatch(ctx context.Context) error { "nonRetriableFailedCount", len(nonRetriableFailedJobs), ) - if failErr := s.resultQueue.Fail(ctx, failedErrorMap, nonRetriableFailedJobs...); failErr != nil { + failCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if 
failErr := s.resultQueue.Fail(failCtx, failedErrorMap, nonRetriableFailedJobs...); failErr != nil { s.lggr.Errorw("Failed to mark jobs as failed", "error", failErr, "nonRetriableFailedCount", len(nonRetriableFailedJobs), @@ -292,7 +304,10 @@ func (s *StorageWriterProcessor) processBatch(ctx context.Context) error { } // Mark successful jobs as completed in queue - if err := s.resultQueue.Complete(ctx, successfulJobs...); err != nil { + completeCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if err := s.resultQueue.Complete(completeCtx, successfulJobs...); err != nil { s.lggr.Errorw("Failed to complete jobs in queue", "error", err, "successfulCount", len(successfulJobs), @@ -350,8 +365,11 @@ func (s *StorageWriterProcessor) updateCheckpoints(ctx context.Context, chains m } func (s *StorageWriterProcessor) cleanup(ctx context.Context) error { + cleanupCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + // Cleanup archived jobs older than retention period - deleted, err := s.resultQueue.Cleanup(ctx, s.retentionPeriod) + deleted, err := s.resultQueue.Cleanup(cleanupCtx, s.retentionPeriod) if err != nil { return fmt.Errorf("failed to cleanup result queue: %w", err) } diff --git a/verifier/task_verifier.go b/verifier/task_verifier.go index 748d82484..dcf7de252 100644 --- a/verifier/task_verifier.go +++ b/verifier/task_verifier.go @@ -144,7 +144,10 @@ func (p *TaskVerifierProcessor) run() { func (p *TaskVerifierProcessor) processBatch(ctx context.Context) error { // Consume batch of tasks from queue - jobs, err := p.taskQueue.Consume(ctx, p.batchSize) + consumeCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + jobs, err := p.taskQueue.Consume(consumeCtx, p.batchSize) if err != nil { return fmt.Errorf("failed to consume from task queue: %w", err) } @@ -201,6 +204,7 @@ func (p *TaskVerifierProcessor) handleVerificationResults( completedJobIDs := make([]string, 0) retryJobIDs := make([]string, 0) retryErrors := make(map[string]error) + retryDelays := make(map[string]time.Duration) failedJobIDs := make([]string, 0) failedErrors := make(map[string]error) @@ -221,7 +225,7 @@ func (p *TaskVerifierProcessor) handleVerificationResults( if result.Error != nil { errorCount++ - p.handleVerificationError(ctx, *result.Error, jobID, &retryJobIDs, retryErrors, &failedJobIDs, failedErrors) + p.handleVerificationError(ctx, *result.Error, jobID, &retryJobIDs, retryErrors, retryDelays, &failedJobIDs, failedErrors) } else if result.Result != nil { successCount++ successfulResults = append(successfulResults, *result.Result) @@ -246,49 +250,78 @@ func (p *TaskVerifierProcessor) handleVerificationResults( // Publish successful results to ccv_storage_writer_jobs queue if len(successfulResults) > 0 { - publishCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + publishCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) defer cancel() if err := p.resultQueue.Publish(publishCtx, successfulResults...); err != nil { - p.lggr.Errorw("Failed to publish verification results to queue", + p.lggr.Errorw("Failed to publish verification results to queue - jobs will remain in processing state and be reclaimed as stale locks", "error", err, "count", len(successfulResults)) - // Results are lost - consider retrying the entire batch - // For now, we'll retry the jobs to re-verify - for _, jobID := range completedJobIDs { - retryJobIDs = append(retryJobIDs, jobID) - retryErrors[jobID] = 
err - } - completedJobIDs = nil - } else { - p.lggr.Debugw("Published verification results to queue", "count", len(successfulResults)) + // Don't complete these jobs - leave them in 'processing' state + // They will be reclaimed as stale locks and re-processed (re-verified and published) + // This is a rare case (DB failure), and relying on stale lock reclaim is acceptable + return fmt.Errorf("failed to publish %d verification results: %w", len(successfulResults), err) } + p.lggr.Debugw("Published verification results to queue", "count", len(successfulResults)) } // Complete successfully processed jobs if len(completedJobIDs) > 0 { - if err := p.taskQueue.Complete(ctx, completedJobIDs...); err != nil { - p.lggr.Errorw("Failed to complete jobs in queue", + completeCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if err := p.taskQueue.Complete(completeCtx, completedJobIDs...); err != nil { + p.lggr.Errorw("Failed to complete jobs - they will remain in processing state and be reclaimed as stale locks", "error", err, "count", len(completedJobIDs)) + // Don't fail the batch - let stale lock reclaim handle it + // This is a rare case (DB failure), and we want to continue processing other jobs } } // Retry jobs with retryable errors if len(retryJobIDs) > 0 { - if err := p.taskQueue.Retry(ctx, 10*time.Second, retryErrors, retryJobIDs...); err != nil { - p.lggr.Errorw("Failed to retry jobs", - "error", err, - "count", len(retryJobIDs)) + // Group jobs by retry delay to minimize Retry() calls + jobsByDelay := make(map[time.Duration][]string) + errorsByDelay := make(map[time.Duration]map[string]error) + + for _, jobID := range retryJobIDs { + delay := retryDelays[jobID] + if jobsByDelay[delay] == nil { + jobsByDelay[delay] = make([]string, 0) + errorsByDelay[delay] = make(map[string]error) + } + jobsByDelay[delay] = append(jobsByDelay[delay], jobID) + errorsByDelay[delay][jobID] = retryErrors[jobID] + } + + // Retry jobs grouped by delay + for delay, jobIDs := range jobsByDelay { + retryCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + + if err := p.taskQueue.Retry(retryCtx, delay, errorsByDelay[delay], jobIDs...); err != nil { + p.lggr.Errorw("Failed to retry jobs - they will remain in processing state and be reclaimed as stale locks", + "error", err, + "count", len(jobIDs), + "delay", delay) + // Don't fail the batch - let stale lock reclaim handle it + // This is a rare case (DB failure), and we want to continue processing other jobs + } + cancel() // Call cancel immediately after Retry, not deferred } } // Fail jobs with permanent errors if len(failedJobIDs) > 0 { - if err := p.taskQueue.Fail(ctx, failedErrors, failedJobIDs...); err != nil { - p.lggr.Errorw("Failed to mark jobs as failed", + failCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + + if err := p.taskQueue.Fail(failCtx, failedErrors, failedJobIDs...); err != nil { + p.lggr.Errorw("Failed to mark jobs as failed - they will remain in processing state and be reclaimed as stale locks", "error", err, "count", len(failedJobIDs)) + // Don't fail the batch - let stale lock reclaim handle it + // This is a rare case (DB failure), and we want to continue processing other jobs } } @@ -309,6 +342,7 @@ func (p *TaskVerifierProcessor) handleVerificationError( jobID string, retryJobIDs *[]string, retryErrors map[string]error, + retryDelays map[string]time.Duration, failedJobIDs *[]string, failedErrors map[string]error, ) { @@ -334,6 +368,7 
@@ func (p *TaskVerifierProcessor) handleVerificationError( if verificationError.Retryable { *retryJobIDs = append(*retryJobIDs, jobID) retryErrors[jobID] = verificationError.Error + retryDelays[jobID] = verificationError.DelayOrDefault() } else { // Increment permanent error metric p.monitoring.Metrics(). @@ -355,8 +390,11 @@ func (p *TaskVerifierProcessor) handleVerificationError( } func (p *TaskVerifierProcessor) cleanup(ctx context.Context) error { + cleanupCtx, cancel := context.WithTimeout(ctx, DefaultJobQueueOperationTimeout) + defer cancel() + // Cleanup archived jobs older than retention period - deleted, err := p.taskQueue.Cleanup(ctx, p.retentionPeriod) + deleted, err := p.taskQueue.Cleanup(cleanupCtx, p.retentionPeriod) if err != nil { return fmt.Errorf("failed to cleanup task queue: %w", err) } diff --git a/verifier/task_verifier_test.go b/verifier/task_verifier_test.go index f90c834cc..fe05b4d1f 100644 --- a/verifier/task_verifier_test.go +++ b/verifier/task_verifier_test.go @@ -743,7 +743,7 @@ func countFailedTasks(t *testing.T, db *sqlx.DB, ownerID string) int { t.Helper() var count int err := db.QueryRow(` - SELECT COUNT(*) FROM ccv_task_verifier_jobs + SELECT COUNT(*) FROM ccv_task_verifier_jobs_archive WHERE owner_id = $1 AND status = 'failed' `, ownerID).Scan(&count) require.NoError(t, err) @@ -802,3 +802,323 @@ func (f *fakeVerifierDB) VerifyMessages(_ context.Context, tasks []verifier.Veri return results } + +// TestTaskVerifierProcessorDB_CustomRetryDelays tests that custom retry delays from VerificationError are respected. +func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { + t.Parallel() + + db := testutil.NewTestDB(t) + + t.Run("uses custom delay from VerificationError", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + + lggr := logger.Nop() + ownerID := "test-" + t.Name() + + taskQueue, err := jobqueue.NewPostgresJobQueue[verifier.VerificationTask]( + db, + jobqueue.QueueConfig{ + Name: verifier.TaskVerifierJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + lggr, + ) + require.NoError(t, err) + + resultQueue, err := jobqueue.NewPostgresJobQueue[protocol.VerifierNodeResult]( + db, + jobqueue.QueueConfig{ + Name: verifier.StorageWriterJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + lggr, + ) + require.NoError(t, err) + + message := protocol.Message{SequenceNumber: 1, SourceChainSelector: 1337} + messageID := message.MustMessageID() + task := verifier.VerificationTask{ + MessageID: messageID.String(), + Message: message, + } + + // Configure verifier with custom short delay (100ms) + customDelay := 100 * time.Millisecond + mockVerifier := &fakeVerifierDB{} + mockVerifier.SetErrors(map[string]verifier.VerificationError{ + messageID.String(): { + Task: task, + Retryable: true, + Delay: &customDelay, // Custom delay + Error: errors.New("transient error with custom delay"), + }, + }) + + processor, err := verifier.NewTaskVerifierProcessorDBWithPollInterval( + lggr, + ownerID, + mockVerifier, + monitoring.NewFakeVerifierMonitoring(), + taskQueue, + resultQueue, + verifier.NewPendingWritingTracker(lggr), + 10, + 50*time.Millisecond, + ) + require.NoError(t, err) + + require.NoError(t, processor.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, processor.Close()) + }) + + require.NoError(t, taskQueue.Publish(ctx, task)) + + // Wait for first attempt + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 1 + }, 
tests.WaitTimeout(t), 50*time.Millisecond) + + // Ensure job moved to pending state after first failure + require.Eventually(t, func() bool { + return countPendingTasks(t, db, ownerID) == 1 + }, tests.WaitTimeout(t), 50*time.Millisecond, "Job should be in pending state after first failure") + + // Record when we know the job is pending + pendingAt := time.Now() + + // Wait for custom delay (100ms) plus buffer + time.Sleep(customDelay + 100*time.Millisecond) + + // Verify it gets retried (processed count increases) + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 2 + }, tests.WaitTimeout(t), 50*time.Millisecond, "Task should be retried after custom delay") + + // Verify the delay was approximately correct (should be >= 100ms from pendingAt) + retryDuration := time.Since(pendingAt) + require.GreaterOrEqual(t, retryDuration, customDelay, "Retry should wait at least the custom delay") + }) + + t.Run("groups jobs by delay for efficient retry", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + + lggr := logger.Nop() + ownerID := "test-" + t.Name() + + taskQueue, err := jobqueue.NewPostgresJobQueue[verifier.VerificationTask]( + db, + jobqueue.QueueConfig{ + Name: verifier.TaskVerifierJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + lggr, + ) + require.NoError(t, err) + + resultQueue, err := jobqueue.NewPostgresJobQueue[protocol.VerifierNodeResult]( + db, + jobqueue.QueueConfig{ + Name: verifier.StorageWriterJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + lggr, + ) + require.NoError(t, err) + + // Create 3 tasks with different delays + shortDelay := 100 * time.Millisecond + longDelay := 500 * time.Millisecond + + task1 := verifier.VerificationTask{ + MessageID: "msg1", + Message: protocol.Message{SequenceNumber: 1, SourceChainSelector: 1337}, + } + task2 := verifier.VerificationTask{ + MessageID: "msg2", + Message: protocol.Message{SequenceNumber: 2, SourceChainSelector: 1337}, + } + task3 := verifier.VerificationTask{ + MessageID: "msg3", + Message: protocol.Message{SequenceNumber: 3, SourceChainSelector: 1337}, + } + + mockVerifier := &fakeVerifierDB{} + mockVerifier.SetErrors(map[string]verifier.VerificationError{ + "msg1": {Retryable: true, Delay: &shortDelay, Error: errors.New("error1")}, + "msg2": {Retryable: true, Delay: &shortDelay, Error: errors.New("error2")}, // Same delay as msg1 + "msg3": {Retryable: true, Delay: &longDelay, Error: errors.New("error3")}, // Different delay + }) + + processor, err := verifier.NewTaskVerifierProcessorDBWithPollInterval( + lggr, + ownerID, + mockVerifier, + monitoring.NewFakeVerifierMonitoring(), + taskQueue, + resultQueue, + verifier.NewPendingWritingTracker(lggr), + 10, + 50*time.Millisecond, + ) + require.NoError(t, err) + + require.NoError(t, processor.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, processor.Close()) + }) + + require.NoError(t, taskQueue.Publish(ctx, task1, task2, task3)) + + // Wait for all 3 to be processed initially and moved to pending state + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 3 + }, tests.WaitTimeout(t), 50*time.Millisecond) + + // Wait for all jobs to be in pending state (may take a moment after processing) + require.Eventually(t, func() bool { + return countPendingTasks(t, db, ownerID) == 3 + }, tests.WaitTimeout(t), 50*time.Millisecond, "All 3 jobs should be pending with their respective delays") + + // After shortDelay, msg1 and 
msg2 should be available (they have same delay) + time.Sleep(shortDelay + 100*time.Millisecond) // Add buffer for processing + + // msg1 and msg2 should be retried (grouped together because same delay) + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 5 // 3 initial + 2 retries + }, tests.WaitTimeout(t), 50*time.Millisecond, "msg1 and msg2 should be retried together") + + // Wait for msg3's longer delay + time.Sleep(longDelay - shortDelay + 100*time.Millisecond) // Remaining time plus buffer + + // Now msg3 should also be retried + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 6 // 3 initial + 2 + 1 + }, tests.WaitTimeout(t), 50*time.Millisecond, "msg3 should be retried after its longer delay") + }) +} + +// TestTaskVerifierProcessorDB_PublishFailureHandling tests stale lock reclaim mechanism. +func TestTaskVerifierProcessorDB_PublishFailureHandling(t *testing.T) { + t.Parallel() + + db := testutil.NewTestDB(t) + + t.Run("verifies stale lock reclaim mechanism is configured", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + + lggr := logger.Nop() + ownerID := "test-" + t.Name() + + shortLockDuration := 200 * time.Millisecond + + taskQueue, err := jobqueue.NewPostgresJobQueue[verifier.VerificationTask]( + db, + jobqueue.QueueConfig{ + Name: verifier.TaskVerifierJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: shortLockDuration, + }, + lggr, + ) + require.NoError(t, err) + + resultQueue, err := jobqueue.NewPostgresJobQueue[protocol.VerifierNodeResult]( + db, + jobqueue.QueueConfig{ + Name: verifier.StorageWriterJobsTableName, + OwnerID: ownerID, + RetryDuration: time.Hour, + LockDuration: time.Minute, + }, + lggr, + ) + require.NoError(t, err) + + message := protocol.Message{SequenceNumber: 1, SourceChainSelector: 1337} + messageID := message.MustMessageID() + task := verifier.VerificationTask{ + MessageID: messageID.String(), + Message: message, + } + + // Use a verifier that succeeds (normal case) + mockVerifier := &fakeVerifierDB{} + + processor, err := verifier.NewTaskVerifierProcessorDBWithPollInterval( + lggr, + ownerID, + mockVerifier, + monitoring.NewFakeVerifierMonitoring(), + taskQueue, + resultQueue, + verifier.NewPendingWritingTracker(lggr), + 10, + 50*time.Millisecond, + ) + require.NoError(t, err) + + require.NoError(t, processor.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, processor.Close()) + }) + + require.NoError(t, taskQueue.Publish(ctx, task)) + + // Job should be processed successfully + require.Eventually(t, func() bool { + return mockVerifier.GetProcessedCount() >= 1 + }, tests.WaitTimeout(t), 50*time.Millisecond, "Job should be processed") + + // Job should be completed and archived + require.Eventually(t, func() bool { + return countCompletedTasks(t, db, ownerID) == 1 + }, tests.WaitTimeout(t), 50*time.Millisecond, "Job should be completed") + + // Verify result was published + require.Eventually(t, func() bool { + return countVerificationResults(t, db, ownerID) == 1 + }, tests.WaitTimeout(t), 50*time.Millisecond, "Result should be published") + + // This test primarily verifies that the stale lock mechanism is configured + // and the normal flow works. The actual stale lock reclaim is tested in + // TestTaskVerifierProcessorDB_StaleJobRecovery which has a dedicated test for it. 
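+		// For reference, the queue-level tests simulate that crash by
+		// back-dating the lock (a sketch; see TestConsumeReclaimsStaleLock):
+		//   UPDATE ccv_task_verifier_jobs SET started_at = NOW() - INTERVAL '10 minutes' WHERE job_id = $1
+		// after which Consume reclaims the job with attempt_count incremented.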
+ }) +} + +// Helper functions + +func countPendingTasks(t *testing.T, db *sqlx.DB, ownerID string) int { + t.Helper() + var count int + err := db.QueryRow(` + SELECT COUNT(*) FROM ccv_task_verifier_jobs + WHERE owner_id = $1 AND status = 'pending' + `, ownerID).Scan(&count) + require.NoError(t, err) + return count +} + +func countCompletedTasks(t *testing.T, db *sqlx.DB, ownerID string) int { + t.Helper() + var count int + // Completed tasks are in the archive + err := db.QueryRow(` + SELECT COUNT(*) FROM ccv_task_verifier_jobs_archive + WHERE owner_id = $1 AND status = 'completed' + `, ownerID).Scan(&count) + require.NoError(t, err) + return count +} From 30af209f86193a0f681006938f2b72c03db24a4c Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 12 Mar 2026 17:34:36 +0100 Subject: [PATCH 04/14] Fix in srs --- verifier/task_verifier_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/verifier/task_verifier_test.go b/verifier/task_verifier_test.go index b1f2316e8..1da0d04ff 100644 --- a/verifier/task_verifier_test.go +++ b/verifier/task_verifier_test.go @@ -872,6 +872,7 @@ func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { ownerID, mockVerifier, monitoring.NewFakeVerifierMonitoring(), + verifier.NoopLatencyTracker{}, taskQueue, resultQueue, verifier.NewPendingWritingTracker(lggr), @@ -973,6 +974,7 @@ func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { ownerID, mockVerifier, monitoring.NewFakeVerifierMonitoring(), + verifier.NoopLatencyTracker{}, taskQueue, resultQueue, verifier.NewPendingWritingTracker(lggr), @@ -1070,6 +1072,7 @@ func TestTaskVerifierProcessorDB_PublishFailureHandling(t *testing.T) { ownerID, mockVerifier, monitoring.NewFakeVerifierMonitoring(), + verifier.NoopLatencyTracker{}, taskQueue, resultQueue, verifier.NewPendingWritingTracker(lggr), From c32d3e8127bca161e50f556d9686497e50b26497 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 13 Mar 2026 10:14:45 +0100 Subject: [PATCH 05/14] Comment fix --- verifier/jobqueue/interface.go | 4 ++-- verifier/jobqueue/observability_decorator.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/verifier/jobqueue/interface.go b/verifier/jobqueue/interface.go index edd8ccb8f..a658ac12d 100644 --- a/verifier/jobqueue/interface.go +++ b/verifier/jobqueue/interface.go @@ -61,8 +61,8 @@ type JobQueue[T Jobable] interface { // are considered stale (e.g. from a crashed worker) and are automatically reclaimed. // Returns empty slice if no jobs are available. // - // Note: Jobs in 'failed' status are NOT consumed. They remain in the active table - // until archived by Cleanup or manually investigated. + // Note: Jobs in 'failed' status are NOT consumed. + // Failed jobs are moved to the archive by Fail() or Retry() when retry deadline is exceeded. // // The implementation should use SELECT FOR UPDATE SKIP LOCKED to ensure // concurrent consumers don't compete for the same jobs. diff --git a/verifier/jobqueue/observability_decorator.go b/verifier/jobqueue/observability_decorator.go index 542e754fc..73e026fee 100644 --- a/verifier/jobqueue/observability_decorator.go +++ b/verifier/jobqueue/observability_decorator.go @@ -64,7 +64,7 @@ func NewObservabilityDecorator[T Jobable]( } // Start begins the observability monitoring loop. 
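+// Note: the context parameter is unused (renamed to _ below). Startup is
+// guarded by StartOnce; shutdown is expected to go through Close rather than
+// caller context cancellation.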
-func (d *ObservabilityDecorator[T]) Start(ctx context.Context) error {
+func (d *ObservabilityDecorator[T]) Start(_ context.Context) error {
 	return d.StartOnce(d.Name(), func() error {
 		d.lggr.Infow("Starting JobQueue observability monitoring",
 			"queue", d.queue.Name(),

From 82e66b427368fb1f8a8969aa88856c90a65323fb Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 10:26:32 +0100
Subject: [PATCH 06/14] Comment fix

---
 verifier/jobqueue/README.md | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/verifier/jobqueue/README.md b/verifier/jobqueue/README.md
index 24556457e..af96287db 100644
--- a/verifier/jobqueue/README.md
+++ b/verifier/jobqueue/README.md
@@ -229,24 +229,6 @@ log.Printf("Queue size: %d", size)
 - **Multiple Consumers**: Safe - `SELECT FOR UPDATE SKIP LOCKED` ensures no duplicate consumption
 - **Concurrent Operations**: Safe - all operations use database transactions
 
-## Error Handling Best Practices
-
-### Transient Errors (Should Retry)
-- Network timeouts
-- Temporary database unavailability
-- Rate limit errors
-- 503 Service Unavailable
-
-**Action**: Call `Retry()` with appropriate delay
-
-### Permanent Errors (Should Fail)
-- Invalid data format
-- Authorization failures
-- 404 Not Found
-- Business logic violations
-
-**Action**: Call `Fail()` to archive immediately
-
 ### Retry Strategy Example
 
 ```go

From 51b87f66ea2296230b4c073e15a00010fc17a263 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 12:17:15 +0100
Subject: [PATCH 07/14] Merging some tests together to reuse a single database container

---
 verifier/jobqueue/postgres_queue_test.go | 723 ++++++++++++-----------
 1 file changed, 376 insertions(+), 347 deletions(-)

diff --git a/verifier/jobqueue/postgres_queue_test.go b/verifier/jobqueue/postgres_queue_test.go
index 50fe38fe7..56e47dede 100644
--- a/verifier/jobqueue/postgres_queue_test.go
+++ b/verifier/jobqueue/postgres_queue_test.go
@@ -72,196 +72,420 @@ func countAllRows(t *testing.T, ds sqlutil.DataSource, table string) int {
 	return count
 }
 
+// countRowsWithOwner counts rows filtered by status and ownerID.
+func countRowsWithOwner(t *testing.T, ds sqlutil.DataSource, table string, status jobqueue.JobStatus, ownerID string) int {
+	t.Helper()
+	var count int
+	err := ds.QueryRowxContext(context.Background(),
+		fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE status = $1 AND owner_id = $2", table),
+		string(status), ownerID).Scan(&count)
+	require.NoError(t, err)
+	return count
+}
+
+// countAllRowsWithOwner counts all rows belonging to ownerID. 
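+// Owner scoping is what lets the merged subtests share one database
+// container: each subtest reads only its own rows (ownerID = t.Name()),
+// so leftovers from other subtests don't skew the counts.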
+func countAllRowsWithOwner(t *testing.T, ds sqlutil.DataSource, table, ownerID string) int { + t.Helper() + var count int + err := ds.QueryRowxContext(context.Background(), + fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE owner_id = $1", table), + ownerID).Scan(&count) + require.NoError(t, err) + return count +} + +func Test_PostgresQueueOps(t *testing.T) { + db := testutil.NewTestDB(t) ctx := context.Background() - jobs := []testJob{ - {Chain: 1, Message: []byte("msg-1"), Data: "payload-1"}, - {Chain: 1, Message: []byte("msg-2"), Data: "payload-2"}, - {Chain: 2, Message: []byte("msg-3"), Data: "payload-3"}, + // Helper to create a queue with ownerID = t.Name() for each subtest + newQueue := func(t *testing.T, opts ...func(*jobqueue.QueueConfig)) *jobqueue.PostgresJobQueue[testJob] { + t.Helper() + cfg := jobqueue.QueueConfig{ + Name: verifier.TaskVerifierJobsTableName, + OwnerID: t.Name(), + RetryDuration: time.Hour, + LockDuration: time.Minute, + } + for _, o := range opts { + o(&cfg) + } + q, err := jobqueue.NewPostgresJobQueue[testJob](db, cfg, logger.Test(t)) + require.NoError(t, err) + return q } - // Publish - require.NoError(t, q.Publish(ctx, jobs...)) + t.Run("PublishAndConsume", func(t *testing.T) { + q := newQueue(t) - // Consume all - consumed, err := q.Consume(ctx, 10) - require.NoError(t, err) - require.Len(t, consumed, 3) + jobs := []testJob{ + {Chain: 1, Message: []byte("msg-1"), Data: "payload-1"}, + {Chain: 1, Message: []byte("msg-2"), Data: "payload-2"}, + {Chain: 2, Message: []byte("msg-3"), Data: "payload-3"}, + } - // Verify payload round-trip - payloads := map[string]testJob{} - for _, j := range consumed { - payloads[string(j.Payload.Message)] = j.Payload - assert.Equal(t, j.Payload.Chain, j.ChainSelector) - assert.Equal(t, j.Payload.Message, j.MessageID) - assert.Equal(t, 1, j.AttemptCount) - assert.WithinDuration(t, time.Now().Add(time.Hour), j.RetryDeadline, 5*time.Second) - assert.NotNil(t, j.StartedAt) - } - assert.Equal(t, "payload-1", payloads["msg-1"].Data) - assert.Equal(t, "payload-2", payloads["msg-2"].Data) - assert.Equal(t, "payload-3", payloads["msg-3"].Data) -} + // Publish + require.NoError(t, q.Publish(ctx, jobs...)) -func TestPublishEmpty(t *testing.T) { - q, _ := newTestQueue(t) - require.NoError(t, q.Publish(context.Background())) -} + // Consume all + consumed, err := q.Consume(ctx, 10) + require.NoError(t, err) + require.Len(t, consumed, 3) + + // Verify payload round-trip + payloads := map[string]testJob{} + for _, j := range consumed { + payloads[string(j.Payload.Message)] = j.Payload + assert.Equal(t, j.Payload.Chain, j.ChainSelector) + assert.Equal(t, j.Payload.Message, j.MessageID) + assert.Equal(t, 1, j.AttemptCount) + assert.WithinDuration(t, time.Now().Add(time.Hour), j.RetryDeadline, 5*time.Second) + assert.NotNil(t, j.StartedAt) + } + assert.Equal(t, "payload-1", payloads["msg-1"].Data) + assert.Equal(t, "payload-2", payloads["msg-2"].Data) + assert.Equal(t, "payload-3", payloads["msg-3"].Data) + }) -func TestConsumeEmpty(t *testing.T) { - q, _ := newTestQueue(t) - consumed, err := q.Consume(context.Background(), 10) - require.NoError(t, err) - assert.Empty(t, consumed) -} + t.Run("PublishEmpty", func(t *testing.T) { + q := newQueue(t) + require.NoError(t, q.Publish(ctx)) + }) -func TestConsumeRespectsAvailableAt(t *testing.T) { - q, _ := newTestQueue(t) - ctx := context.Background() + t.Run("ConsumeEmpty", func(t *testing.T) { + q := newQueue(t) + consumed, err := q.Consume(ctx, 10) + require.NoError(t, err) + assert.Empty(t, consumed) 
+ }) - // Publish with a 1-hour delay – should NOT be consumable now - require.NoError(t, q.PublishWithDelay(ctx, time.Hour, testJob{Chain: 1, Message: []byte("delayed"), Data: "d"})) + t.Run("ConsumeRespectsAvailableAt", func(t *testing.T) { + q := newQueue(t) - consumed, err := q.Consume(ctx, 10) - require.NoError(t, err) - assert.Empty(t, consumed) -} + // Publish with a 1-hour delay – should NOT be consumable now + require.NoError(t, q.PublishWithDelay(ctx, time.Hour, testJob{Chain: 1, Message: []byte("delayed"), Data: "d"})) -func TestConsumeBatchSizeLimit(t *testing.T) { - q, _ := newTestQueue(t) - ctx := context.Background() + consumed, err := q.Consume(ctx, 10) + require.NoError(t, err) + assert.Empty(t, consumed) + }) - for i := range 5 { - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: fmt.Appendf(nil, "m-%d", i), Data: "x"})) - } + t.Run("ConsumeBatchSizeLimit", func(t *testing.T) { + q := newQueue(t) - consumed, err := q.Consume(ctx, 2) - require.NoError(t, err) - assert.Len(t, consumed, 2) -} + for i := range 5 { + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: fmt.Appendf(nil, "m-%d", i), Data: "x"})) + } -func TestConsumeDoesNotReturnProcessingJobs(t *testing.T) { - q, _ := newTestQueue(t) - ctx := context.Background() + consumed, err := q.Consume(ctx, 2) + require.NoError(t, err) + assert.Len(t, consumed, 2) + }) - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + t.Run("ConsumeDoesNotReturnProcessingJobs", func(t *testing.T) { + q := newQueue(t) - // First consume locks the job - first, err := q.Consume(ctx, 10) - require.NoError(t, err) - require.Len(t, first, 1) + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - // Second consume should return nothing – job is processing - second, err := q.Consume(ctx, 10) - require.NoError(t, err) - assert.Empty(t, second) -} + // First consume locks the job + first, err := q.Consume(ctx, 10) + require.NoError(t, err) + require.Len(t, first, 1) -func TestConsumeReclaimsStaleLock(t *testing.T) { - // Default queue has LockDuration=1min — used for initial consume and reclaim. - q, db := newTestQueue(t) - ctx := context.Background() + // Second consume should return nothing – job is processing + second, err := q.Consume(ctx, 10) + require.NoError(t, err) + assert.Empty(t, second) + }) - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + t.Run("ConsumeReclaimsStaleLock", func(t *testing.T) { + q := newQueue(t) - // Consume the job (LockDuration=1min won't expire naturally during the test). - first, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, first, 1) - jobID := first[0].ID - assert.Equal(t, 1, first[0].AttemptCount) + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - // Simulate a crashed worker by back-dating started_at to 10 minutes ago. - _, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs SET started_at = NOW() - INTERVAL '10 minutes' WHERE job_id = $1", jobID) - require.NoError(t, err) + // Consume the job (LockDuration=1min won't expire naturally during the test). + first, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, first, 1) + jobID := first[0].ID + assert.Equal(t, 1, first[0].AttemptCount) - // Create a second queue with a 15-minute lock on the same DB. - // Since the job's started_at is only 10min ago, 15min lock means it's still "fresh". 
- qLong, err := jobqueue.NewPostgresJobQueue[testJob](db, jobqueue.QueueConfig{ - Name: verifier.TaskVerifierJobsTableName, - OwnerID: "test-verifier", - RetryDuration: time.Hour, - LockDuration: 15 * time.Minute, - }, logger.Test(t)) - require.NoError(t, err) + // Simulate a crashed worker by back-dating started_at to 10 minutes ago. + _, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs SET started_at = NOW() - INTERVAL '10 minutes' WHERE job_id = $1", jobID) + require.NoError(t, err) - notReclaimed, err := qLong.Consume(ctx, 1) - require.NoError(t, err) - assert.Empty(t, notReclaimed, "job should not be reclaimed when lock has not expired") + // Create a second queue with a 15-minute lock on the same DB and ownerID. + // Since the job's started_at is only 10min ago, 15min lock means it's still "fresh". + qLong, err := jobqueue.NewPostgresJobQueue[testJob](db, jobqueue.QueueConfig{ + Name: verifier.TaskVerifierJobsTableName, + OwnerID: t.Name(), + RetryDuration: time.Hour, + LockDuration: 15 * time.Minute, + }, logger.Test(t)) + require.NoError(t, err) - // Original queue with 1-minute lock SHOULD reclaim (started_at is 10 min ago, 10 > 1). - reclaimed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, reclaimed, 1) - assert.Equal(t, jobID, reclaimed[0].ID) - assert.Equal(t, 2, reclaimed[0].AttemptCount, "attempt_count should be incremented on reclaim") -} + notReclaimed, err := qLong.Consume(ctx, 1) + require.NoError(t, err) + assert.Empty(t, notReclaimed, "job should not be reclaimed when lock has not expired") -func TestConsumeDoesNotReclaimFreshProcessingJob(t *testing.T) { - q, _ := newTestQueue(t, func(c *jobqueue.QueueConfig) { - c.LockDuration = time.Hour + // Original queue with 1-minute lock SHOULD reclaim (started_at is 10 min ago, 10 > 1). + reclaimed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, reclaimed, 1) + assert.Equal(t, jobID, reclaimed[0].ID) + assert.Equal(t, 2, reclaimed[0].AttemptCount, "attempt_count should be incremented on reclaim") }) - ctx := context.Background() - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + t.Run("ConsumeDoesNotReclaimFreshProcessingJob", func(t *testing.T) { + q := newQueue(t, func(c *jobqueue.QueueConfig) { + c.LockDuration = time.Hour + }) - // Consume with the configured long lock duration. - first, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, first, 1) + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - // A second consume should not reclaim because started_at is fresh. - second, err := q.Consume(ctx, 1) - require.NoError(t, err) - assert.Empty(t, second, "freshly consumed job should not be reclaimed while lock is still valid") -} + // Consume with the configured long lock duration. + first, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, first, 1) -func TestConsumeReclaimMultipleStaleJobs(t *testing.T) { - q, db := newTestQueue(t, func(c *jobqueue.QueueConfig) { - c.LockDuration = 10 * time.Minute + // A second consume should not reclaim because started_at is fresh. + second, err := q.Consume(ctx, 1) + require.NoError(t, err) + assert.Empty(t, second, "freshly consumed job should not be reclaimed while lock is still valid") }) - ctx := context.Background() - // Publish 5 jobs. 
- for i := range 5 { - require.NoError(t, q.Publish(ctx, testJob{ - Chain: 1, - Message: fmt.Appendf(nil, "stale-%d", i), - Data: "x", - })) - } + t.Run("ConsumeReclaimMultipleStaleJobs", func(t *testing.T) { + q := newQueue(t, func(c *jobqueue.QueueConfig) { + c.LockDuration = 10 * time.Minute + }) + + // Publish 5 jobs. + for i := range 5 { + require.NoError(t, q.Publish(ctx, testJob{ + Chain: 1, + Message: fmt.Appendf(nil, "stale-%d", i), + Data: "x", + })) + } + + // Consume all 5 (simulating a worker that will crash). + consumed, err := q.Consume(ctx, 10) + require.NoError(t, err) + require.Len(t, consumed, 5) - // Consume all 5 (simulating a worker that will crash). - consumed, err := q.Consume(ctx, 10) - require.NoError(t, err) - require.Len(t, consumed, 5) + // Back-date started_at for only 3 of them to simulate partial crash (20min > 10min lock). + for _, j := range consumed[:3] { + _, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs SET started_at = NOW() - INTERVAL '20 minutes' WHERE job_id = $1 AND owner_id = $2", j.ID, t.Name()) + require.NoError(t, err) + } - // Back-date started_at for only 3 of them to simulate partial crash (20min > 10min lock). - for _, j := range consumed[:3] { - _, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs SET started_at = NOW() - INTERVAL '20 minutes' WHERE job_id = $1", j.ID) + // Consume should reclaim exactly the 3 stale jobs. + reclaimed, err := q.Consume(ctx, 10) require.NoError(t, err) - } + assert.Len(t, reclaimed, 3) + + reclaimedIDs := map[string]bool{} + for _, j := range reclaimed { + reclaimedIDs[j.ID] = true + assert.Equal(t, 2, j.AttemptCount) + } + // Verify it's the right 3 jobs. + for _, j := range consumed[:3] { + assert.True(t, reclaimedIDs[j.ID], "expected job %s to be reclaimed", j.ID) + } + // The other 2 should not have been reclaimed. + for _, j := range consumed[3:] { + assert.False(t, reclaimedIDs[j.ID], "expected job %s to NOT be reclaimed", j.ID) + } + }) - // Consume should reclaim exactly the 3 stale jobs. - reclaimed, err := q.Consume(ctx, 10) - require.NoError(t, err) - assert.Len(t, reclaimed, 3) + t.Run("Complete", func(t *testing.T) { + q := newQueue(t) - reclaimedIDs := map[string]bool{} - for _, j := range reclaimed { - reclaimedIDs[j.ID] = true - assert.Equal(t, 2, j.AttemptCount) - } - // Verify it's the right 3 jobs. - for _, j := range consumed[:3] { - assert.True(t, reclaimedIDs[j.ID], "expected job %s to be reclaimed", j.ID) - } - // The other 2 should not have been reclaimed. 
- for _, j := range consumed[3:] { - assert.False(t, reclaimedIDs[j.ID], "expected job %s to NOT be reclaimed", j.ID) - } + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + + // Complete the job + require.NoError(t, q.Complete(ctx, consumed[0].ID)) + + // Main table should be empty, archive should have 1 + assert.Equal(t, 0, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs", t.Name())) + assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + }) + + t.Run("CompleteEmpty", func(t *testing.T) { + q := newQueue(t) + require.NoError(t, q.Complete(ctx)) + }) + + t.Run("CompleteNonExistentJob", func(t *testing.T) { + q := newQueue(t) + // Completing a non-existent job should not error, just affect 0 rows + require.NoError(t, q.Complete(ctx, "00000000-0000-0000-0000-000000000000")) + assert.Equal(t, 0, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + }) + + t.Run("Retry", func(t *testing.T) { + q := newQueue(t) + + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + jobID := consumed[0].ID + + // Retry with an error message + errs := map[string]error{jobID: errors.New("transient failure")} + require.NoError(t, q.Retry(ctx, 0, errs, jobID)) + + // Job should be back to pending (retry deadline not yet reached) + assert.Equal(t, 1, countRowsWithOwner(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending, t.Name())) + + // Consume again (attempt 2) + consumed2, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed2, 1) + assert.Equal(t, 2, consumed2[0].AttemptCount) + }) + + t.Run("RetryExceedsDeadline", func(t *testing.T) { + q := newQueue(t, func(c *jobqueue.QueueConfig) { + c.RetryDuration = time.Millisecond // expires almost immediately + }) + + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + // Small sleep to ensure retry_deadline has passed + time.Sleep(5 * time.Millisecond) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + jobID := consumed[0].ID + + errs := map[string]error{jobID: errors.New("fatal")} + require.NoError(t, q.Retry(ctx, 0, errs, jobID)) + + // Job should be archived (not in active table) because retry deadline has passed + assert.Equal(t, 0, countRowsWithOwner(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed, t.Name())) + assert.Equal(t, 0, countRowsWithOwner(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending, t.Name())) + + // Job should be in the archive + assert.Equal(t, 1, countRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed, t.Name())) + }) + + t.Run("RetryWithDelay", func(t *testing.T) { + q := newQueue(t) + + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + + // Retry with 1-hour delay + errs := map[string]error{consumed[0].ID: errors.New("oops")} + require.NoError(t, q.Retry(ctx, time.Hour, errs, consumed[0].ID)) + + // Job is pending but available_at is in the future → not consumable + second, err := q.Consume(ctx, 10) + require.NoError(t, err) + assert.Empty(t, second) + }) + + t.Run("Fail", func(t *testing.T) { + q := newQueue(t) + + 
require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + jobID := consumed[0].ID + + errs := map[string]error{jobID: errors.New("permanent")} + require.NoError(t, q.Fail(ctx, errs, jobID)) + + // Failed job should NOT be in the active table + assert.Equal(t, 0, countRowsWithOwner(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed, t.Name())) + + // Failed job should be in the archive + assert.Equal(t, 1, countRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed, t.Name())) + + // Verify error is stored in the archive + var lastErr string + err = db.QueryRowxContext(ctx, "SELECT last_error FROM ccv_task_verifier_jobs_archive WHERE job_id = $1", jobID).Scan(&lastErr) + require.NoError(t, err) + assert.Equal(t, "permanent", lastErr) + }) + + t.Run("FailedJobsAreNotReconsumed", func(t *testing.T) { + q := newQueue(t) + + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + jobID := consumed[0].ID + + // Mark as failed + require.NoError(t, q.Fail(ctx, map[string]error{jobID: errors.New("err")}, jobID)) + + // Failed job should NOT be in the active table + assert.Equal(t, 0, countRowsWithOwner(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed, t.Name())) + + // Failed job should be in the archive + assert.Equal(t, 1, countRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed, t.Name())) + + // Should NOT be consumable again + consumed2, err := q.Consume(ctx, 10) + require.NoError(t, err) + assert.Empty(t, consumed2, "failed jobs should not be consumed") + }) + + t.Run("Cleanup", func(t *testing.T) { + q := newQueue(t) + + // Publish, consume, and complete a job so it lands in the archive + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + require.NoError(t, q.Complete(ctx, consumed[0].ID)) + + assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + + // Back-date the completed_at so cleanup will pick it up + _, err = db.ExecContext(ctx, + "UPDATE ccv_task_verifier_jobs_archive SET completed_at = NOW() - INTERVAL '2 hours' WHERE owner_id = $1", + t.Name()) + require.NoError(t, err) + + deleted, err := q.Cleanup(ctx, time.Hour) + require.NoError(t, err) + assert.Equal(t, 1, deleted) + assert.Equal(t, 0, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + }) + + t.Run("CleanupRetainsRecentJobs", func(t *testing.T) { + q := newQueue(t) + + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.NoError(t, q.Complete(ctx, consumed[0].ID)) + + // Cleanup with very long retention – nothing should be deleted + deleted, err := q.Cleanup(ctx, 24*time.Hour) + require.NoError(t, err) + assert.Equal(t, 0, deleted) + assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + }) } func TestConsumeReclaimConcurrentNoDuplicates(t *testing.T) { @@ -327,201 +551,6 @@ func TestConsumeReclaimConcurrentNoDuplicates(t *testing.T) { assert.Equal(t, numJobs, count, "all stale jobs should have been reclaimed exactly once") } -func TestComplete(t *testing.T) { - q, db 
:= newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - - // Complete the job - require.NoError(t, q.Complete(ctx, consumed[0].ID)) - - // Main table should be empty, archive should have 1 - assert.Equal(t, 0, countAllRows(t, db, "ccv_task_verifier_jobs")) - assert.Equal(t, 1, countAllRows(t, db, "ccv_task_verifier_jobs_archive")) -} - -func TestCompleteEmpty(t *testing.T) { - q, _ := newTestQueue(t) - require.NoError(t, q.Complete(context.Background())) -} - -func TestCompleteNonExistentJob(t *testing.T) { - q, db := newTestQueue(t) - // Completing a non-existent job should not error, just affect 0 rows - require.NoError(t, q.Complete(context.Background(), "00000000-0000-0000-0000-000000000000")) - assert.Equal(t, 0, countAllRows(t, db, "ccv_task_verifier_jobs_archive")) -} - -func TestRetry(t *testing.T) { - q, db := newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - jobID := consumed[0].ID - - // Retry with an error message - errs := map[string]error{jobID: errors.New("transient failure")} - require.NoError(t, q.Retry(ctx, 0, errs, jobID)) - - // Job should be back to pending (retry deadline not yet reached) - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending)) - - // Consume again (attempt 2) - consumed2, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed2, 1) - assert.Equal(t, 2, consumed2[0].AttemptCount) -} - -func TestRetryExceedsDeadline(t *testing.T) { - q, db := newTestQueue(t, func(c *jobqueue.QueueConfig) { - c.RetryDuration = time.Millisecond // expires almost immediately - }) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - - // Small sleep to ensure retry_deadline has passed - time.Sleep(5 * time.Millisecond) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - jobID := consumed[0].ID - - errs := map[string]error{jobID: errors.New("fatal")} - require.NoError(t, q.Retry(ctx, 0, errs, jobID)) - - // Job should be archived (not in active table) because retry deadline has passed - assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) - assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusPending)) - - // Job should be in the archive - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) -} - -func TestRetryWithDelay(t *testing.T) { - q, _ := newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - - // Retry with 1-hour delay - errs := map[string]error{consumed[0].ID: errors.New("oops")} - require.NoError(t, q.Retry(ctx, time.Hour, errs, consumed[0].ID)) - - // Job is pending but available_at is in the future → not consumable - second, err := q.Consume(ctx, 10) - require.NoError(t, err) - assert.Empty(t, second) -} - -func TestFail(t *testing.T) { - q, db := newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, 
Message: []byte("m1"), Data: "x"})) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - jobID := consumed[0].ID - - errs := map[string]error{jobID: errors.New("permanent")} - require.NoError(t, q.Fail(ctx, errs, jobID)) - - // Failed job should NOT be in the active table - assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) - - // Failed job should be in the archive - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) - - // Verify error is stored in the archive - var lastErr string - err = db.QueryRowxContext(ctx, "SELECT last_error FROM ccv_task_verifier_jobs_archive WHERE job_id = $1", jobID).Scan(&lastErr) - require.NoError(t, err) - assert.Equal(t, "permanent", lastErr) -} - -// Failed jobs are NOT re-consumable and should be archived. -func TestFailedJobsAreNotReconsumed(t *testing.T) { - q, db := newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - jobID := consumed[0].ID - - // Mark as failed - require.NoError(t, q.Fail(ctx, map[string]error{jobID: errors.New("err")}, jobID)) - - // Failed job should NOT be in the active table - assert.Equal(t, 0, countRows(t, db, "ccv_task_verifier_jobs", jobqueue.JobStatusFailed)) - - // Failed job should be in the archive - assert.Equal(t, 1, countRows(t, db, "ccv_task_verifier_jobs_archive", jobqueue.JobStatusFailed)) - - // Should NOT be consumable again - consumed2, err := q.Consume(ctx, 10) - require.NoError(t, err) - assert.Empty(t, consumed2, "failed jobs should not be consumed") -} - -func TestCleanup(t *testing.T) { - q, db := newTestQueue(t) - ctx := context.Background() - - // Publish, consume, and complete a job so it lands in the archive - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.Len(t, consumed, 1) - require.NoError(t, q.Complete(ctx, consumed[0].ID)) - - assert.Equal(t, 1, countAllRows(t, db, "ccv_task_verifier_jobs_archive")) - - // Back-date the completed_at so cleanup will pick it up - _, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs_archive SET completed_at = NOW() - INTERVAL '2 hours'") - require.NoError(t, err) - - deleted, err := q.Cleanup(ctx, time.Hour) - require.NoError(t, err) - assert.Equal(t, 1, deleted) - assert.Equal(t, 0, countAllRows(t, db, "ccv_task_verifier_jobs_archive")) -} - -func TestCleanupRetainsRecentJobs(t *testing.T) { - q, db := newTestQueue(t) - ctx := context.Background() - - require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: []byte("m1"), Data: "x"})) - consumed, err := q.Consume(ctx, 1) - require.NoError(t, err) - require.NoError(t, q.Complete(ctx, consumed[0].ID)) - - // Cleanup with very long retention – nothing should be deleted - deleted, err := q.Cleanup(ctx, 24*time.Hour) - require.NoError(t, err) - assert.Equal(t, 0, deleted) - assert.Equal(t, 1, countAllRows(t, db, "ccv_task_verifier_jobs_archive")) -} - func TestSize(t *testing.T) { q, _ := newTestQueue(t) ctx := context.Background() From b6d335d314255eaa14b0d1a3474a6151326abc5a Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 13 Mar 2026 12:21:15 +0100 Subject: [PATCH 08/14] Mering some tests together to reuse single database container --- verifier/jobqueue/postgres_queue_test.go | 279 
+++++++++++------------ 1 file changed, 139 insertions(+), 140 deletions(-) diff --git a/verifier/jobqueue/postgres_queue_test.go b/verifier/jobqueue/postgres_queue_test.go index 56e47dede..119612484 100644 --- a/verifier/jobqueue/postgres_queue_test.go +++ b/verifier/jobqueue/postgres_queue_test.go @@ -486,6 +486,145 @@ func Test_PostgresQueueOps(t *testing.T) { assert.Equal(t, 0, deleted) assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) }) + + t.Run("Size", func(t *testing.T) { + q := newQueue(t) + + // Initially empty + size, err := q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 0, size) + + // Publish 5 jobs + jobs := []testJob{ + {Chain: 1, Message: []byte("msg-1"), Data: "data1"}, + {Chain: 2, Message: []byte("msg-2"), Data: "data2"}, + {Chain: 3, Message: []byte("msg-3"), Data: "data3"}, + {Chain: 4, Message: []byte("msg-4"), Data: "data4"}, + {Chain: 5, Message: []byte("msg-5"), Data: "data5"}, + } + require.NoError(t, q.Publish(ctx, jobs...)) + + // Size should be 5 (all pending) + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 5, size) + + // Consume 3 jobs (now processing) + consumed, err := q.Consume(ctx, 3) + require.NoError(t, err) + require.Len(t, consumed, 3) + + // Size should still be 5 (2 pending + 3 processing) + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 5, size) + + // Complete 2 jobs + require.NoError(t, q.Complete(ctx, consumed[0].ID, consumed[1].ID)) + + // Size should be 3 (2 pending + 1 processing) + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 3, size) + + // Fail the last consumed job (it is archived immediately) + require.NoError(t, q.Fail(ctx, map[string]error{consumed[2].ID: errors.New("test error")}, consumed[2].ID)) + + // Size should be 2 (2 pending only - failed jobs are archived, not in active table) + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 2, size) + + // Consume remaining jobs - should only get the 2 pending jobs + // Failed jobs are archived and NOT consumed + consumed2, err := q.Consume(ctx, 10) + require.NoError(t, err) + require.Len(t, consumed2, 2) // Only 2 pending jobs (failed job was archived) + + // Size should be 2 (2 processing) + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 2, size) + + // Complete all remaining + require.NoError(t, q.Complete(ctx, consumed2[0].ID, consumed2[1].ID)) + + // Size should be 0 + size, err = q.Size(ctx) + require.NoError(t, err) + assert.Equal(t, 0, size) + }) + + t.Run("FullLifecycle", func(t *testing.T) { + q := newQueue(t) + + // 1. Publish + require.NoError(t, q.Publish(ctx, testJob{Chain: 42, Message: []byte("lifecycle-1"), Data: "step1"})) + + // 2. Consume (attempt 1) + consumed, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed, 1) + jobID := consumed[0].ID + assert.Equal(t, 1, consumed[0].AttemptCount) + + // 3. Retry (simulate transient failure) + require.NoError(t, q.Retry(ctx, 0, map[string]error{jobID: errors.New("timeout")}, jobID)) + + // 4. Consume (attempt 2) + consumed2, err := q.Consume(ctx, 1) + require.NoError(t, err) + require.Len(t, consumed2, 1) + assert.Equal(t, jobID, consumed2[0].ID) + assert.Equal(t, 2, consumed2[0].AttemptCount) + + // 5. Complete + require.NoError(t, q.Complete(ctx, jobID)) + assert.Equal(t, 0, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs", t.Name())) + assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + + // 6. 
Back-date and cleanup + _, err = db.ExecContext(ctx, + "UPDATE ccv_task_verifier_jobs_archive SET completed_at = NOW() - INTERVAL '48 hours' WHERE owner_id = $1", + t.Name()) + require.NoError(t, err) + deleted, err := q.Cleanup(ctx, time.Hour) + require.NoError(t, err) + assert.Equal(t, 1, deleted) + }) + + t.Run("CleanupMixed", func(t *testing.T) { + q := newQueue(t) + + // Create 3 jobs, complete all + for i := range 3 { + require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: fmt.Appendf(nil, "cl-%d", i), Data: "x"})) + } + consumed, err := q.Consume(ctx, 3) + require.NoError(t, err) + require.Len(t, consumed, 3) + + ids := make([]string, 3) + for i, j := range consumed { + ids[i] = j.ID + } + require.NoError(t, q.Complete(ctx, ids...)) + assert.Equal(t, 3, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + + // Back-date only 2 of them + _, err = db.ExecContext(ctx, ` + UPDATE ccv_task_verifier_jobs_archive + SET completed_at = NOW() - INTERVAL '10 hours' + WHERE job_id IN ($1, $2) AND owner_id = $3`, ids[0], ids[1], t.Name()) + require.NoError(t, err) + + // Cleanup with 1-hour retention: should delete only the 2 old ones + deleted, err := q.Cleanup(ctx, time.Hour) + require.NoError(t, err) + assert.Equal(t, 2, deleted) + assert.Equal(t, 1, countAllRowsWithOwner(t, db, "ccv_task_verifier_jobs_archive", t.Name())) + }) } func TestConsumeReclaimConcurrentNoDuplicates(t *testing.T) { @@ -551,113 +690,6 @@ func TestConsumeReclaimConcurrentNoDuplicates(t *testing.T) { assert.Equal(t, numJobs, count, "all stale jobs should have been reclaimed exactly once") } -func TestSize(t *testing.T) { - q, _ := newTestQueue(t) - ctx := context.Background() - - // Initially empty - size, err := q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 0, size) - - // Publish 5 jobs - jobs := []testJob{ - {Chain: 1, Message: []byte("msg-1"), Data: "data1"}, - {Chain: 2, Message: []byte("msg-2"), Data: "data2"}, - {Chain: 3, Message: []byte("msg-3"), Data: "data3"}, - {Chain: 4, Message: []byte("msg-4"), Data: "data4"}, - {Chain: 5, Message: []byte("msg-5"), Data: "data5"}, - } - require.NoError(t, q.Publish(ctx, jobs...)) - - // Size should be 5 (all pending) - size, err = q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 5, size) - - // Consume 3 jobs (now processing) - consumed, err := q.Consume(ctx, 3) - require.NoError(t, err) - require.Len(t, consumed, 3) - - // Size should still be 5 (2 pending + 3 processing) - size, err = q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 5, size) - - // Complete 2 jobs - require.NoError(t, q.Complete(ctx, consumed[0].ID, consumed[1].ID)) - - // Size should be 3 (2 pending + 1 processing) - size, err = q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 3, size) - - // Fail the last consumed job (it is archived immediately) - require.NoError(t, q.Fail(ctx, map[string]error{consumed[2].ID: errors.New("test error")}, consumed[2].ID)) - - // Size should be 2 (2 pending only - failed jobs are archived, not in active table) - size, err = q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 2, size) - - // Consume remaining jobs - should only get the 2 pending jobs - // Failed jobs are archived and NOT consumed - consumed2, err := q.Consume(ctx, 10) - require.NoError(t, err) - require.Len(t, consumed2, 2) // Only 2 pending jobs (failed job was archived) - - // Size should be 2 (2 processing) - size, err = q.Size(ctx) - require.NoError(t, err) - assert.Equal(t, 2, size) - - // Complete all remaining - require.NoError(t, 
q.Complete(ctx, consumed2[0].ID, consumed2[1].ID))
-
-	// Size should be 0
-	size, err = q.Size(ctx)
-	require.NoError(t, err)
-	assert.Equal(t, 0, size)
-}
-
-func TestFullLifecycle(t *testing.T) {
-	q, db := newTestQueue(t)
-	ctx := context.Background()
-
-	// 1. Publish
-	require.NoError(t, q.Publish(ctx, testJob{Chain: 42, Message: []byte("lifecycle-1"), Data: "step1"}))
-
-	// 2. Consume (attempt 1)
-	consumed, err := q.Consume(ctx, 1)
-	require.NoError(t, err)
-	require.Len(t, consumed, 1)
-	jobID := consumed[0].ID
-	assert.Equal(t, 1, consumed[0].AttemptCount)
-
-	// 3. Retry (simulate transient failure)
-	require.NoError(t, q.Retry(ctx, 0, map[string]error{jobID: errors.New("timeout")}, jobID))
-
-	// 4. Consume (attempt 2)
-	consumed2, err := q.Consume(ctx, 1)
-	require.NoError(t, err)
-	require.Len(t, consumed2, 1)
-	assert.Equal(t, jobID, consumed2[0].ID)
-	assert.Equal(t, 2, consumed2[0].AttemptCount)
-
-	// 5. Complete
-	require.NoError(t, q.Complete(ctx, jobID))
-	assert.Equal(t, 0, countAllRows(t, db, "ccv_task_verifier_jobs"))
-	assert.Equal(t, 1, countAllRows(t, db, "ccv_task_verifier_jobs_archive"))
-
-	// 6. Back-date and cleanup
-	_, err = db.ExecContext(ctx, "UPDATE ccv_task_verifier_jobs_archive SET completed_at = NOW() - INTERVAL '48 hours'")
-	require.NoError(t, err)
-	deleted, err := q.Cleanup(ctx, time.Hour)
-	require.NoError(t, err)
-	assert.Equal(t, 1, deleted)
-}
-
 func TestConcurrentPublishAndConsume(t *testing.T) {
 	q, db := newTestQueue(t)
 	ctx := context.Background()
@@ -932,39 +964,6 @@ func TestRetryDeadlineExhaustionCycle(t *testing.T) {
 	assert.Equal(t, "err3", lastErr)
 }
 
-func TestCleanupMixed(t *testing.T) {
-	q, db := newTestQueue(t)
-	ctx := context.Background()
-
-	// Create 3 jobs, complete all
-	for i := range 3 {
-		require.NoError(t, q.Publish(ctx, testJob{Chain: 1, Message: fmt.Appendf(nil, "cl-%d", i), Data: "x"}))
-	}
-	consumed, err := q.Consume(ctx, 3)
-	require.NoError(t, err)
-	require.Len(t, consumed, 3)
-
-	ids := make([]string, 3)
-	for i, j := range consumed {
-		ids[i] = j.ID
-	}
-	require.NoError(t, q.Complete(ctx, ids...))
-	assert.Equal(t, 3, countAllRows(t, db, "ccv_task_verifier_jobs_archive"))
-
-	// Back-date only 2 of them
-	_, err = db.ExecContext(ctx, `
-		UPDATE ccv_task_verifier_jobs_archive
-		SET completed_at = NOW() - INTERVAL '10 hours'
-		WHERE job_id IN ($1, $2)`, ids[0], ids[1])
-	require.NoError(t, err)
-
-	// Cleanup with 1-hour retention: should delete only the 2 old ones
-	deleted, err := q.Cleanup(ctx, time.Hour)
-	require.NoError(t, err)
-	assert.Equal(t, 2, deleted)
-	assert.Equal(t, 1, countAllRows(t, db, "ccv_task_verifier_jobs_archive"))
-}
-
 func TestConcurrentPublishStress(t *testing.T) {
 	q, db := newTestQueue(t)
 	ctx := context.Background()

From 3a9b66b6619359dffed0a846e78b4b6d8d8e2cc9 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 13:22:58 +0100
Subject: [PATCH 09/14] Move jobs to the archive table in a single transaction to avoid orphaned records

---
 verifier/jobqueue/postgres_queue.go | 134 ++++++++++++++--------------
 1 file changed, 67 insertions(+), 67 deletions(-)

diff --git a/verifier/jobqueue/postgres_queue.go b/verifier/jobqueue/postgres_queue.go
index c6b3e6967..8556b4d99 100644
--- a/verifier/jobqueue/postgres_queue.go
+++ b/verifier/jobqueue/postgres_queue.go
@@ -287,24 +287,24 @@ func (q *PostgresJobQueue[T]) Retry(ctx context.Context, delay time.Duration, er
 
 	availableAt := time.Now().Add(delay)
 
-	// Check if retry deadline has passed
-	query := fmt.Sprintf(`
-		UPDATE 
%s - SET status = CASE - WHEN NOW() >= retry_deadline THEN $1 - ELSE $2 - END, - available_at = $3, - last_error = $4 - WHERE job_id = $5 - AND owner_id = $6 - RETURNING job_id, status - `, q.tableName) - var failed []string var retried []string err := sqlutil.TransactDataSource(ctx, q.ds, nil, func(tx sqlutil.DataSource) error { + // Check if retry deadline has passed + query := fmt.Sprintf(` + UPDATE %s + SET status = CASE + WHEN NOW() >= retry_deadline THEN $1 + ELSE $2 + END, + available_at = $3, + last_error = $4 + WHERE job_id = $5 + AND owner_id = $6 + RETURNING job_id, status + `, q.tableName) + stmt, err := tx.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("failed to prepare retry statement: %w", err) @@ -347,38 +347,36 @@ func (q *PostgresJobQueue[T]) Retry(ctx context.Context, delay time.Duration, er } } - return nil - }) - if err != nil { - return err - } + // Archive jobs that exceeded the retry deadline within the same transaction + if len(failed) > 0 { + archiveQuery := fmt.Sprintf(` + WITH failed AS ( + DELETE FROM %s + WHERE job_id = ANY($1) + AND owner_id = $2 + AND status = $3 + RETURNING * + ) + INSERT INTO %s + SELECT *, NOW() as completed_at + FROM failed + `, q.tableName, q.archiveName) - // Archive jobs that exceeded the retry deadline - if len(failed) > 0 { - archiveQuery := fmt.Sprintf(` - WITH failed AS ( - DELETE FROM %s - WHERE job_id = ANY($1) - AND owner_id = $2 - AND status = $3 - RETURNING * - ) - INSERT INTO %s - SELECT *, NOW() as completed_at - FROM failed - `, q.tableName, q.archiveName) + result, err := tx.ExecContext(ctx, archiveQuery, pq.Array(failed), q.ownerID, JobStatusFailed) + if err != nil { + return fmt.Errorf("failed to archive jobs that exceeded retry deadline: %w", err) + } - result, err := q.ds.ExecContext(ctx, archiveQuery, pq.Array(failed), q.ownerID, JobStatusFailed) - if err != nil { - q.logger.Errorw("Failed to archive jobs that exceeded retry deadline", - "error", err, - "count", len(failed)) - } else { affected, _ := result.RowsAffected() q.logger.Infow("Archived jobs that exceeded retry deadline", "queue", q.config.Name, "count", affected) } + + return nil + }) + if err != nil { + return err } q.logger.Infow("Retried jobs", @@ -398,16 +396,18 @@ func (q *PostgresJobQueue[T]) Fail(ctx context.Context, errors map[string]error, return nil } - // First, update the last_error for each job - updateQuery := fmt.Sprintf(` - UPDATE %s - SET status = $1, - last_error = $2 - WHERE job_id = $3 - AND owner_id = $4 - `, q.tableName) + var affected int64 err := sqlutil.TransactDataSource(ctx, q.ds, nil, func(tx sqlutil.DataSource) error { + // First, update the last_error for each job + updateQuery := fmt.Sprintf(` + UPDATE %s + SET status = $1, + last_error = $2 + WHERE job_id = $3 + AND owner_id = $4 + `, q.tableName) + stmt, err := tx.PrepareContext(ctx, updateQuery) if err != nil { return fmt.Errorf("failed to prepare fail statement: %w", err) @@ -431,32 +431,32 @@ func (q *PostgresJobQueue[T]) Fail(ctx context.Context, errors map[string]error, } } + // Now move failed jobs to archive within the same transaction + archiveQuery := fmt.Sprintf(` + WITH failed AS ( + DELETE FROM %s + WHERE job_id = ANY($1) + AND owner_id = $2 + AND status = $3 + RETURNING * + ) + INSERT INTO %s + SELECT *, NOW() as completed_at + FROM failed + `, q.tableName, q.archiveName) + + result, err := tx.ExecContext(ctx, archiveQuery, pq.Array(jobIDs), q.ownerID, JobStatusFailed) + if err != nil { + return fmt.Errorf("failed to archive failed jobs: %w", 
err)
+		}
+
+		affected, _ = result.RowsAffected()
+		return nil
 	})
 	if err != nil {
 		return err
 	}
 
-	// Now move failed jobs to archive
-	archiveQuery := fmt.Sprintf(`
-		WITH failed AS (
-			DELETE FROM %s
-			WHERE job_id = ANY($1)
-			AND owner_id = $2
-			AND status = $3
-			RETURNING *
-		)
-		INSERT INTO %s
-		SELECT *, NOW() as completed_at
-		FROM failed
-	`, q.tableName, q.archiveName)
-
-	result, err := q.ds.ExecContext(ctx, archiveQuery, pq.Array(jobIDs), q.ownerID, JobStatusFailed)
-	if err != nil {
-		return fmt.Errorf("failed to archive failed jobs: %w", err)
-	}
-
-	affected, _ := result.RowsAffected()
 	q.logger.Infow("Failed and archived jobs",
 		"queue", q.config.Name,
 		"count", affected,

From fb719af4073e8cfeeb1f10902bb5defb28d25515 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 14:25:34 +0100
Subject: [PATCH 10/14] Remove t.Parallel from database-backed tests so they can safely share a single database container

---
 .../storage_writer_partial_errors_test.go |  8 ----
 verifier/storage_writer_test.go | 38 -------------------
 verifier/task_verifier_test.go | 25 ------------
 3 files changed, 71 deletions(-)

diff --git a/verifier/storage_writer_partial_errors_test.go b/verifier/storage_writer_partial_errors_test.go
index e68443915..ac3d4c089 100644
--- a/verifier/storage_writer_partial_errors_test.go
+++ b/verifier/storage_writer_partial_errors_test.go
@@ -18,13 +18,9 @@ import (
 )
 
 func TestStorageWriterProcessorDB_PartialBatchFailures(t *testing.T) {
-	t.Parallel()
-
 	db := testutil.NewTestDB(t)
 
 	t.Run("retries only failed requests in a partially failed batch", func(t *testing.T) {
-		t.Parallel()
-
 		lggr := logger.Test(t)
 
 		selectiveStorage := NewSelectiveFailureStorage()
@@ -107,8 +103,6 @@ func TestStorageWriterProcessorDB_PartialBatchFailures(t *testing.T) {
 	})
 
 	t.Run("handles non-retryable failures without retry", func(t *testing.T) {
-		t.Parallel()
-
 		lggr := logger.Test(t)
 
 		nonRetryableStorage := NewNonRetryableFailureStorage()
@@ -189,8 +183,6 @@ func TestStorageWriterProcessorDB_PartialBatchFailures(t *testing.T) {
 	})
 
 	t.Run("processes mixed batch with retryable and non-retryable failures", func(t *testing.T) {
-		t.Parallel()
-
 		lggr := logger.Test(t)
 
 		mixedStorage := NewMixedFailureStorage()
diff --git a/verifier/storage_writer_test.go b/verifier/storage_writer_test.go
index e785a480a..7f172d747 100644
--- a/verifier/storage_writer_test.go
+++ b/verifier/storage_writer_test.go
@@ -23,13 +23,10 @@ import (
 
 // TestStorageWriterProcessorDB_ProcessBatchesSuccessfully tests successful batch processing.
 func TestStorageWriterProcessorDB_ProcessBatchesSuccessfully(t *testing.T) {
-	t.Parallel()
-
 	// Shared DB instance for all subtests - unique OwnerIDs prevent collisions
 	db := testutil.NewTestDB(t)
 
 	t.Run("processes batches from queue until context cancelled with storage always succeeding", func(t *testing.T) {
-		t.Parallel()
 		ctx := t.Context()
 
 		lggr := logger.Test(t)
@@ -101,7 +98,6 @@ func TestStorageWriterProcessorDB_ProcessBatchesSuccessfully(t *testing.T) {
 	})
 
 	t.Run("processes multiple batches concurrently", func(t *testing.T) {
-		t.Parallel()
 		ctx := t.Context()
 
 		lggr := logger.Test(t)
@@ -161,14 +157,10 @@ func TestStorageWriterProcessorDB_ProcessBatchesSuccessfully(t *testing.T) {
 
 // TestStorageWriterProcessorDB_RetryFailedBatches tests retry logic.
 
func TestStorageWriterProcessorDB_RetryFailedBatches(t *testing.T) { - t.Parallel() - // Shared DB instance for all subtests db := testutil.NewTestDB(t) t.Run("retries failed batches after delay", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() @@ -235,8 +227,6 @@ func TestStorageWriterProcessorDB_RetryFailedBatches(t *testing.T) { }) t.Run("continues processing new batches when retry fails", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() @@ -304,8 +294,6 @@ func TestStorageWriterProcessorDB_RetryFailedBatches(t *testing.T) { }) t.Run("marks job as failed when retry deadline expires", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() @@ -368,13 +356,9 @@ func TestStorageWriterProcessorDB_RetryFailedBatches(t *testing.T) { // TestStorageWriterProcessorDB_Cleanup tests cleanup of archived results. func TestStorageWriterProcessorDB_Cleanup(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("cleans up archived results older than retention period", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() @@ -459,13 +443,9 @@ func TestStorageWriterProcessorDB_Cleanup(t *testing.T) { // TestStorageWriterProcessorDB_StaleJobRecovery tests recovery of jobs stuck in processing state. func TestStorageWriterProcessorDB_StaleJobRecovery(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("reclaims jobs stuck in processing state beyond lock duration", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() @@ -541,14 +521,10 @@ func TestStorageWriterProcessorDB_StaleJobRecovery(t *testing.T) { // TestStorageWriterProcessorDB_CheckpointManagement tests checkpoint functionality. 
func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { - t.Parallel() - // Shared DB instance for all subtests db := testutil.NewTestDB(t) t.Run("writes checkpoint after successful storage write", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -632,8 +608,6 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { }) t.Run("checkpoint advances monotonically", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -741,8 +715,6 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { }) t.Run("multiple chains handled independently", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -835,8 +807,6 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { }) t.Run("skips checkpoint update for disabled chains", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -961,8 +931,6 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { }) t.Run("handles error when reading chain status and skips checkpoint update", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -1027,8 +995,6 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { }) t.Run("handles missing chain in ReadChainStatuses response and skips checkpoint update", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() mockChainStatus := mocks.NewMockChainStatusManager(t) @@ -1101,11 +1067,7 @@ func TestStorageWriterProcessorDB_CheckpointManagement(t *testing.T) { // TestStorageWriterProcessorDB_ContextCancellation tests graceful shutdown. func TestStorageWriterProcessorDB_ContextCancellation(t *testing.T) { - t.Parallel() - t.Run("stops processing when context is cancelled", func(t *testing.T) { - t.Parallel() - lggr := logger.Test(t) fakeStorage := NewFakeCCVNodeDataWriter() diff --git a/verifier/task_verifier_test.go b/verifier/task_verifier_test.go index 1da0d04ff..3189bf80f 100644 --- a/verifier/task_verifier_test.go +++ b/verifier/task_verifier_test.go @@ -21,12 +21,9 @@ import ( // TestTaskVerifierProcessorDB_ProcessTasksSuccessfully tests successful task processing. func TestTaskVerifierProcessorDB_ProcessTasksSuccessfully(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("processes tasks from queue and publishes results", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -107,7 +104,6 @@ func TestTaskVerifierProcessorDB_ProcessTasksSuccessfully(t *testing.T) { }) t.Run("processes multiple batches concurrently", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -190,12 +186,9 @@ func TestTaskVerifierProcessorDB_ProcessTasksSuccessfully(t *testing.T) { // TestTaskVerifierProcessorDB_RetryFailedTasks tests retry logic. 
func TestTaskVerifierProcessorDB_RetryFailedTasks(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("retries failed tasks after delay", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -276,7 +269,6 @@ func TestTaskVerifierProcessorDB_RetryFailedTasks(t *testing.T) { }) t.Run("does not retry permanent failures", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -361,7 +353,6 @@ func TestTaskVerifierProcessorDB_RetryFailedTasks(t *testing.T) { }) t.Run("marks job as failed when retry deadline expires", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -444,12 +435,9 @@ func TestTaskVerifierProcessorDB_RetryFailedTasks(t *testing.T) { // TestTaskVerifierProcessorDB_Cleanup tests cleanup of archived jobs. func TestTaskVerifierProcessorDB_Cleanup(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("cleans up archived jobs older than retention period", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -539,12 +527,9 @@ func TestTaskVerifierProcessorDB_Cleanup(t *testing.T) { // TestTaskVerifierProcessorDB_StaleJobRecovery tests recovery of jobs stuck in processing state. func TestTaskVerifierProcessorDB_StaleJobRecovery(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("reclaims jobs stuck in processing state beyond lock duration", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -639,12 +624,9 @@ func TestTaskVerifierProcessorDB_StaleJobRecovery(t *testing.T) { // TestTaskVerifierProcessorDB_Shutdown tests graceful shutdown. func TestTaskVerifierProcessorDB_Shutdown(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("stops processing after close", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -813,12 +795,9 @@ func (f *fakeVerifierDB) VerifyMessages(_ context.Context, tasks []verifier.Veri // TestTaskVerifierProcessorDB_CustomRetryDelays tests that custom retry delays from VerificationError are respected. func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { - t.Parallel() - db := testutil.NewTestDB(t) t.Run("uses custom delay from VerificationError", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -915,7 +894,6 @@ func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { }) t.Run("groups jobs by delay for efficient retry", func(t *testing.T) { - t.Parallel() ctx := t.Context() lggr := logger.Nop() @@ -1020,12 +998,9 @@ func TestTaskVerifierProcessorDB_CustomRetryDelays(t *testing.T) { // TestTaskVerifierProcessorDB_PublishFailureHandling tests stale lock reclaim mechanism. 
func TestTaskVerifierProcessorDB_PublishFailureHandling(t *testing.T) {
-	t.Parallel()
-
 	db := testutil.NewTestDB(t)
 
 	t.Run("verifies stale lock reclaim mechanism is configured", func(t *testing.T) {
-		t.Parallel()
 		ctx := t.Context()
 
 		lggr := logger.Nop()

From d645686d4c4535893221c71f81e20802b9de4db7 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 16:10:44 +0100
Subject: [PATCH 11/14] Fix partial batch failure test to check the archive table for failed jobs

---
 verifier/storage_writer_partial_errors_test.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/verifier/storage_writer_partial_errors_test.go b/verifier/storage_writer_partial_errors_test.go
index ac3d4c089..26f810bbe 100644
--- a/verifier/storage_writer_partial_errors_test.go
+++ b/verifier/storage_writer_partial_errors_test.go
@@ -171,14 +171,14 @@ func TestStorageWriterProcessorDB_PartialBatchFailures(t *testing.T) {
 		stored = nonRetryableStorage.GetStored()
 		require.NotContains(t, stored, batch[1].MessageID, "Seq 2 should never be stored (non-retryable)")
 
-		// Verify job was marked as failed in the database
+		// Verify job was marked as failed and moved to archive
 		require.Eventually(t, func() bool {
 			var count int
 			err := db.QueryRow(`
-				SELECT COUNT(*) FROM ccv_storage_writer_jobs
+				SELECT COUNT(*) FROM ccv_storage_writer_jobs_archive
 				WHERE owner_id = $1 AND status = 'failed'
 			`, "test-"+t.Name()).Scan(&count)
-			return err == nil && count == 1 // Seq 2 should be failed
+			return err == nil && count == 1 // Seq 2 should be failed in archive
 		}, tests.WaitTimeout(t), 50*time.Millisecond)
 	})
 

From 5a0b342b3b92fb348559b7746b01454352c1f618 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Fri, 13 Mar 2026 17:02:12 +0100
Subject: [PATCH 12/14] Use shorter retry intervals in the Lombard retrying-attestation test

---
 .../verification_coordinator_lombard_test.go | 45 +++++++++++++++++--
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/verifier/verification_coordinator_lombard_test.go b/verifier/verification_coordinator_lombard_test.go
index c8dc41d00..17f0b81e6 100644
--- a/verifier/verification_coordinator_lombard_test.go
+++ b/verifier/verification_coordinator_lombard_test.go
@@ -211,12 +211,17 @@ func Test_LombardMessages_RetryingAttestation(t *testing.T) {
 	mockLatestBlocks(mockSetup.Reader)
 
 	inMem := ccvstorage.NewInMemory()
-	v, err := createLombardCoordinator(
+	// Use shorter retry intervals for the test to avoid timeouts
+	// The test server returns pending attestations twice, then completed ones
+	// With 100ms retry delay, this should complete in ~300ms instead of 60+ seconds
+	v, err := createLombardCoordinatorWithRetryConfig(
 		ts,
 		&lombardConfig,
 		config,
 		sourceReaders,
 		inMem,
+		100*time.Millisecond, // attestationNotReadyRetry
+		100*time.Millisecond, // anyErrorRetry
 	)
 	require.NoError(t, err)
 
@@ -255,6 +260,26 @@ func createLombardCoordinator(
 	config verifier.CoordinatorConfig,
 	sourceReaders map[protocol.ChainSelector]chainaccess.SourceReader,
 	inMemStorage *ccvstorage.InMemoryCCVStorage,
+) (*verifier.Coordinator, error) {
+	return createLombardCoordinatorWithRetryConfig(
+		ts,
+		lombardConfig,
+		config,
+		sourceReaders,
+		inMemStorage,
+		0, // use default retry intervals
+		0,
+	)
+}
+
+func createLombardCoordinatorWithRetryConfig(
+	ts *testSetup,
+	lombardConfig *lombard.LombardConfig,
+	config verifier.CoordinatorConfig,
+	sourceReaders map[protocol.ChainSelector]chainaccess.SourceReader,
+	inMemStorage *ccvstorage.InMemoryCCVStorage,
+	attestationNotReadyRetry time.Duration,
+	anyErrorRetry time.Duration,
 ) (*verifier.Coordinator, error) {
 	noopMonitoring := 
monitoring.NewFakeVerifierMonitoring()
 	noopLatencyTracker := verifier.NoopLatencyTracker{}
 
@@ -262,8 +287,22 @@ func createLombardCoordinator(
 	attestationService, err := lombard.NewAttestationService(ts.logger, *lombardConfig)
 	require.NoError(ts.t, err)
 
-	lombardVerifier, err := lombard.NewVerifier(ts.logger, *lombardConfig, attestationService)
-	require.NoError(ts.t, err)
+	var lombardVerifier verifier.Verifier
+	if attestationNotReadyRetry > 0 || anyErrorRetry > 0 {
+		// Use custom retry intervals for tests
+		lombardVerifier = lombard.NewVerifierWithConfig(
+			ts.logger,
+			attestationService,
+			lombardConfig.VerifierVersion,
+			attestationNotReadyRetry,
+			anyErrorRetry,
+		)
+	} else {
+		// Use default retry intervals
+		var err error
+		lombardVerifier, err = lombard.NewVerifier(ts.logger, *lombardConfig, attestationService)
+		require.NoError(ts.t, err)
+	}
 
 	ccvWriter := storage.NewAttestationCCVWriter(
 		ts.logger,

From 64f57d73c2bb1ceeb38be3bca520eafe2cd15a47 Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Mon, 16 Mar 2026 08:35:37 +0100
Subject: [PATCH 13/14] Move Complete's status update and archiving into a single transaction

---
 verifier/jobqueue/postgres_queue.go | 56 ++++++++++++++++++++---------
 1 file changed, 39 insertions(+), 17 deletions(-)

diff --git a/verifier/jobqueue/postgres_queue.go b/verifier/jobqueue/postgres_queue.go
index 8556b4d99..d6a843708 100644
--- a/verifier/jobqueue/postgres_queue.go
+++ b/verifier/jobqueue/postgres_queue.go
@@ -249,27 +249,49 @@ func (q *PostgresJobQueue[T]) Complete(ctx context.Context, jobIDs ...string) er
 		return nil
 	}
 
-	// Move to archive table for audit trail
-	// Note: This query works for both ccv_task_verifier_jobs (without task_job_id)
-	// and ccv_storage_writer_jobs (with task_job_id) by selecting all columns
-	query := fmt.Sprintf(`
-		WITH completed AS (
-			DELETE FROM %s
-			WHERE job_id = ANY($1)
-			AND owner_id = $2
-			RETURNING *
-		)
-		INSERT INTO %s
-		SELECT *, NOW() as completed_at
-		FROM completed
-	`, q.tableName, q.archiveName)
+	var affected int64
+
+	err := sqlutil.TransactDataSource(ctx, q.ds, nil, func(tx sqlutil.DataSource) error {
+		// First, update the status to completed
+		updateQuery := fmt.Sprintf(`
+			UPDATE %s
+			SET status = $1
+			WHERE job_id = ANY($2)
+			AND owner_id = $3
+		`, q.tableName)
 
-	result, err := q.ds.ExecContext(ctx, query, pq.Array(jobIDs), q.ownerID)
+		_, err := tx.ExecContext(ctx, updateQuery, JobStatusCompleted, pq.Array(jobIDs), q.ownerID)
+		if err != nil {
+			return fmt.Errorf("failed to update job status to completed: %w", err)
+		}
+
+		// Move to archive table for audit trail
+		// Note: This query works for both ccv_task_verifier_jobs (without task_job_id)
+		// and ccv_storage_writer_jobs (with task_job_id) by selecting all columns
+		archiveQuery := fmt.Sprintf(`
+			WITH completed AS (
+				DELETE FROM %s
+				WHERE job_id = ANY($1)
+				AND owner_id = $2
+				RETURNING *
+			)
+			INSERT INTO %s
+			SELECT *, NOW() as completed_at
+			FROM completed
+		`, q.tableName, q.archiveName)
+
+		result, err := tx.ExecContext(ctx, archiveQuery, pq.Array(jobIDs), q.ownerID)
+		if err != nil {
+			return fmt.Errorf("failed to archive completed jobs: %w", err)
+		}
+
+		affected, _ = result.RowsAffected()
+		return nil
+	})
 	if err != nil {
-		return fmt.Errorf("failed to complete jobs: %w", err)
+		return err
 	}
 
-	affected, _ := result.RowsAffected()
 	q.logger.Debugw("Completed jobs",
 		"queue", q.config.Name,
 		"count", affected,

From 87dcc710d41e8fc18b4120a23284394cf42c3c1a Mon Sep 17 00:00:00 2001
From: Mateusz Sekara
Date: Mon, 16 Mar 2026 08:50:47 +0100
Subject: [PATCH 14/14] Fix storage writer retry-deadline test to check the archive table

---
 
verifier/storage_writer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifier/storage_writer_test.go b/verifier/storage_writer_test.go index 7f172d747..049c40834 100644 --- a/verifier/storage_writer_test.go +++ b/verifier/storage_writer_test.go @@ -343,7 +343,7 @@ func TestStorageWriterProcessorDB_RetryFailedBatches(t *testing.T) { require.Eventually(t, func() bool { var count int err := db.QueryRow(` - SELECT COUNT(*) FROM ccv_storage_writer_jobs + SELECT COUNT(*) FROM ccv_storage_writer_jobs_archive WHERE owner_id = $1 AND status = 'failed' `, "test-"+t.Name()).Scan(&count) return err == nil && count == 1