Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
- Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224).
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
- Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281)
- Jobs that didn't finish in time organically while a client was stopping and had to have their context cancelled no longer have this cancellation counted as an error. `attempt` is reset to the number it was before the job started working, `errors` is left unchanged, and `state` is made `available` so jobs are eligible to be retried immediately. [PR #1290](https://github.com/riverqueue/river/pull/1290)

### Fixed

Expand Down
61 changes: 61 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,67 @@ func Test_Client_SoftStopTimeout(t *testing.T) {
}
})

t.Run("ErroringJobGetsFreshAttempt", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, "")
config.SoftStopTimeout = 100 * time.Millisecond

firstRunDoneChan := make(chan struct{})
jobStartedChan := make(chan int64, 2)
var runCount atomic.Int32
AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
switch runCount.Add(1) {
case 1:
<-ctx.Done()
close(firstRunDoneChan)
return ctx.Err()
default:
return errors.New("real job error")
}
}))

client := runNewTestClient(ctx, t, config)

insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 2})
require.NoError(t, err)

jobID := riversharedtest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, jobID)

require.NoError(t, client.Stop(ctx))
riversharedtest.WaitOrTimeout(t, firstRunDoneChan)

jobAfter, err := client.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: jobID, Schema: client.config.Schema})
require.NoError(t, err)
require.Equal(t, 0, jobAfter.Attempt)
require.Nil(t, jobAfter.FinalizedAt)
require.Equal(t, 2, jobAfter.MaxAttempts)
require.Equal(t, rivertype.JobStateAvailable, jobAfter.State)
require.WithinDuration(t, time.Now(), jobAfter.ScheduledAt, 2*time.Second)
require.Empty(t, jobAfter.Errors)

require.NoError(t, client.Start(ctx))

jobID = riversharedtest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, jobID)

var jobAfterRealError *rivertype.JobRow
require.Eventually(t, func() bool {
var err error
jobAfterRealError, err = client.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: jobID, Schema: client.config.Schema})
require.NoError(t, err)
return jobAfterRealError.State != rivertype.JobStateRunning
}, 5*time.Second, 10*time.Millisecond)

require.Equal(t, 1, jobAfterRealError.Attempt)
require.Equal(t, rivertype.JobStateRetryable, jobAfterRealError.State)
require.Len(t, jobAfterRealError.Errors, 1)
require.Equal(t, "real job error", jobAfterRealError.Errors[0].Error)
require.Less(t, time.Until(jobAfterRealError.ScheduledAt), 3*time.Second)
})

t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 2 additions & 2 deletions internal/jobcompleter/job_completer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func testCompleter[TCompleter JobCompleter](
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now(), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), nil, []byte("{}"), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"), nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10, nil)))
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10, nil)))
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func benchmarkCompleter(
require.NoError(b, err)

case 3:
err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), nil, []byte("{}"), nil))
require.NoError(b, err)

case 4:
Expand Down
30 changes: 26 additions & 4 deletions internal/jobexecutor/job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/riverqueue/river/internal/jobcompleter"
"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/middlewarelookup"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/workunit"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)

Expand Down Expand Up @@ -423,6 +425,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
cancelJob bool
cancelErr *rivertype.JobCancelError
)
softStopped := isSoftStopCancelError(ctx, res.Err)

