diff --git a/CHANGELOG.md b/CHANGELOG.md index 891513e4..bf84baff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276) - Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) - Earlier backpressure from `BatchCompleter` when it's throughput is saturated with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292) +- Series of minor optimizations in `BatchCompleter` raising throughput ~20% when it's the bottleneck in job processing (e.g. in benchmarks). [PR #1293](https://github.com/riverqueue/river/pull/1293) ### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 13159473..51bfacd4 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -16,7 +16,6 @@ import ( "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/util/serviceutil" - "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" ) @@ -243,8 +242,9 @@ func (c *AsyncCompleter) Start(ctx context.Context) error { } type batchCompleterSetState struct { - Params *riverdriver.JobSetStateIfRunningParams - Stats *jobstats.JobStatistics + Params *riverdriver.JobSetStateIfRunningParams + StartTime time.Time + Stats *jobstats.JobStatistics } // BatchCompleter accumulates incoming completions, and instead of completing @@ -265,9 +265,8 @@ type BatchCompleter struct { exec riverdriver.Executor pilot riverpilot.Pilot schema string - setStateParams map[int64]*batchCompleterSetState + setStateParams map[int64]batchCompleterSetState setStateParamsMu sync.RWMutex - setStateStartTimes map[int64]time.Time subscribeCh SubscribeChan waitOnBacklogChan chan struct{} waitOnBacklogWaiting bool @@ -288,8 +287,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riv maxBacklog: maxBacklog, pilot: pilot, schema: schema, - setStateParams: make(map[int64]*batchCompleterSetState), - setStateStartTimes: make(map[int64]time.Time), + setStateParams: make(map[int64]batchCompleterSetState), subscribeCh: subscribeCh, }) } @@ -370,22 +368,17 @@ func (c *BatchCompleter) Start(ctx context.Context) error { } func (c *BatchCompleter) handleBatch(ctx context.Context) error { - var ( - setStateBatch map[int64]*batchCompleterSetState - setStateStartTimes map[int64]time.Time - ) + var setStateBatch map[int64]batchCompleterSetState func() { c.setStateParamsMu.Lock() defer c.setStateParamsMu.Unlock() setStateBatch = c.setStateParams - setStateStartTimes = c.setStateStartTimes // Don't bother resetting the map if there's nothing to process, // allowing the completer to idle efficiently. if len(setStateBatch) > 0 { - c.setStateParams = make(map[int64]*batchCompleterSetState) - c.setStateStartTimes = make(map[int64]time.Time) + c.setStateParams = make(map[int64]batchCompleterSetState) } else { // Set nil to avoid a data race below in case the map is set as a // new job comes in. @@ -403,7 +396,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return err } - c.requeueBatch(ctx, setStateBatch, setStateStartTimes) + c.requeueBatch(ctx, setStateBatch) return err } @@ -425,9 +418,9 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { }) } - // This could be written more simply using multiple `sliceutil.Map`s, but - // it's done this way to allocate as few new slices as necessary. - mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams { + // This could be written more simply using multiple map helpers, but it's + // done this way to allocate as few new slices as necessary. + mapBatch := func(setStateBatch map[int64]batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams { params := &riverdriver.JobSetStateIfRunningManyParams{ ID: make([]int64, len(setStateBatch)), Attempt: make([]*int, len(setStateBatch)), @@ -447,10 +440,10 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { params.MetadataDoMerge[i] = setState.Params.MetadataDoMerge params.MetadataUpdates[i] = setState.Params.MetadataUpdates params.ScheduledAt[i] = setState.Params.ScheduledAt - params.Schema = c.schema params.State[i] = setState.Params.State i++ } + params.Schema = c.schema return params } @@ -493,16 +486,19 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } } - events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated { + var ( + completeTime = c.Time.Now() + events = make([]CompleterJobUpdated, len(jobRows)) + ) + for i, jobRow := range jobRows { setState := setStateBatch[jobRow.ID] - startTime := setStateStartTimes[jobRow.ID] - setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime) - return CompleterJobUpdated{ + setState.Stats.CompleteDuration = completeTime.Sub(setState.StartTime) + events[i] = CompleterJobUpdated{ Job: jobRow, JobStats: setState.Stats, Snoozed: setState.Params.Snoozed, } - }) + } c.subscribeCh <- events @@ -531,14 +527,13 @@ func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) { } } -func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) { +func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]batchCompleterSetState) { c.setStateParamsMu.Lock() for id, setState := range setStateBatch { if _, exists := c.setStateParams[id]; exists { continue } c.setStateParams[id] = setState - c.setStateStartTimes[id] = setStateStartTimes[id] } backlogSize := len(c.setStateParams) if c.waitOnBacklogWaiting && backlogSize < c.backlogResumeThreshold() { @@ -557,19 +552,32 @@ func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { now := c.Time.Now() - // If we've built up too much of a backlog because the completer's fallen - // behind, block completions until the complete loop's had a chance to catch - // up. - c.waitOrInitBacklogChannel(ctx) - c.setStateParamsMu.Lock() + var backlogSize int + for { + // Keep the common enqueue path to one lock acquisition. If the + // completer is behind, wait for the current backlog gate to open and + // retry so the threshold is checked against fresh state. + c.setStateParamsMu.Lock() + if c.waitOnBacklogWaiting { + waitChan := c.waitOnBacklogChan + c.setStateParamsMu.Unlock() + <-waitChan + continue + } - statsSnapshot := *stats + waitAt := c.backlogWaitThresholdEffective() + backlogSize = len(c.setStateParams) + if backlogSize >= waitAt { + c.initBacklogWaitLocked(ctx, backlogSize, waitAt) + } - c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot} - c.setStateStartTimes[params.ID] = now - backlogSize := len(c.setStateParams) - c.setStateParamsMu.Unlock() + statsSnapshot := *stats + c.setStateParams[params.ID] = batchCompleterSetState{Params: params, StartTime: now, Stats: &statsSnapshot} + backlogSize = len(c.setStateParams) + c.setStateParamsMu.Unlock() + break + } if backlogSize >= c.batchReadyThreshold() { c.signalBatchReady() @@ -610,47 +618,9 @@ func (c *BatchCompleter) signalBatchReady() { } } -func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { - c.setStateParamsMu.RLock() - var ( - backlogSize = len(c.setStateParams) - waitChan = c.waitOnBacklogChan - waiting = c.waitOnBacklogWaiting - waitAt = c.backlogWaitThresholdEffective() - ) - c.setStateParamsMu.RUnlock() - - if waiting { - <-waitChan - return - } - - // Not at the wait threshold. A little raciness is allowed here: multiple - // goroutines may have acquired the read lock above and seen a size under - // limit, but with all allowed to continue it could put the backlog over its - // wait threshold. The backlog will only be nominally over because generally - // the wait threshold >> max workers, so consider this okay. - if backlogSize < waitAt { - return - } - - c.setStateParamsMu.Lock() - if c.waitOnBacklogWaiting { - waitChan := c.waitOnBacklogChan - c.setStateParamsMu.Unlock() - <-waitChan - return - } - backlogSize = len(c.setStateParams) - waitAt = c.backlogWaitThresholdEffective() - if backlogSize < waitAt { - c.setStateParamsMu.Unlock() - return - } - - // Tell future insertions to start waiting. This caller falls through so - // there is guaranteed to be a pending completion for a future batch to - // process, even if the current in-flight batch fails. +// initBacklogWaitLocked starts a backlog wait gate and must be called with +// setStateParamsMu held. +func (c *BatchCompleter) initBacklogWaitLocked(ctx context.Context, backlogSize, waitAt int) chan struct{} { c.waitOnBacklogChan = make(chan struct{}) c.waitOnBacklogWaiting = true if backlogSize >= c.maxBacklog { @@ -666,7 +636,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { "backlog_wait_threshold", waitAt, ) } - c.setStateParamsMu.Unlock() + return c.waitOnBacklogChan } func isNonRetryableCompleterError(err error) bool { diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 54e74623..e4ec74f8 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -671,7 +671,6 @@ func TestBatchCompleter_NonRetryableCompletionFailureDoesNotRequeueBatch(t *test completer.setStateParamsMu.RLock() require.Empty(t, completer.setStateParams) - require.Empty(t, completer.setStateStartTimes) completer.setStateParamsMu.RUnlock() }) }