Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/riverqueue/river/internal/dblist"
Expand Down Expand Up @@ -46,11 +47,12 @@ const (
FetchPollIntervalDefault = 1 * time.Second
FetchPollIntervalMin = 1 * time.Millisecond

JobTimeoutDefault = 1 * time.Minute
MaxAttemptsDefault = rivercommon.MaxAttemptsDefault
PriorityDefault = rivercommon.PriorityDefault
QueueDefault = rivercommon.QueueDefault
QueueNumWorkersMax = 10_000
JobStuckThresholdDefault = 10 * time.Second
JobTimeoutDefault = 1 * time.Minute
MaxAttemptsDefault = rivercommon.MaxAttemptsDefault
PriorityDefault = rivercommon.PriorityDefault
QueueDefault = rivercommon.QueueDefault
QueueNumWorkersMax = 10_000
)

var (
Expand Down Expand Up @@ -192,6 +194,25 @@ type Config struct {
// instances of rivertype.JobInsertMiddleware).
JobInsertMiddleware []rivertype.JobInsertMiddleware

// JobStuckHandler is invoked when a producer detects that a job exceeded
// its timeout and did not return from context cancellation within the
// allotted JobStuckThreshold (and if it didn't, we usually assume it won't
// return at all). The handler receives minimal information about the stuck
// job and the total number of jobs currently considered stuck across the
// client.
//
// JobStuckHandler lets an implementation indicate that a new worker slot
// should be opened to replace the one now occupied by a stuck job. It can
// also be used (for example) to stop and exit the program if too many jobs
// have been reported stuck.
JobStuckHandler JobStuckHandler

// JobStuckThreshold is the amount of time after JobTimeout elapses to
// wait before a still-running job is considered stuck.
//
// Defaults to 10 seconds.
JobStuckThreshold time.Duration

// JobTimeout is the maximum amount of time a job is allowed to run before its
// context is cancelled. A timeout of zero means JobTimeoutDefault will be
// used, whereas a value of -1 means the job's context will not be cancelled
Expand Down Expand Up @@ -471,6 +492,8 @@ func (c *Config) WithDefaults() *Config {
ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }),
Hooks: c.Hooks,
JobInsertMiddleware: c.JobInsertMiddleware,
JobStuckHandler: c.JobStuckHandler,
JobStuckThreshold: cmp.Or(c.JobStuckThreshold, JobStuckThresholdDefault),
JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault),
Logger: logger,
MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
Expand Down Expand Up @@ -521,6 +544,9 @@ func (c *Config) validate() error {
if c.JobTimeout < -1 {
return errors.New("JobTimeout cannot be negative, except for -1 (infinite)")
}
if c.JobStuckThreshold < 0 {
return errors.New("JobStuckThreshold cannot be less than zero")
}
if c.MaxAttempts < 0 {
return errors.New("MaxAttempts cannot be less than zero")
}
Expand Down Expand Up @@ -670,6 +696,7 @@ type Client[TTx any] struct {
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
stuckJobCount atomic.Int32
subscriptionManager *subscriptionManager
testSignals clientTestSignals

Expand Down Expand Up @@ -2260,6 +2287,9 @@ func (c *Client[TTx]) producerAdd(queueName string, queueConfig QueueConfig) (*p
FetchPollInterval: cmp.Or(queueConfig.FetchPollInterval, c.config.FetchPollInterval),
HookLookupByJob: c.hookLookupByJob,
HookLookupGlobal: c.hookLookupGlobal,
JobStuckHandler: c.config.JobStuckHandler,
JobStuckCount: &c.stuckJobCount,
JobStuckThreshold: c.config.JobStuckThreshold,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
MiddlewareLookupGlobal: c.middlewareLookupGlobal,
Expand Down
16 changes: 16 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8094,6 +8094,8 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Nil(t, client.config.ErrorHandler)
require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown)
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
require.Nil(t, client.config.JobStuckHandler)
require.Equal(t, JobStuckThresholdDefault, client.config.JobStuckThreshold)
require.Equal(t, JobTimeoutDefault, client.config.JobTimeout)
require.Nil(t, client.config.Hooks)
require.NotZero(t, client.baseService.Logger)
Expand All @@ -8117,6 +8119,9 @@ func Test_NewClient_Overrides(t *testing.T) {
)

errorHandler := &testErrorHandler{}
jobStuckHandler := JobStuckHandler(func(ctx context.Context, params JobStuckHandlerParams) JobStuckHandlerResult {
return JobStuckHandlerResult{}
})
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

workers := NewWorkers()
Expand Down Expand Up @@ -8146,6 +8151,8 @@ func Test_NewClient_Overrides(t *testing.T) {
FetchPollInterval: 124 * time.Millisecond,
Hooks: []rivertype.Hook{&noOpHook{}},
JobInsertMiddleware: []rivertype.JobInsertMiddleware{&noOpInsertMiddleware{}},
JobStuckHandler: jobStuckHandler,
JobStuckThreshold: 126 * time.Millisecond,
JobTimeout: 125 * time.Millisecond,
Logger: logger,
MaxAttempts: 5,
Expand Down Expand Up @@ -8184,6 +8191,8 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown)
require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval)
require.Len(t, client.config.JobInsertMiddleware, 1)
require.NotNil(t, client.config.JobStuckHandler)
require.Equal(t, 126*time.Millisecond, client.config.JobStuckThreshold)
require.Equal(t, 125*time.Millisecond, client.config.JobTimeout)
require.Equal(t, []rivertype.Hook{&noOpHook{}}, client.config.Hooks)
require.Equal(t, logger, client.baseService.Logger)
Expand Down Expand Up @@ -8366,6 +8375,13 @@ func Test_NewClient_Validations(t *testing.T) {
require.Equal(t, MaxAttemptsDefault, client.config.MaxAttempts)
},
},
{
name: "JobStuckThreshold cannot be less than zero",
configFunc: func(config *Config) {
config.JobStuckThreshold = -1
},
wantErr: errors.New("JobStuckThreshold cannot be less than zero"),
},
{
name: "Middleware can be configured independently",
configFunc: func(config *Config) {
Expand Down
118 changes: 118 additions & 0 deletions example_job_stuck_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package river_test

import (
"context"
"fmt"
"sync"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/testutil"
)

type StuckJobHandlerArgs struct{}

func (StuckJobHandlerArgs) Kind() string { return "stuck_job_handler" }

type StuckJobHandlerWorker struct {
river.WorkerDefaults[StuckJobHandlerArgs]

releaseJobs chan struct{}
started chan struct{}
}

func (w *StuckJobHandlerWorker) Work(ctx context.Context, job *river.Job[StuckJobHandlerArgs]) error {
w.started <- struct{}{}

// Ignore ctx.Done() to simulate a job that doesn't respond to cancellation.
<-w.releaseJobs

return nil
}

// Example_jobStuckHandler demonstrates how to use JobStuckHandler to stop a
// client when too many jobs are stuck so the process can be restarted. For the
// first couple stuck jobs it uses OpenWorkerSlot to add additional worker slots
// to replace those occupied by stuck jobs, but after maxStuckJobsBeforeRestart
// it gives up and exits so it can be restarted.
func Example_jobStuckHandler() {
ctx := context.Background()

dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
if err != nil {
panic(err)
}
defer dbPool.Close()

var riverClient *river.Client[pgx.Tx]

const maxStuckJobsBeforeRestart = 2

var (
releaseJobs = make(chan struct{})
restartRequested = make(chan struct{})
started = make(chan struct{}, maxStuckJobsBeforeRestart+1)
stopOnce sync.Once
)

workers := river.NewWorkers()
river.AddWorker(workers, &StuckJobHandlerWorker{releaseJobs: releaseJobs, started: started})

riverClient, err = river.NewClient(riverpgxv5.New(dbPool), initTestConfig(ctx, dbPool, &river.Config{
JobStuckHandler: func(ctx context.Context, params river.JobStuckHandlerParams) river.JobStuckHandlerResult {
if params.TotalStuckJobs <= maxStuckJobsBeforeRestart {
fmt.Printf("stuck jobs: %d; opening replacement worker slot\n", params.TotalStuckJobs)
return river.JobStuckHandlerResult{OpenWorkerSlot: true}
}

stopOnce.Do(func() {
fmt.Printf("too many stuck jobs: %d; shutting down so the process can restart\n", params.TotalStuckJobs)
close(restartRequested)

shutdownCtx := context.WithoutCancel(ctx)
go func() {
if err := riverClient.Stop(shutdownCtx); err != nil {
panic(err)
}
}()
})

return river.JobStuckHandlerResult{}
},
JobStuckThreshold: time.Millisecond,
JobTimeout: 10 * time.Millisecond,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 1},
},
Workers: workers,
}))
if err != nil {
panic(err)
}

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

for range maxStuckJobsBeforeRestart + 1 {
if _, err := riverClient.Insert(ctx, StuckJobHandlerArgs{}, nil); err != nil {
panic(err)
}
}

riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), started, maxStuckJobsBeforeRestart+1)

