Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
- Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281)
- Earlier backpressure from `BatchCompleter` when it's throughput is saturated with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292)
- Series of minor optimizations in `BatchCompleter` raising throughput ~20% when it's the bottleneck in job processing (e.g. in benchmarks). [PR #1293](https://github.com/riverqueue/river/pull/1293)

### Fixed

Expand Down
126 changes: 48 additions & 78 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/riverqueue/river/rivershared/riverpilot"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)

Expand Down Expand Up @@ -243,8 +242,9 @@ func (c *AsyncCompleter) Start(ctx context.Context) error {
}

type batchCompleterSetState struct {
Params *riverdriver.JobSetStateIfRunningParams
Stats *jobstats.JobStatistics
Params *riverdriver.JobSetStateIfRunningParams
StartTime time.Time
Stats *jobstats.JobStatistics
}

// BatchCompleter accumulates incoming completions, and instead of completing
Expand All @@ -265,9 +265,8 @@ type BatchCompleter struct {
exec riverdriver.Executor
pilot riverpilot.Pilot
schema string
setStateParams map[int64]*batchCompleterSetState
setStateParams map[int64]batchCompleterSetState
setStateParamsMu sync.RWMutex
setStateStartTimes map[int64]time.Time
subscribeCh SubscribeChan
waitOnBacklogChan chan struct{}
waitOnBacklogWaiting bool
Expand All @@ -288,8 +287,7 @@ func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riv
maxBacklog: maxBacklog,
pilot: pilot,
schema: schema,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
setStateParams: make(map[int64]batchCompleterSetState),
subscribeCh: subscribeCh,
})
}
Expand Down Expand Up @@ -370,22 +368,17 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
}

func (c *BatchCompleter) handleBatch(ctx context.Context) error {
var (
setStateBatch map[int64]*batchCompleterSetState
setStateStartTimes map[int64]time.Time
)
var setStateBatch map[int64]batchCompleterSetState
func() {
c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

setStateBatch = c.setStateParams
setStateStartTimes = c.setStateStartTimes

// Don't bother resetting the map if there's nothing to process,
// allowing the completer to idle efficiently.
if len(setStateBatch) > 0 {
c.setStateParams = make(map[int64]*batchCompleterSetState)
c.setStateStartTimes = make(map[int64]time.Time)
c.setStateParams = make(map[int64]batchCompleterSetState)
} else {
// Set nil to avoid a data race below in case the map is set as a
// new job comes in.
Expand All @@ -403,7 +396,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
return err
}

c.requeueBatch(ctx, setStateBatch, setStateStartTimes)
c.requeueBatch(ctx, setStateBatch)
return err
}

Expand All @@ -425,9 +418,9 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
})
}

// This could be written more simply using multiple `sliceutil.Map`s, but
// it's done this way to allocate as few new slices as necessary.
mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
// This could be written more simply using multiple map helpers, but it's
// done this way to allocate as few new slices as necessary.
mapBatch := func(setStateBatch map[int64]batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
params := &riverdriver.JobSetStateIfRunningManyParams{
ID: make([]int64, len(setStateBatch)),
Attempt: make([]*int, len(setStateBatch)),
Expand All @@ -447,10 +440,10 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
params.MetadataDoMerge[i] = setState.Params.MetadataDoMerge
params.MetadataUpdates[i] = setState.Params.MetadataUpdates
params.ScheduledAt[i] = setState.Params.ScheduledAt
params.Schema = c.schema
params.State[i] = setState.Params.State
i++
}
params.Schema = c.schema
return params
}

Expand Down Expand Up @@ -493,16 +486,19 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}
}

events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated {
var (
completeTime = c.Time.Now()
events = make([]CompleterJobUpdated, len(jobRows))
)
for i, jobRow := range jobRows {
setState := setStateBatch[jobRow.ID]
startTime := setStateStartTimes[jobRow.ID]
setState.Stats.CompleteDuration = c.Time.Now().Sub(startTime)
return CompleterJobUpdated{
setState.Stats.CompleteDuration = completeTime.Sub(setState.StartTime)
events[i] = CompleterJobUpdated{
Job: jobRow,
JobStats: setState.Stats,
Snoozed: setState.Params.Snoozed,
}
})
}

c.subscribeCh <- events