logAttrs := []any{
slog.String("error", res.ErrorStr()),
Expand All @@ -434,6 +437,8 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
case errors.As(res.Err, &cancelErr):
cancelJob = true
e.Logger.DebugContext(ctx, e.Name+": Job cancelled explicitly", logAttrs...)
case softStopped:
e.Logger.InfoContext(ctx, e.Name+": Job stopped due to client shutdown; retrying", logAttrs...)
case res.Err != nil:
if jobRow.Attempt >= jobRow.MaxAttempts {
e.Logger.InfoContext(ctx, e.Name+": Job errored", logAttrs...)
Expand All @@ -444,11 +449,21 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
e.Logger.InfoContext(ctx, e.Name+": Job panicked", logAttrs...)
}

if e.ErrorHandler != nil && !cancelJob {
if e.ErrorHandler != nil && !cancelJob && !softStopped {
// Error handlers also have an opportunity to cancel the job.
cancelJob = e.invokeErrorHandler(ctx, res)
}

now := e.Time.Now()

if softStopped {
params := riverdriver.JobSetStateErrorAvailable(jobRow.ID, now, ptrutil.Ptr(max(jobRow.Attempt-1, 0)), nil, metadataUpdates)
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to make soft-stopped job available", logAttrs...)
}
return
}

attemptErr := rivertype.AttemptError{
At: e.start,
Attempt: jobRow.Attempt,
Expand All @@ -462,8 +477,6 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
return
}

now := e.Time.Now()

if cancelJob {
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(jobRow.ID, now, errData, metadataUpdates)); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...)
Expand Down Expand Up @@ -503,7 +516,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
// `available` if their retry was smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)
params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, nil, errData, metadataUpdates)
} else {
params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates)
}
Expand All @@ -512,6 +525,15 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow,
}
}

// isSoftStopCancelError reports whether a worker returned because the client
// was stopping and cancelled its job context. The context cause distinguishes
// client stop cancellation from ordinary worker cancellation or timeouts.
func isSoftStopCancelError(ctx context.Context, err error) bool {
return err != nil &&
errors.Is(context.Cause(ctx), rivercommon.ErrStop) &&
(errors.Is(err, context.Canceled) || errors.Is(err, rivercommon.ErrStop))
}

type withJobsAndErrorsByID interface {
ErrorsByID() map[int64]error
Jobs() []*rivertype.JobRow
Expand Down
36 changes: 36 additions & 0 deletions internal/jobexecutor/job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,42 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Equal(t, rivertype.JobStateDiscarded, job.State)
})

// "Decrements attempt" means restoring attempt to the value it had before
// JobGetAvailable incremented it for this run.
t.Run("SoftStopCancelMakesJobAvailableAndDecrementsAttempt", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)

bundle.jobRow.Attempt = bundle.jobRow.MaxAttempts
_, err := bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
ID: bundle.jobRow.ID,
AttemptDoUpdate: true,
Attempt: bundle.jobRow.Attempt,
})
require.NoError(t, err)

workCtx, cancel := context.WithCancelCause(ctx)
cancel(rivercommon.ErrStop)

executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return context.Canceled }, nil).MakeUnit(bundle.jobRow)

executor.Execute(workCtx)
riversharedtest.WaitOrTimeout(t, bundle.updateCh)

job, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{
ID: bundle.jobRow.ID,
Schema: "",
})
require.NoError(t, err)
require.Nil(t, job.FinalizedAt)
require.Equal(t, bundle.jobRow.Attempt-1, job.Attempt)
require.Equal(t, bundle.jobRow.MaxAttempts, job.MaxAttempts)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.WithinDuration(t, time.Now(), job.ScheduledAt, 2*time.Second)
require.Empty(t, job.Errors)
})

t.Run("JobCancelErrorCancelsJobEvenWithRemainingAttempts", func(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 4 additions & 1 deletion riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,11 @@ func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte, metad
}
}

