From 9b268fac8d9f490dbae2c36da7404eedcfbadc7d Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 20 Jun 2026 09:52:01 -0600 Subject: [PATCH] LLM-based optimizations for `BatchCompleter` This one's just throwing Codex at `BatchCompleter` to see what optimizations it can find in there, as this seems to be by far our slowest spot in River, with respect to benchmarking at least. It didn't do anything huge, but ended up putting in a variety of minor optimizations: - Removed the separate `setStateStartTimes` map by storing `StartTime` in `batchCompleterSetState`. - Changed the queued state map to store struct values instead of pointers, removing a wrapper allocation. - Collapsed backlog checking + enqueue into one lock acquisition on the common path. - Replaced `sliceutil.Map` with a direct event-building loop. - Used one batch completion timestamp instead of calling `Now()` once per completed row. - Avoided repeatedly assigning `params.Schema` inside the batch mapping loop. Benchmarks seem to indicate a previous 12-13 us/op coming down to 10-11 us/op, and with one fewer allocation per op. I'm seeing a definite speedup when I run the full benchmark. 
Our bench has some consistency problems (can show considerably different results per run), but whereas before ~46k jobs/sec was about the best I ever saw on my commodity MBP here, I'm now seeing up to ~56k jobs/sec, so at best a 10k jobs/sec improvement: $ go run ./cmd/river bench --database-url $DATABASE_URL --num-total-jobs 1_000_000 bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s] bench: jobs worked [ 106472 ], inserted [ 0 ], job/sec [ 53236.0 ] [2s] bench: jobs worked [ 108440 ], inserted [ 0 ], job/sec [ 54220.0 ] [2s] bench: jobs worked [ 114035 ], inserted [ 0 ], job/sec [ 57017.5 ] [2s] bench: jobs worked [ 107402 ], inserted [ 0 ], job/sec [ 53701.0 ] [2s] bench: jobs worked [ 114433 ], inserted [ 0 ], job/sec [ 57216.5 ] [2s] bench: jobs worked [ 105701 ], inserted [ 0 ], job/sec [ 52850.5 ] [2s] bench: jobs worked [ 116051 ], inserted [ 0 ], job/sec [ 58025.5 ] [2s] bench: jobs worked [ 108054 ], inserted [ 0 ], job/sec [ 54027.0 ] [2s] bench: jobs worked [ 119412 ], inserted [ 0 ], job/sec [ 59706.0 ] [2s] bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 55710.8 ], running 17.949838958s The number should be even better on faster computers. Nothing in the code gets any worse (and I think some of it is actually an improvement?) so I think it's probably worthwhile to bring these in. --- CHANGELOG.md | 1 + internal/jobcompleter/job_completer.go | 126 ++++++++------------ internal/jobcompleter/job_completer_test.go | 1 - 3 files changed, 49 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 891513e4..bf84baff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. 
[PR #1276](https://github.com/riverqueue/river/pull/1276) - Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) - Earlier backpressure from `BatchCompleter` when it's throughput is saturated with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292) +- Series of minor optimizations in `BatchCompleter` raising throughput ~20% when it's the bottleneck in job processing (e.g. in benchmarks). [PR #1293](https://github.com/riverqueue/river/pull/1293) ### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 13159473..51bfacd4 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -16,7 +16,6 @@ import ( "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/util/serviceutil" - "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -243,8 +242,9 @@ func (c *AsyncCompleter) Start(ctx context.Context) error { } type batchCompleterSetState struct { - Params *riverdriver.JobSetStateIfRunningParams - Stats *jobstats.JobStatistics + Params *riverdriver.JobSetStateIfRunningParams + StartTime time.Time + Stats *jobstats.JobStatistics } // BatchCompleter accumulates incoming completions, and instead of completing @@ -265,9 +265,8 @@ type BatchCompleter struct { exec riverdriver.Executor pilot riverpilot.Pilot schema string - setStateParams map[int64]*batchCompleterSetState + setStateParams map[int64]batchCompleterSetState setStateParamsMu sync.RWMutex - setStateStartTimes map[int64]time.Time subscribeCh SubscribeChan waitOnBacklogChan chan struct{} waitOnBacklogWaiting bool @@ -288,8 +287,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riv maxBacklog: maxBacklog, pilot: pilot, schema: schema, - 
setStateParams: make(map[int64]*batchCompleterSetState), - setStateStartTimes: make(map[int64]time.Time), + setStateParams: make(map[int64]batchCompleterSetState), subscribeCh: subscribeCh, }) } @@ -370,22 +368,17 @@ func (c *BatchCompleter) Start(ctx context.Context) error { } func (c *BatchCompleter) handleBatch(ctx context.Context) error { - var ( - setStateBatch map[int64]*batchCompleterSetState - setStateStartTimes map[int64]time.Time - ) + var setStateBatch map[int64]batchCompleterSetState func() { c.setStateParamsMu.Lock() defer c.setStateParamsMu.Unlock() setStateBatch = c.setStateParams - setStateStartTimes = c.setStateStartTimes // Don't bother resetting the map if there's nothing to process, // allowing the completer to idle efficiently. if len(setStateBatch) > 0 { - c.setStateParams = make(map[int64]*batchCompleterSetState) - c.setStateStartTimes = make(map[int64]time.Time) + c.setStateParams = make(map[int64]batchCompleterSetState) } else { // Set nil to avoid a data race below in case the map is set as a // new job comes in. @@ -403,7 +396,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return err } - c.requeueBatch(ctx, setStateBatch, setStateStartTimes) + c.requeueBatch(ctx, setStateBatch) return err } @@ -425,9 +418,9 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { }) } - // This could be written more simply using multiple `sliceutil.Map`s, but - // it's done this way to allocate as few new slices as necessary. - mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams { + // This could be written more simply using multiple map helpers, but it's + // done this way to allocate as few new slices as necessary. 
+ mapBatch := func(setStateBatch map[int64]batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams { params := &riverdriver.JobSetStateIfRunningManyParams{ ID: make([]int64, len(setStateBatch)), Attempt: make([]*int, len(setStateBatch)), @@ -447,10 +440,10 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { params.MetadataDoMerge[i] = setState.Params.MetadataDoMerge params.MetadataUpdates[i] = setState.Params.MetadataUpdates params.ScheduledAt[i] = setState.Params.ScheduledAt - params.Schema = c.schema params.State[i] = setState.Params.State i++ } + params.Schema = c.schema return params } @@ -493,16 +486,19 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } } - events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { + var ( + completeTime = c.Time.Now() + events = make([]CompleterJobUpdated, len(jobRows)) + ) + for i, jobRow := range jobRows { setState := setStateBatch[jobRow.ID] - startTime := setStateStartTimes[jobRow.ID] - setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime) - return CompleterJobUpdated{ + setState.Stats.CompleteDuration = completeTime.Sub(setState.StartTime) + events[i] = CompleterJobUpdated{ Job: jobRow, JobStats: setState.Stats, Snoozed: setState.Params.Snoozed, } - }) + } c.subscribeCh <- events @@ -531,14 +527,13 @@ func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) { } } -func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) { +func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]batchCompleterSetState) { c.setStateParamsMu.Lock() for id, setState := range setStateBatch { if _, exists := c.setStateParams[id]; exists { continue } c.setStateParams[id] = setState - c.setStateStartTimes[id] = setStateStartTimes[id] } backlogSize := len(c.setStateParams) if c.waitOnBacklogWaiting && backlogSize < 
c.backlogResumeThreshold() { @@ -557,19 +552,32 @@ func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { now := c.Time.Now() - // If we've built up too much of a backlog because the completer's fallen - // behind, block completions until the complete loop's had a chance to catch - // up. - c.waitOrInitBacklogChannel(ctx) - c.setStateParamsMu.Lock() + var backlogSize int + for { + // Keep the common enqueue path to one lock acquisition. If the + // completer is behind, wait for the current backlog gate to open and + // retry so the threshold is checked against fresh state. + c.setStateParamsMu.Lock() + if c.waitOnBacklogWaiting { + waitChan := c.waitOnBacklogChan + c.setStateParamsMu.Unlock() + <-waitChan + continue + } - statsSnapshot := *stats + waitAt := c.backlogWaitThresholdEffective() + backlogSize = len(c.setStateParams) + if backlogSize >= waitAt { + c.initBacklogWaitLocked(ctx, backlogSize, waitAt) + } - c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot} - c.setStateStartTimes[params.ID] = now - backlogSize := len(c.setStateParams) - c.setStateParamsMu.Unlock() + statsSnapshot := *stats + c.setStateParams[params.ID] = batchCompleterSetState{Params: params, StartTime: now, Stats: &statsSnapshot} + backlogSize = len(c.setStateParams) + c.setStateParamsMu.Unlock() + break + } if backlogSize >= c.batchReadyThreshold() { c.signalBatchReady() @@ -610,47 +618,9 @@ func (c *BatchCompleter) signalBatchReady() { } } -func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { - c.setStateParamsMu.RLock() - var ( - backlogSize = len(c.setStateParams) - waitChan = c.waitOnBacklogChan - waiting = c.waitOnBacklogWaiting - waitAt = c.backlogWaitThresholdEffective() - ) - c.setStateParamsMu.RUnlock() - - if waiting { - <-waitChan - return - } - - // Not at 
the wait threshold. A little raciness is allowed here: multiple - // goroutines may have acquired the read lock above and seen a size under - // limit, but with all allowed to continue it could put the backlog over its - // wait threshold. The backlog will only be nominally over because generally - // the wait threshold >> max workers, so consider this okay. - if backlogSize < waitAt { - return - } - - c.setStateParamsMu.Lock() - if c.waitOnBacklogWaiting { - waitChan := c.waitOnBacklogChan - c.setStateParamsMu.Unlock() - <-waitChan - return - } - backlogSize = len(c.setStateParams) - waitAt = c.backlogWaitThresholdEffective() - if backlogSize < waitAt { - c.setStateParamsMu.Unlock() - return - } - - // Tell future insertions to start waiting. This caller falls through so - // there is guaranteed to be a pending completion for a future batch to - // process, even if the current in-flight batch fails. +// initBacklogWaitLocked starts a backlog wait gate and must be called with +// setStateParamsMu held. +func (c *BatchCompleter) initBacklogWaitLocked(ctx context.Context, backlogSize, waitAt int) chan struct{} { c.waitOnBacklogChan = make(chan struct{}) c.waitOnBacklogWaiting = true if backlogSize >= c.maxBacklog { @@ -666,7 +636,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { "backlog_wait_threshold", waitAt, ) } - c.setStateParamsMu.Unlock() + return c.waitOnBacklogChan } func isNonRetryableCompleterError(err error) bool { diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 54e74623..e4ec74f8 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -671,7 +671,6 @@ func TestBatchCompleter_NonRetryableCompletionFailureDoesNotRequeueBatch(t *test completer.setStateParamsMu.RLock() require.Empty(t, completer.setStateParams) - require.Empty(t, completer.setStateStartTimes) completer.setStateParamsMu.RUnlock() }) }