Expand Down Expand Up @@ -531,14 +527,13 @@ func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) {
}
}

func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) {
func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]batchCompleterSetState) {
c.setStateParamsMu.Lock()
for id, setState := range setStateBatch {
if _, exists := c.setStateParams[id]; exists {
continue
}
c.setStateParams[id] = setState
c.setStateStartTimes[id] = setStateStartTimes[id]
}
backlogSize := len(c.setStateParams)
if c.waitOnBacklogWaiting && backlogSize < c.backlogResumeThreshold() {
Expand All @@ -557,19 +552,32 @@ func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int

func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
now := c.Time.Now()
// If we've built up too much of a backlog because the completer's fallen
// behind, block completions until the complete loop's had a chance to catch
// up.
c.waitOrInitBacklogChannel(ctx)

c.setStateParamsMu.Lock()
var backlogSize int
for {
// Keep the common enqueue path to one lock acquisition. If the
// completer is behind, wait for the current backlog gate to open and
// retry so the threshold is checked against fresh state.
c.setStateParamsMu.Lock()
if c.waitOnBacklogWaiting {
waitChan := c.waitOnBacklogChan
c.setStateParamsMu.Unlock()
<-waitChan
continue
}

statsSnapshot := *stats
waitAt := c.backlogWaitThresholdEffective()
backlogSize = len(c.setStateParams)
if backlogSize >= waitAt {
c.initBacklogWaitLocked(ctx, backlogSize, waitAt)
}

c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot}
c.setStateStartTimes[params.ID] = now
backlogSize := len(c.setStateParams)
c.setStateParamsMu.Unlock()
statsSnapshot := *stats
c.setStateParams[params.ID] = batchCompleterSetState{Params: params, StartTime: now, Stats: &statsSnapshot}
backlogSize = len(c.setStateParams)
c.setStateParamsMu.Unlock()
break
}

if backlogSize >= c.batchReadyThreshold() {
c.signalBatchReady()
Expand Down Expand Up @@ -610,47 +618,9 @@ func (c *BatchCompleter) signalBatchReady() {
}
}

func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
c.setStateParamsMu.RLock()
var (
backlogSize = len(c.setStateParams)
waitChan = c.waitOnBacklogChan
waiting = c.waitOnBacklogWaiting
waitAt = c.backlogWaitThresholdEffective()
)
c.setStateParamsMu.RUnlock()

if waiting {
<-waitChan
return
}

// Not at the wait threshold. A little raciness is allowed here: multiple
// goroutines may have acquired the read lock above and seen a size under
// limit, but with all allowed to continue it could put the backlog over its
// wait threshold. The backlog will only be nominally over because generally
// the wait threshold >> max workers, so consider this okay.
if backlogSize < waitAt {
return
}

c.setStateParamsMu.Lock()
if c.waitOnBacklogWaiting {
waitChan := c.waitOnBacklogChan
c.setStateParamsMu.Unlock()
<-waitChan
return
}
backlogSize = len(c.setStateParams)
waitAt = c.backlogWaitThresholdEffective()
if backlogSize < waitAt {
c.setStateParamsMu.Unlock()
return
}

// Tell future insertions to start waiting. This caller falls through so
// there is guaranteed to be a pending completion for a future batch to
// process, even if the current in-flight batch fails.
// initBacklogWaitLocked starts a backlog wait gate and must be called with
// setStateParamsMu held.
func (c *BatchCompleter) initBacklogWaitLocked(ctx context.Context, backlogSize, waitAt int) chan struct{} {
c.waitOnBacklogChan = make(chan struct{})
c.waitOnBacklogWaiting = true
if backlogSize >= c.maxBacklog {
Expand All @@ -666,7 +636,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
"backlog_wait_threshold", waitAt,
)
}
c.setStateParamsMu.Unlock()
return c.waitOnBacklogChan
}

func isNonRetryableCompleterError(err error) bool {
Expand Down
1 change: 0 additions & 1 deletion internal/jobcompleter/job_completer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,6 @@ func TestBatchCompleter_NonRetryableCompletionFailureDoesNotRequeueBatch(t *test

completer.setStateParamsMu.RLock()
require.Empty(t, completer.setStateParams)
require.Empty(t, completer.setStateStartTimes)
completer.setStateParamsMu.RUnlock()
})
}
Expand Down
Loading