func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams {
// JobSetStateErrorAvailable makes an errored job immediately available.
// attempt can be set to nil to leave attempt unchanged.
func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, attempt *int, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{
Attempt: attempt,
ID: id,
ErrData: errData,
MetadataDoMerge: len(metadataUpdates) > 0,
Expand Down
27 changes: 25 additions & 2 deletions riverdriver/river_driver_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,36 @@ func TestJobSetStateDiscarded(t *testing.T) {
func TestJobSetStateErrorAvailable(t *testing.T) {
t.Parallel()

t.Run("Attempt", func(t *testing.T) {
t.Parallel()

id := int64(4)
scheduledAt := time.Now().Truncate(time.Second)
errData := []byte("error available")
attempt := 3
result := JobSetStateErrorAvailable(id, scheduledAt, &attempt, errData, nil)
require.Equal(t, id, result.ID)
require.NotNil(t, result.Attempt)
require.Equal(t, attempt, *result.Attempt)
require.Equal(t, errData, result.ErrData)
require.False(t, result.MetadataDoMerge)
require.Nil(t, result.MetadataUpdates)
require.NotNil(t, result.ScheduledAt)
require.True(t, result.ScheduledAt.Equal(scheduledAt))
require.Empty(t, result.Schema)
require.False(t, result.Snoozed)
require.Equal(t, rivertype.JobStateAvailable, result.State)
})

t.Run("EmptyMetadata", func(t *testing.T) {
t.Parallel()

id := int64(4)
scheduledAt := time.Now().Truncate(time.Second)
errData := []byte("error available")
result := JobSetStateErrorAvailable(id, scheduledAt, errData, nil)
result := JobSetStateErrorAvailable(id, scheduledAt, nil, errData, nil)
require.Equal(t, id, result.ID)
require.Nil(t, result.Attempt)
require.Equal(t, errData, result.ErrData)
require.False(t, result.MetadataDoMerge)
require.Nil(t, result.MetadataUpdates)
Expand All @@ -156,8 +178,9 @@ func TestJobSetStateErrorAvailable(t *testing.T) {
scheduledAt := time.Now().Truncate(time.Second)
errData := []byte("error available")
metadata := []byte(`{"key": "value"}`)
result := JobSetStateErrorAvailable(id, scheduledAt, errData, metadata)
result := JobSetStateErrorAvailable(id, scheduledAt, nil, errData, metadata)
require.Equal(t, id, result.ID)
require.Nil(t, result.Attempt)
require.True(t, result.MetadataDoMerge)
require.Equal(t, metadata, result.MetadataUpdates)
require.NotNil(t, result.ScheduledAt)
Expand Down
38 changes: 38 additions & 0 deletions riverdriver/riverdrivertest/job_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,44 @@ func exerciseJobUpdate[TTx any](ctx context.Context, t *testing.T, executorWithT
require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace)
})

t.Run("SetsARunningJobToAvailableWithUpdatedAttempt", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

now := time.Now().UTC()
attempt := 2

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
Attempt: ptrutil.Ptr(3),
MaxAttempts: ptrutil.Ptr(3),
State: ptrutil.Ptr(rivertype.JobStateRunning),
UniqueKey: []byte("unique-key"),
})

params := riverdriver.JobSetStateErrorAvailable(job.ID, now, &attempt, makeErrPayload(t, now), nil)
jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(params))
require.NoError(t, err)
jobAfter := jobsAfter[0]
require.Equal(t, attempt, jobAfter.Attempt)
require.Equal(t, rivertype.JobStateAvailable, jobAfter.State)
require.Equal(t, 3, jobAfter.MaxAttempts)
require.WithinDuration(t, now, jobAfter.ScheduledAt, time.Microsecond)

jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""})
require.NoError(t, err)
require.Equal(t, attempt, jobUpdated.Attempt)
require.Equal(t, rivertype.JobStateAvailable, jobUpdated.State)
require.Equal(t, 3, jobUpdated.MaxAttempts)
require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))

require.Len(t, jobAfter.Errors, 1)
require.Equal(t, now, jobAfter.Errors[0].At)
require.Equal(t, 1, jobAfter.Errors[0].Attempt)
require.Equal(t, "fake error", jobAfter.Errors[0].Error)
require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace)
})

t.Run("DoesNotTouchAlreadyRetryableJobWithNoMetadataUpdates", func(t *testing.T) {
t.Parallel()

Expand Down
Loading