Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql
- Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224).
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
- Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281)
- Earlier backpressure from `BatchCompleter` when it's throughput is saturated with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292)

### Fixed

Expand Down
179 changes: 141 additions & 38 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ type BatchCompleter struct {
baseservice.BaseService
startstop.BaseStartStop

backlogWaitThreshold int // configurable for testing purposes; backlog at which completions start waiting for the completer to catch up
batchReadyChan chan struct{}
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
maxBacklog int // configurable for testing purposes; emergency backlog threshold where a warning is logged
exec riverdriver.Executor
pilot riverpilot.Pilot
schema string
Expand All @@ -273,19 +275,22 @@ type BatchCompleter struct {

func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter {
const (
completionMaxSize = 5_000
maxBacklog = 20_000
completionMaxSize = 5_000
backlogWaitThreshold = completionMaxSize * 2
maxBacklog = 20_000
)

return baseservice.Init(archetype, &BatchCompleter{
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
pilot: pilot,
schema: schema,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
subscribeCh: subscribeCh,
backlogWaitThreshold: backlogWaitThreshold,
batchReadyChan: make(chan struct{}, 1),
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
pilot: pilot,
schema: schema,
setStateParams: make(map[int64]*batchCompleterSetState),
setStateStartTimes: make(map[int64]time.Time),
subscribeCh: subscribeCh,
})
}

Expand Down Expand Up @@ -330,6 +335,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
}
return

case <-c.batchReadyChan:
case <-ticker.C:
}

Expand All @@ -340,7 +346,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
// multiple of 5. So, jobs will be completed every 250ms even if the
// threshold hasn't been met.
const batchCompleterStartThreshold = 100
if backlogSize() < min(c.maxBacklog, batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 {
if backlogSize() < min(c.backlogWaitThresholdEffective(), batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 {
continue
}

Expand Down Expand Up @@ -391,6 +397,16 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
return nil
}

handleBatchError := func(err error) error {
if isNonRetryableCompleterError(err) {
c.releaseBacklogWaitIfReady(ctx)
return err
}

c.requeueBatch(ctx, setStateBatch, setStateStartTimes)
return err
}

// Complete a sub-batch with retries. Also helps reduce visual noise and
// increase readability of loop below.
completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
Expand Down Expand Up @@ -465,15 +481,15 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
}
jobRowsSubBatch, err := completeSubBatch(subBatch)
if err != nil {
return err
return handleBatchError(err)
}
jobRows = append(jobRows, jobRowsSubBatch...)
}
} else {
var err error
jobRows, err = completeSubBatch(params)
if err != nil {
return err
return handleBatchError(err)
}
}

Expand All @@ -494,7 +510,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

if c.waitOnBacklogWaiting && len(c.setStateParams) < c.maxBacklog {
if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() {
c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs")
close(c.waitOnBacklogChan)
c.waitOnBacklogWaiting = false
Expand All @@ -504,6 +520,41 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
return nil
}

func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) {
c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() {
c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs")
close(c.waitOnBacklogChan)
c.waitOnBacklogWaiting = false
}
}

func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) {
c.setStateParamsMu.Lock()
for id, setState := range setStateBatch {
if _, exists := c.setStateParams[id]; exists {
continue
}
c.setStateParams[id] = setState
c.setStateStartTimes[id] = setStateStartTimes[id]
}
backlogSize := len(c.setStateParams)
if c.waitOnBacklogWaiting && backlogSize < c.backlogResumeThreshold() {
c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs")
close(c.waitOnBacklogChan)
c.waitOnBacklogWaiting = false
}
c.setStateParamsMu.Unlock()

if backlogSize >= c.batchReadyThreshold() {
c.signalBatchReady()
}

c.Logger.DebugContext(ctx, c.Name+": Requeued failed batch of job(s)", "num_jobs", len(setStateBatch))
}

func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
now := c.Time.Now()
// If we've built up too much of a backlog because the completer's fallen
Expand All @@ -512,22 +563,60 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta
c.waitOrInitBacklogChannel(ctx)

c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

statsSnapshot := *stats

c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot}
c.setStateStartTimes[params.ID] = now
backlogSize := len(c.setStateParams)
c.setStateParamsMu.Unlock()

if backlogSize >= c.batchReadyThreshold() {
c.signalBatchReady()
}

return nil
}