<-restartRequested
close(releaseJobs)
<-riverClient.Stopped()

// Output:
// stuck jobs: 1; opening replacement worker slot
// stuck jobs: 2; opening replacement worker slot
// too many stuck jobs: 3; shutting down so the process can restart
}
23 changes: 16 additions & 7 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"log/slog"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/tidwall/gjson"
Expand Down Expand Up @@ -116,7 +117,7 @@ type JobExecutor struct {
MiddlewareLookupGlobal middlewarelookup.MiddlewareLookupInterface
ProducerCallbacks struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Stuck func(ctx context.Context, jobRow *rivertype.JobRow)
Unstuck func()
}
SchedulerInterval time.Duration
Expand All @@ -125,8 +126,17 @@ type JobExecutor struct {
WorkUnit workunit.WorkUnit

// Meant to be used from within the job executor only.
start time.Time
stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer
slotClosed atomic.Bool
start time.Time
stats *jobstats.JobStatistics // initialized by the executor, and handed off to completer
}

// TryCloseSlot marks this executor's producer slot as closed. A closed slot
// means the producer has already stopped counting this executor against its
// active worker capacity, although the executor goroutine may still be running.
// It returns true only the first time the slot is closed.
func (e *JobExecutor) TryCloseSlot() bool {
return e.slotClosed.CompareAndSwap(false, true)
}

