From 2bb559cae7c9139acb914364d524d51ca8170685 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 20 Jun 2026 08:06:53 -0600 Subject: [PATCH] Smoother completion: Earlier backpressure + possible early completion + fewer warnings This one's largely aimed at trying to resolve a common problem that we see during benchmarks, in which the `BatchCompleter` becomes completely saturated and starts producing noisy warnings saying that it'll produce backpressure while waiting for completions: $ go run ./cmd/river bench --database-url $DATABASE_URL --num-total-jobs 300_000 bench: jobs worked [ 0 ], inserted [ 300000 ], job/sec [ 0.0 ] [0s] Jun 20 07:33:22.258 WRN jobcompleter.BatchCompleter: Hit maximum backlog; completions will wait until below threshold max_backlog=20000 Jun 20 07:33:23.464 WRN jobcompleter.BatchCompleter: Hit maximum backlog; completions will wait until below threshold max_backlog=20000 bench: jobs worked [ 74578 ], inserted [ 0 ], job/sec [ 37289.0 ] [2s] Jun 20 07:33:24.480 WRN jobcompleter.BatchCompleter: Hit maximum backlog; completions will wait until below threshold max_backlog=20000 bench: jobs worked [ 87308 ], inserted [ 0 ], job/sec [ 43654.0 ] [2s] Jun 20 07:33:25.873 WRN jobcompleter.BatchCompleter: Hit maximum backlog; completions will wait until below threshold max_backlog=20000 Jun 20 07:33:27.060 WRN jobcompleter.BatchCompleter: Hit maximum backlog; completions will wait until below threshold max_backlog=20000 bench: jobs worked [ 113346 ], inserted [ 0 ], job/sec [ 56673.0 ] [2s] bench: jobs worked [ 24768 ], inserted [ 0 ], job/sec [ 12384.0 ] [2s] bench: total jobs worked [ 300000 ], total jobs inserted [ 300000 ], overall job/sec [ 46803.8 ], running 6.409735541s Here, we modify our approach so that instead of waiting until we're at a backlog of 20k jobs in the completer, we start to apply backpressure at 10k jobs, and reserve the warnings for if we get to the 20k level, which will be much more difficult to get to. This has the effect of producing much fewer warnings as we're running a benchmark: $ go run ./cmd/river bench --database-url $DATABASE_URL --num-total-jobs 300_000 bench: jobs worked [ 0 ], inserted [ 300000 ], job/sec [ 0.0 ] [0s] bench: jobs worked [ 79849 ], inserted [ 0 ], job/sec [ 39924.5 ] [2s] bench: jobs worked [ 96590 ], inserted [ 0 ], job/sec [ 48295.0 ] [2s] bench: jobs worked [ 105726 ], inserted [ 0 ], job/sec [ 52863.0 ] [2s] bench: jobs worked [ 17835 ], inserted [ 0 ], job/sec [ 8917.5 ] [2s] bench: total jobs worked [ 300000 ], total jobs inserted [ 300000 ], overall job/sec [ 47026.1 ], running 6.379439334s In my tests, throughput is roughly the same. Before at 20k the completer was the bottleneck, and it's still the bottleneck. We do put in one minor optimization in which `BatchCompleter` will start completing as soon as it's got a full batch rather than having to wait for a 50 ms tick. This doesn't have a super significant effect on numbers though. An argument against this change is that to some degree, we'd be sweeping the slow completer under the rug. I'm receptive to that, but a couple things: * It looks bad when the benchmark process regularly produces warnings as it runs. * I think we can do better than warnings by putting in a more comprehensive stats layer that tracks specific durations for each phase of a job's life cycle (time to lock, time to work, time waiting to complete, time to complete, etc.). This seems like a much better alternative to the status quo. --- CHANGELOG.md | 1 + internal/jobcompleter/job_completer.go | 179 ++++++++++++++---- internal/jobcompleter/job_completer_test.go | 192 +++++++++++++++++++- 3 files changed, 333 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9e6dd94..891513e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224). - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276) - Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) +- Earlier backpressure from `BatchCompleter` when it's throughput is saturated with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292) ### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 823394c4..13159473 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -257,9 +257,11 @@ type BatchCompleter struct { baseservice.BaseService startstop.BaseStartStop + backlogWaitThreshold int // configurable for testing purposes; backlog at which completions start waiting for the completer to catch up + batchReadyChan chan struct{} completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation disableSleep bool // disable sleep in testing - maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted + maxBacklog int // configurable for testing purposes; emergency backlog threshold where a warning is logged exec riverdriver.Executor pilot riverpilot.Pilot schema string @@ -273,19 +275,22 @@ type BatchCompleter struct { func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter { const ( - completionMaxSize = 5_000 - maxBacklog = 20_000 + completionMaxSize = 5_000 + backlogWaitThreshold = completionMaxSize * 2 + maxBacklog = 20_000 ) return baseservice.Init(archetype, &BatchCompleter{ - completionMaxSize: completionMaxSize, - exec: exec, - maxBacklog: maxBacklog, - pilot: pilot, - schema: schema, - setStateParams: make(map[int64]*batchCompleterSetState), - setStateStartTimes: make(map[int64]time.Time), - subscribeCh: subscribeCh, + backlogWaitThreshold: backlogWaitThreshold, + batchReadyChan: make(chan struct{}, 1), + completionMaxSize: completionMaxSize, + exec: exec, + maxBacklog: maxBacklog, + pilot: pilot, + schema: schema, + setStateParams: make(map[int64]*batchCompleterSetState), + setStateStartTimes: make(map[int64]time.Time), + subscribeCh: subscribeCh, }) } @@ -330,6 +335,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error { } return + case <-c.batchReadyChan: case <-ticker.C: } @@ -340,7 +346,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error { // multiple of 5. So, jobs will be completed every 250ms even if the // threshold hasn't been met. const batchCompleterStartThreshold = 100 - if backlogSize() < min(c.maxBacklog, batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 { + if backlogSize() < min(c.backlogWaitThresholdEffective(), batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 { continue } @@ -391,6 +397,16 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return nil } + handleBatchError := func(err error) error { + if isNonRetryableCompleterError(err) { + c.releaseBacklogWaitIfReady(ctx) + return err + } + + c.requeueBatch(ctx, setStateBatch, setStateStartTimes) + return err + } + // Complete a sub-batch with retries. Also helps reduce visual noise and // increase readability of loop below. completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { @@ -465,7 +481,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } jobRowsSubBatch, err := completeSubBatch(subBatch) if err != nil { - return err + return handleBatchError(err) } jobRows = append(jobRows, jobRowsSubBatch...) } @@ -473,7 +489,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { var err error jobRows, err = completeSubBatch(params) if err != nil { - return err + return handleBatchError(err) } } @@ -494,7 +510,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { c.setStateParamsMu.Lock() defer c.setStateParamsMu.Unlock() - if c.waitOnBacklogWaiting && len(c.setStateParams) < c.maxBacklog { + if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() { c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") close(c.waitOnBacklogChan) c.waitOnBacklogWaiting = false @@ -504,6 +520,41 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return nil } +func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) { + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() { + c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") + close(c.waitOnBacklogChan) + c.waitOnBacklogWaiting = false + } +} + +func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) { + c.setStateParamsMu.Lock() + for id, setState := range setStateBatch { + if _, exists := c.setStateParams[id]; exists { + continue + } + c.setStateParams[id] = setState + c.setStateStartTimes[id] = setStateStartTimes[id] + } + backlogSize := len(c.setStateParams) + if c.waitOnBacklogWaiting && backlogSize < c.backlogResumeThreshold() { + c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") + close(c.waitOnBacklogChan) + c.waitOnBacklogWaiting = false + } + c.setStateParamsMu.Unlock() + + if backlogSize >= c.batchReadyThreshold() { + c.signalBatchReady() + } + + c.Logger.DebugContext(ctx, c.Name+": Requeued failed batch of job(s)", "num_jobs", len(setStateBatch)) +} + func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { now := c.Time.Now() // If we've built up too much of a backlog because the completer's fallen @@ -512,22 +563,60 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta c.waitOrInitBacklogChannel(ctx) c.setStateParamsMu.Lock() - defer c.setStateParamsMu.Unlock() statsSnapshot := *stats c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot} c.setStateStartTimes[params.ID] = now + backlogSize := len(c.setStateParams) + c.setStateParamsMu.Unlock() + + if backlogSize >= c.batchReadyThreshold() { + c.signalBatchReady() + } return nil } +// backlogResumeThreshold returns the low-water mark below which waiting +// completers are released. Keeping this below the wait threshold avoids rapidly +// cycling between waiting and not waiting when the completer is near capacity. +func (c *BatchCompleter) backlogResumeThreshold() int { + return max(c.backlogWaitThresholdEffective()/2, 1) +} + +// backlogWaitThresholdEffective returns the backlog size at which new +// completions should wait for the batch completer to catch up. It's capped at +// maxBacklog so tests and future configuration can't set a normal wait +// threshold beyond the emergency warning threshold. +func (c *BatchCompleter) backlogWaitThresholdEffective() int { + if c.backlogWaitThreshold <= 0 { + return c.maxBacklog + } + return min(c.backlogWaitThreshold, c.maxBacklog) +} + +// batchReadyThreshold returns the backlog size at which the run loop should be +// nudged to process a batch immediately instead of waiting for its next ticker. +// It aims for a full database batch while still respecting low test thresholds. +func (c *BatchCompleter) batchReadyThreshold() int { + return min(c.completionMaxSize, c.backlogWaitThresholdEffective()) +} + +func (c *BatchCompleter) signalBatchReady() { + select { + case c.batchReadyChan <- struct{}{}: + default: + } +} + func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { c.setStateParamsMu.RLock() var ( backlogSize = len(c.setStateParams) waitChan = c.waitOnBacklogChan waiting = c.waitOnBacklogWaiting + waitAt = c.backlogWaitThresholdEffective() ) c.setStateParamsMu.RUnlock() @@ -536,33 +625,52 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { return } - // Not at max backlog. A little raciness is allowed here: multiple + // Not at the wait threshold. A little raciness is allowed here: multiple // goroutines may have acquired the read lock above and seen a size under // limit, but with all allowed to continue it could put the backlog over its - // maximum. The backlog will only be nominally over because generally max - // backlog >> max workers, so consider this okay. - if backlogSize < c.maxBacklog { + // wait threshold. The backlog will only be nominally over because generally + // the wait threshold >> max workers, so consider this okay. + if backlogSize < waitAt { return } c.setStateParamsMu.Lock() - defer c.setStateParamsMu.Unlock() - - // Check once more if another process has already started waiting (it's - // possible for multiple to race between the acquiring the lock above). If - // so, we fall through and allow this insertion to happen, even though it - // might bring the batch slightly over limit, because arranging the locks - // otherwise would get complicated. if c.waitOnBacklogWaiting { + waitChan := c.waitOnBacklogChan + c.setStateParamsMu.Unlock() + <-waitChan + return + } + backlogSize = len(c.setStateParams) + waitAt = c.backlogWaitThresholdEffective() + if backlogSize < waitAt { + c.setStateParamsMu.Unlock() return } - // Tell all future insertions to start waiting. This one is allowed to fall - // through and succeed even though it may bring the batch a little over - // limit. + // Tell future insertions to start waiting. This caller falls through so + // there is guaranteed to be a pending completion for a future batch to + // process, even if the current in-flight batch fails. c.waitOnBacklogChan = make(chan struct{}) c.waitOnBacklogWaiting = true - c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold", "max_backlog", c.maxBacklog) + if backlogSize >= c.maxBacklog { + c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold", + "backlog_size", backlogSize, + "backlog_wait_threshold", waitAt, + "max_backlog", c.maxBacklog, + ) + } else { + c.Logger.DebugContext(ctx, c.Name+": Applying completion backlog pressure", + "backlog_resume_threshold", c.backlogResumeThreshold(), + "backlog_size", backlogSize, + "backlog_wait_threshold", waitAt, + ) + } + c.setStateParamsMu.Unlock() +} + +func isNonRetryableCompleterError(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, riverdriver.ErrClosedPool) } // As configured, total time asleep from initial attempt is ~7 seconds (1 + 2 + @@ -586,13 +694,8 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer retVal, err := retryFunc(ctx) if err != nil { - // A cancelled context will never succeed, return immediately. - if errors.Is(err, context.Canceled) { - return defaultVal, err - } - - // A closed pool will never succeed, return immediately. - if errors.Is(err, riverdriver.ErrClosedPool) { + // A cancelled context or a closed pool will never succeed. + if isNonRetryableCompleterError(err) { return defaultVal, err } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 18d9f465..54e74623 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -459,7 +459,8 @@ func TestBatchCompleter(t *testing.T) { t.Parallel() completer, bundle := setup(t) - completer.maxBacklog = 10 // set to something artificially low + completer.backlogWaitThreshold = 10 // set to something artificially low + completer.completionMaxSize = 10 startCompleter(ctx, t, completer) jobUpdateChan := make(chan CompleterJobUpdated, 100) @@ -487,6 +488,195 @@ func TestBatchCompleter(t *testing.T) { }) } +func TestBatchCompleter_BackpressureBeforeMaxBacklog(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + rows := make([]*rivertype.JobRow, len(params.ID)) + for i := range params.ID { + rows[i] = &rivertype.JobRow{ + ID: params.ID[i], + State: params.State[i], + } + } + return rows, nil + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.completionMaxSize = 2 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + require.Eventually(t, func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + select { + case err := <-errCh: + require.NoError(t, err) + require.FailNow(t, "expected completion to wait for backlog pressure to clear") + default: + } + + require.NoError(t, completer.handleBatch(ctx)) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_BackpressureReleasedAfterNonRetryableCompletionFailure(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + return nil, context.Canceled + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + require.Eventually(t, func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + select { + case err := <-errCh: + require.NoError(t, err) + require.FailNow(t, "expected completion to wait for backlog pressure to clear") + default: + } + + require.ErrorIs(t, completer.handleBatch(ctx), context.Canceled) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_BackpressureRequeuesBatchAfterCompletionFailure(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + expectedErr := errors.New("error from batch completion") + var numCalls int + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + numCalls++ + if numCalls <= numRetries { + return nil, expectedErr + } + + rows := make([]*rivertype.JobRow, len(params.ID)) + for i := range params.ID { + rows[i] = &rivertype.JobRow{ + ID: params.ID[i], + State: params.State[i], + } + } + return rows, nil + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.completionMaxSize = 10 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + require.Eventually(t, func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + require.ErrorIs(t, completer.handleBatch(ctx), expectedErr) + + completer.setStateParamsMu.RLock() + require.Len(t, completer.setStateParams, 3) + completer.setStateParamsMu.RUnlock() + + require.NoError(t, completer.handleBatch(ctx)) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_NonRetryableCompletionFailureDoesNotRequeueBatch(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + for _, tt := range []struct { + err error + name string + }{ + {err: context.Canceled, name: "ContextCanceled"}, + {err: riverdriver.ErrClosedPool, name: "ErrClosedPool"}, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var numCalls int + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + numCalls++ + return nil, tt.err + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.disableSleep = true + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + require.ErrorIs(t, completer.handleBatch(ctx), tt.err) + require.Equal(t, 1, numCalls) + + completer.setStateParamsMu.RLock() + require.Empty(t, completer.setStateParams) + require.Empty(t, completer.setStateStartTimes) + completer.setStateParamsMu.RUnlock() + }) + } +} + func TestBatchCompleter_JobStatsSnapshotsPerUpdate(t *testing.T) { t.Parallel()