// backlogResumeThreshold returns the low-water mark below which waiting
// completers are released. Keeping this below the wait threshold avoids rapidly
// cycling between waiting and not waiting when the completer is near capacity.
func (c *BatchCompleter) backlogResumeThreshold() int {
return max(c.backlogWaitThresholdEffective()/2, 1)
}

// backlogWaitThresholdEffective returns the backlog size at which new
// completions should wait for the batch completer to catch up. It's capped at
// maxBacklog so tests and future configuration can't set a normal wait
// threshold beyond the emergency warning threshold.
func (c *BatchCompleter) backlogWaitThresholdEffective() int {
if c.backlogWaitThreshold <= 0 {
return c.maxBacklog
}
return min(c.backlogWaitThreshold, c.maxBacklog)
}

// batchReadyThreshold returns the backlog size at which the run loop should be
// nudged to process a batch immediately instead of waiting for its next ticker.
// It aims for a full database batch while still respecting low test thresholds.
func (c *BatchCompleter) batchReadyThreshold() int {
return min(c.completionMaxSize, c.backlogWaitThresholdEffective())
}

func (c *BatchCompleter) signalBatchReady() {
select {
case c.batchReadyChan <- struct{}{}:
default:
}
}

func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
c.setStateParamsMu.RLock()
var (
backlogSize = len(c.setStateParams)
waitChan = c.waitOnBacklogChan
waiting = c.waitOnBacklogWaiting
waitAt = c.backlogWaitThresholdEffective()
)
c.setStateParamsMu.RUnlock()

Expand All @@ -536,33 +625,52 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
return
}

// Not at max backlog. A little raciness is allowed here: multiple
// Not at the wait threshold. A little raciness is allowed here: multiple
// goroutines may have acquired the read lock above and seen a size under
// limit, but with all allowed to continue it could put the backlog over its
// maximum. The backlog will only be nominally over because generally max
// backlog >> max workers, so consider this okay.
if backlogSize < c.maxBacklog {
// wait threshold. The backlog will only be nominally over because generally
// the wait threshold >> max workers, so consider this okay.
if backlogSize < waitAt {
return
}

c.setStateParamsMu.Lock()
defer c.setStateParamsMu.Unlock()

// Check once more if another process has already started waiting (it's
// possible for multiple to race between the acquiring the lock above). If
// so, we fall through and allow this insertion to happen, even though it
// might bring the batch slightly over limit, because arranging the locks
// otherwise would get complicated.
if c.waitOnBacklogWaiting {
waitChan := c.waitOnBacklogChan
c.setStateParamsMu.Unlock()
<-waitChan
return
}
backlogSize = len(c.setStateParams)
waitAt = c.backlogWaitThresholdEffective()
if backlogSize < waitAt {
c.setStateParamsMu.Unlock()
return
}

// Tell all future insertions to start waiting. This one is allowed to fall
// through and succeed even though it may bring the batch a little over
// limit.
// Tell future insertions to start waiting. This caller falls through so
// there is guaranteed to be a pending completion for a future batch to
// process, even if the current in-flight batch fails.
c.waitOnBacklogChan = make(chan struct{})
c.waitOnBacklogWaiting = true
c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold", "max_backlog", c.maxBacklog)
if backlogSize >= c.maxBacklog {
c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold",
"backlog_size", backlogSize,
"backlog_wait_threshold", waitAt,
"max_backlog", c.maxBacklog,
)
} else {
c.Logger.DebugContext(ctx, c.Name+": Applying completion backlog pressure",
"backlog_resume_threshold", c.backlogResumeThreshold(),
"backlog_size", backlogSize,
"backlog_wait_threshold", waitAt,
)
}
c.setStateParamsMu.Unlock()
}

func isNonRetryableCompleterError(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, riverdriver.ErrClosedPool)
}

// As configured, total time asleep from initial attempt is ~7 seconds (1 + 2 +
Expand All @@ -586,13 +694,8 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer

retVal, err := retryFunc(ctx)
if err != nil {
// A cancelled context will never succeed, return immediately.
if errors.Is(err, context.Canceled) {
return defaultVal, err
}

// A closed pool will never succeed, return immediately.
if errors.Is(err, riverdriver.ErrClosedPool) {
// A cancelled context or a closed pool will never succeed.
if isNonRetryableCompleterError(err) {
return defaultVal, err
}

Expand Down
Loading
Loading