Merged
5 changes: 5 additions & 0 deletions verifier/const.go
@@ -1,9 +1,14 @@
package verifier

import "time"

const (
	DefaultConfigFile = "/etc/config.toml"
	// ConfirmationDepth is the number of blocks to wait before considering a block finalized.
	// It is used to calculate the finalized block as (latest - ConfirmationDepth) when running
	// in standalone mode; in a CL node this is governed by the HeadTracker configuration.
	ConfirmationDepth = 15
	// DefaultJobQueueOperationTimeout is the timeout for job queue operations across verifier components.
	// Used by task_verifier and storage_writer for Consume, Complete, Retry, Fail, and Cleanup operations.
	DefaultJobQueueOperationTimeout = 10 * time.Second
)
258 changes: 258 additions & 0 deletions verifier/jobqueue/README.md
@@ -0,0 +1,258 @@
# Job Queue System

## Overview

The Job Queue provides a durable, PostgreSQL-backed task queue with automatic retry, failure handling, and job archiving. It's designed for reliable message processing in the CCV verifier pipeline.

## Architecture

The queue consists of two main tables:
- **Active Table** (`ccv_task_verifier_jobs` / `ccv_storage_writer_jobs`): Contains jobs that are pending or currently being processed
- **Archive Table** (`*_archive`): Contains completed and failed jobs for audit purposes
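As a rough sketch, a row in either table carries the columns referenced throughout this README. The exact migration is not shown here, so the struct below is an assumption about the schema, not the package's actual types:

```go
package main

import (
	"fmt"
	"time"
)

// jobRow sketches the shape of a row, using the columns this README mentions
// (status, available_at, started_at, retry_deadline, attempt_count, last_error,
// completed_at). Field names are illustrative; the real schema may differ.
type jobRow struct {
	ID            string
	Status        string // "pending" or "processing" in the active table
	AttemptCount  int
	LastError     string
	AvailableAt   time.Time // when the job becomes eligible for Consume
	StartedAt     time.Time // set on Consume; used for stale-lock detection
	RetryDeadline time.Time // after this, Retry archives instead of re-queueing
	CompletedAt   time.Time // set only in the archive table
}

func main() {
	row := jobRow{ID: "job-1", Status: "pending", AvailableAt: time.Now()}
	fmt.Println(row.Status, row.AttemptCount)
}
```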

## Job States

Jobs transition through the following states:

| State | Description |
|-------|-------------|
| `pending` | Job is waiting to be consumed by a worker |
| `processing` | Job is currently being processed by a worker |
| `completed` | Job finished successfully (exists only in archive) |
| `failed` | Job permanently failed (exists only in archive) |

**Important**: Only `pending` and `processing` jobs exist in the active table. Once a job is `completed` or `failed`, it is immediately moved to the archive table.
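The states and legal transitions above can be modeled as data. This is an illustrative model of the rules described in this document, not the `jobqueue` package's own types:

```go
package main

import "fmt"

// Status values as used in this README; the real package may define its own type.
type Status string

const (
	StatusPending    Status = "pending"
	StatusProcessing Status = "processing"
	StatusCompleted  Status = "completed" // archive only
	StatusFailed     Status = "failed"    // archive only
)

// validNext models the transitions described above: pending -> processing via
// Consume; processing -> pending (Retry within deadline), completed (Complete),
// or failed (Fail, or Retry past the deadline). Terminal states have no exits.
var validNext = map[Status][]Status{
	StatusPending:    {StatusProcessing},
	StatusProcessing: {StatusPending, StatusCompleted, StatusFailed},
}

func canTransition(from, to Status) bool {
	for _, s := range validNext[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StatusPending, StatusProcessing)) // Consume
	fmt.Println(canTransition(StatusCompleted, StatusPending))  // never: archive is terminal
}
```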

## State Machine Diagram

```
┌────────────────────────────────────────────────────────────────────────┐
│ ACTIVE TABLE │
│ │
│ ┌─────────┐ │
│ │ Publish │ │
│ └────┬────┘ │
│ │ │
│ v │
│ ┌─────────┐ ┌────────────┐ │
│ │ pending │───Consume─────>│ processing │ │
│ └─────────┘ └──────┬─────┘ │
│ ^ │ │
│ │ │ │
│ │ Retry │ │
│ │ (within ┌──────┼────────┬──────────┐ │
│ │ deadline) │ │ │ │ │
│ │ v v v v │
│ └──────────────── Retry Complete Fail Retry │
│ │ (exceeded deadline) │
│ │ │ │
│ │ │ │
│ │ │ │
└──────────────────────────────────────┼─────────────┼───────────────────┘
│ │
│ │
v v
┌─────────────┐ ┌─────────────┐
│ Archive │ │ Archive │
│ (completed) │ │ (failed) │
└─────────────┘ └─────────────┘

ARCHIVE TABLE
```

## State Transitions

### 1. Publish → Pending

```go
err := queue.Publish(ctx, job1, job2, job3)
```

- Creates new jobs in `pending` state
- Sets `available_at` timestamp (default: immediate)
- Sets `retry_deadline` based on `RetryDuration` config
- Jobs become immediately available for consumption

### 2. Consume: Pending → Processing

```go
jobs, err := queue.Consume(ctx, batchSize)
```

**Selection Criteria:**
- Jobs in `pending` state where `available_at <= NOW()`
- Jobs in `processing` state where `started_at + LockDuration < NOW()` (stale locks)

**Effects:**
- Updates status to `processing`
- Sets `started_at` timestamp
- Increments `attempt_count`
- Locks the job using `FOR UPDATE SKIP LOCKED` (prevents duplicate consumption)

**Note**: Failed jobs are **NOT** consumed - they are archived and cannot be retried.

### 3. Complete: Processing → Archived (Completed)

```go
err := queue.Complete(ctx, jobID1, jobID2)
```

- Deletes jobs from active table
- Inserts into archive table with `completed` status
- Sets `completed_at` timestamp
- Creates permanent audit trail

### 4. Retry: Processing → Pending (or Archived if deadline exceeded)

```go
err := queue.Retry(ctx, delay, errorMap, jobID1, jobID2)
```

**If `NOW() < retry_deadline`:**
- Updates status to `pending`
- Sets `available_at = NOW() + delay`
- Records error message in `last_error`
- Job becomes available for retry after delay

**If `NOW() >= retry_deadline`:**
- Updates status to `failed`
- Records error message
- **Archives the job** (moves to archive table)
- Job will NOT be retried again

### 5. Fail: Processing → Archived (Failed)

```go
err := queue.Fail(ctx, errorMap, jobID1, jobID2)
```

- Updates status to `failed`
- Records error message in `last_error`
- **Immediately archives the job** (moves to archive table)
- Job will NOT be retried

## Key Invariant

**Every job consumed from the queue must eventually be:**
- **Completed** → archived with `completed` status
- **Failed** → archived with `failed` status
- **Retried** → returned to `pending` OR archived if retry deadline exceeded

This design ensures:
- ✅ Active table only contains pending/processing jobs
- ✅ No unbounded table growth
- ✅ Complete audit trail in archive
- ✅ Predictable resource usage

## Stale Lock Recovery

If a worker crashes while processing a job, the job remains in `processing` state. The queue automatically reclaims these "stale" jobs:

```
Worker A: Consume job → started_at = 10:00 AM → [CRASH]
Worker B: Consume (at 10:15 AM) → detects stale lock (10:15 - 10:00 > LockDuration)
Worker B: Reclaims job → attempt_count++
```

**Configuration:**
- `LockDuration`: How long a job can stay in `processing` before being reclaimed (default: 1 minute)

## Configuration

```go
type QueueConfig struct {
	Name          string        // Queue name for logging and table naming
	OwnerID       string        // Scopes jobs so multiple verifiers can share tables
	RetryDuration time.Duration // How long jobs can be retried before permanent failure
	LockDuration  time.Duration // How long before a processing job is considered stale
}
```

## Usage Example

### Basic Flow

```go
// 1. Create queue
queue, err := jobqueue.NewPostgresJobQueue[MyJob](
	db,
	jobqueue.QueueConfig{
		Name:          "my_jobs",
		OwnerID:       "my-verifier",
		RetryDuration: time.Hour,
		LockDuration:  time.Minute,
	},
	logger,
)

// 2. Publish jobs
err = queue.Publish(ctx, job1, job2, job3)

// 3. Worker: Consume and process
jobs, err := queue.Consume(ctx, 10) // batch of up to 10 jobs
for _, job := range jobs {
	err := processJob(job)
	if err == nil {
		// Success - archive as completed
		queue.Complete(ctx, job.ID)
	} else if isTransientError(err) {
		// Transient error - retry after delay
		queue.Retry(ctx, 10*time.Second, map[string]error{job.ID: err}, job.ID)
	} else {
		// Permanent error - archive as failed
		queue.Fail(ctx, map[string]error{job.ID: err}, job.ID)
	}
}

// 4. Periodic cleanup of old archive entries
deleted, err := queue.Cleanup(ctx, 30*24*time.Hour) // delete after 30 days
```

### Delayed Publishing

```go
// Publish job that becomes available in 1 hour
err := queue.PublishWithDelay(ctx, time.Hour, job)
```

### Monitoring

```go
// Get count of pending + processing jobs
size, err := queue.Size(ctx)
log.Printf("Queue size: %d", size)
```

## Concurrency Guarantees

- **Multiple Publishers**: Safe - concurrent `Publish()` calls are isolated
- **Multiple Consumers**: Safe - `SELECT FOR UPDATE SKIP LOCKED` ensures no duplicate consumption
- **Concurrent Operations**: Safe - all operations use database transactions

### Retry Strategy Example

```go
func handleResult(ctx context.Context, queue JobQueue[MyJob], job Job[MyJob], err error) {
	if err == nil {
		queue.Complete(ctx, job.ID)
		return
	}

	// Check retry deadline
	if time.Now().After(job.RetryDeadline) {
		// Too late to retry - fail permanently
		queue.Fail(ctx, map[string]error{job.ID: err}, job.ID)
		return
	}

	// Classify error
	if isTransient(err) {
		// Exponential backoff based on attempt count
		delay := time.Duration(math.Pow(2, float64(job.AttemptCount))) * time.Second
		queue.Retry(ctx, delay, map[string]error{job.ID: err}, job.ID)
	} else {
		// Permanent error
		queue.Fail(ctx, map[string]error{job.ID: err}, job.ID)
	}
}
```
11 changes: 7 additions & 4 deletions verifier/jobqueue/interface.go
@@ -56,11 +56,14 @@ type JobQueue[T Jobable] interface {
	// Useful for implementing retry backoff strategies.
	PublishWithDelay(ctx context.Context, delay time.Duration, jobs ...T) error
	// Consume retrieves and locks up to batchSize jobs for processing.
-	// Jobs in 'pending' or 'failed' status that are past their available_at time are eligible.
+	// Jobs in 'pending' status that are past their available_at time are eligible.
	// Additionally, jobs stuck in 'processing' for longer than the configured LockDuration
	// are considered stale (e.g. from a crashed worker) and are automatically reclaimed.
	// Returns empty slice if no jobs are available.
	//
+	// Note: Jobs in 'failed' status are NOT consumed.
+	// Failed jobs are moved to the archive by Fail() or Retry() when retry deadline is exceeded.
+	//
	// The implementation should use SELECT FOR UPDATE SKIP LOCKED to ensure
	// concurrent consumers don't compete for the same jobs.
	Consume(ctx context.Context, batchSize int) ([]Job[T], error)
@@ -69,10 +72,10 @@ type JobQueue[T Jobable] interface {
	Complete(ctx context.Context, jobIDs ...string) error
	// Retry schedules jobs for retry after the specified delay.
	// Increments attempt count and records the error message.
-	// If max attempts is exceeded, jobs are moved to failed status.
+	// If the retry deadline has been exceeded, jobs are marked as failed and archived.
	Retry(ctx context.Context, delay time.Duration, errors map[string]error, jobIDs ...string) error
-	// Fail marks jobs as permanently failed.
-	// These jobs will not be retried and should be investigated.
+	// Fail marks jobs as permanently failed and moves them to the archive.
+	// These jobs will not be retried and should be investigated via the archive table.
	Fail(ctx context.Context, errors map[string]error, jobIDs ...string) error
	// Cleanup archives or deletes jobs older than the retention period.
	// Should be called periodically to prevent unbounded table growth.
2 changes: 1 addition & 1 deletion verifier/jobqueue/observability_decorator.go
@@ -64,7 +64,7 @@ func NewObservabilityDecorator[T Jobable](
}

// Start begins the observability monitoring loop.
-func (d *ObservabilityDecorator[T]) Start(ctx context.Context) error {
+func (d *ObservabilityDecorator[T]) Start(_ context.Context) error {
	return d.StartOnce(d.Name(), func() error {
		d.lggr.Infow("Starting JobQueue observability monitoring",
			"queue", d.queue.Name(),