func (e *JobExecutor) Cancel(ctx context.Context) {
Expand Down Expand Up @@ -262,9 +272,8 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
// their job timeout (plus a small margin) and don't appear to be responding to
// context cancellation (unfortunately, quite an easy error to make in Go).
//
// Currently we don't do anything if we notice a job is stuck. Knowing about
// stuck jobs is just used for informational purposes in the producer in
// generating periodic stats.
// Producers use stuck-job notifications for periodic stats and optional user
// handlers.
func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc {
// We add a WithoutCancel here so that this inner goroutine becomes
// immune to all context cancellations _except_ the one where it's
Expand All @@ -281,7 +290,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration)
// context cancelled as we leave JobExecutor.execute

case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
e.ProducerCallbacks.Stuck()
e.ProducerCallbacks.Stuck(ctx, e.JobRow)

e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
slog.Int64("job_id", e.JobRow.ID),
Expand Down
10 changes: 5 additions & 5 deletions internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func TestJobExecutor_Execute(t *testing.T) {
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
ProducerCallbacks: struct {
JobDone func(jobRow *rivertype.JobRow)
Stuck func()
Stuck func(ctx context.Context, jobRow *rivertype.JobRow)
Unstuck func()
}{
JobDone: func(jobRow *rivertype.JobRow) {},
Stuck: func() {},
Stuck: func(ctx context.Context, jobRow *rivertype.JobRow) {},
Unstuck: func() {},
},
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestJobExecutor_Execute(t *testing.T) {
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
Expand Down Expand Up @@ -761,7 +761,7 @@ func TestJobExecutor_Execute(t *testing.T) {
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
Expand Down Expand Up @@ -809,7 +809,7 @@ func TestJobExecutor_Execute(t *testing.T) {
informProducerStuckReceived = make(chan struct{})
informProducerUnstuckReceived = make(chan struct{})
)
executor.ProducerCallbacks.Stuck = func() {
executor.ProducerCallbacks.Stuck = func(ctx context.Context, jobRow *rivertype.JobRow) {
t.Log("Job executor reported stuck")
close(informProducerStuckReceived)
}
Expand Down
Loading
Loading