Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 93 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ type Config struct {
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
Hooks []rivertype.Hook

// HardStopTimeout is the maximum amount of time that the client will wait
// after job contexts are cancelled during shutdown before forcing jobs still
// running to an errored state. This hard stop phase lets jobs be retried
// immediately on the next client start instead of waiting for rescue.
//
// The timer starts only after a soft stop has begun by cancelling job
// contexts, like after SoftStopTimeout elapses, StopAndCancel is called, or
// the Start context is cancelled without SoftStopTimeout configured.
//
// Defaults to no timeout (hard stop disabled).
HardStopTimeout time.Duration

// Logger is the structured logger to use for logging purposes. If none is
// specified, logs will be emitted to STDOUT with messages at warn level
// or higher.
Expand Down Expand Up @@ -330,11 +342,9 @@ type Config struct {
Schema string

// SoftStopTimeout is the maximum amount of time that the client will wait
// for running jobs to finish during a stop before their contexts are
// cancelled. After the timeout elapses, the client escalates to a hard stop
// by cancelling the context of all running jobs. This applies regardless of
// how stop is initiated — whether by calling Stop, StopAndCancel, or by
// cancelling the context passed to Start.
// for running jobs to finish during a graceful stop before entering soft
// stop by cancelling job contexts. This applies when stop is initiated by
// calling Stop or by cancelling the context passed to Start.
//
// In combination with signal.NotifyContext on the context passed to Start,
// this can simplify graceful stop to:
Expand All @@ -345,12 +355,12 @@ type Config struct {
// if err := client.Start(ctx); err != nil { ... }
// <-client.Stopped()
//
// The signal cancels the Start context, which initiates a soft stop. If
// The signal cancels the Start context, which initiates a graceful stop. If
// running jobs haven't finished after SoftStopTimeout, their contexts are
// automatically cancelled to trigger a hard stop.
// cancelled.
//
// StopAndCancel bypasses the timeout entirely and cancels job contexts
// immediately.
// StopAndCancel cancels job contexts immediately instead of waiting for
// SoftStopTimeout.
//
// Defaults to no timeout (wait indefinitely for jobs to finish).
SoftStopTimeout time.Duration
Expand Down Expand Up @@ -468,6 +478,7 @@ func (c *Config) WithDefaults() *Config {
ErrorHandler: c.ErrorHandler,
FetchCooldown: cmp.Or(c.FetchCooldown, FetchCooldownDefault),
FetchPollInterval: cmp.Or(c.FetchPollInterval, FetchPollIntervalDefault),
HardStopTimeout: c.HardStopTimeout,
ID: valutil.ValOrDefaultFunc(c.ID, func() string { return defaultClientID(time.Now().UTC()) }),
Hooks: c.Hooks,
JobInsertMiddleware: c.JobInsertMiddleware,
Expand Down Expand Up @@ -515,6 +526,9 @@ func (c *Config) validate() error {
if c.FetchPollInterval < c.FetchCooldown {
return fmt.Errorf("FetchPollInterval cannot be shorter than FetchCooldown (%s)", c.FetchCooldown)
}
if c.HardStopTimeout < 0 {
return errors.New("HardStopTimeout cannot be less than zero")
}
if len(c.ID) > 100 {
return errors.New("ID cannot be longer than 100 characters")
}
Expand Down Expand Up @@ -547,6 +561,9 @@ func (c *Config) validate() error {
if c.Schema != "" && !postgresSchemaNameRE.MatchString(c.Schema) {
return errors.New("Schema name can only contain letters, numbers, and underscores, and must start with a letter or underscore")
}
if c.SoftStopTimeout < 0 {
return errors.New("SoftStopTimeout cannot be less than zero")
}

for queue, queueConfig := range c.Queues {
if err := queueConfig.validate(queue, c.FetchCooldown, c.FetchPollInterval); err != nil {
Expand Down Expand Up @@ -1048,10 +1065,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// A graceful shutdown stops fetching new jobs but allows any previously fetched
// jobs to complete. This can be initiated with the Stop method.
//
// A more abrupt shutdown can be achieved by either cancelling the provided
// context or by calling StopAndCancel. This will not only stop fetching new
// jobs, but will also cancel the context for any currently-running jobs. If
// using StopAndCancel, there's no need to also call Stop.
// A soft stop cancels job contexts after fetching has stopped. It can be
// initiated by calling StopAndCancel, by cancelling the provided context when
// SoftStopTimeout is not configured, or by waiting for SoftStopTimeout to elapse
// during graceful stop. If HardStopTimeout is configured, jobs still running
// after that timeout will be forced into an errored state. If using
// StopAndCancel, there's no need to also call Stop.
func (c *Client[TTx]) Start(ctx context.Context) error {
fetchCtx, shouldStart, started, stopped := c.baseStartStop.StartInit(ctx)
if !shouldStart {
Expand All @@ -1065,9 +1084,13 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// sure to take a channel reference before finishing stopped.
c.stopped = c.baseStartStop.StoppedUnsafe()

producersAsServices := func() []startstop.Service {
producers := func() []*producer {
return maputil.Values(c.producersByQueueName)
}

producersAsServices := func(producers []*producer) []startstop.Service {
return sliceutil.Map(
maputil.Values(c.producersByQueueName),
producers,
func(p *producer) startstop.Service { return p },
)
}
Expand Down Expand Up @@ -1121,8 +1144,8 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a
// graceful stop. When SoftStopTimeout is configured, the work context
// is detached from the start context so that cancelling the start
// context initiates a soft stop (with timeout escalation) rather than
// an immediate hard stop. When SoftStopTimeout is not configured, the
// context initiates a graceful stop (with timeout escalation) rather
// than an immediate soft stop. When SoftStopTimeout is not configured, the
// work context inherits from the start context to preserve the
// existing behavior where cancelling the start context is equivalent
// to StopAndCancel.
Expand All @@ -1145,7 +1168,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
for _, producer := range c.producersByQueueName {
if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
workCancel(err)
startstop.StopAllParallel(producersAsServices()...)
startstop.StopAllParallel(producersAsServices(producers())...)
stopServicesOnError()
return err
}
Expand All @@ -1167,7 +1190,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// Generate producer services while c.queues.startStopMu.Lock() is still
// held. This is used for WaitAllStarted below, but don't use it elsewhere
// because new producers may have been added while the client is running.
producerServices := producersAsServices()
producerServices := producersAsServices(producers())

go func() {
// Wait for all subservices to start up before signaling our own start.
Expand All @@ -1194,22 +1217,57 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

producerList := producers()

hardStopTimerCtx, hardStopTimerCancel := context.WithCancel(context.WithoutCancel(ctx))
defer hardStopTimerCancel()

startHardStopTimer := sync.OnceFunc(func() {
if c.config.HardStopTimeout <= 0 {
return
}

go func() {
timer := time.NewTimer(c.config.HardStopTimeout)
defer timer.Stop()

select {
case <-timer.C:
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Hard stop timeout; setting remaining jobs to errored", slog.Duration("hard_stop_timeout", c.config.HardStopTimeout))
for _, producer := range producerList {
producer.hardStop()
}
case <-hardStopTimerCtx.Done():
}
}()
})

workCtx := c.queues.workCtx
go func() {
select {
case <-workCtx.Done():
startHardStopTimer()
case <-hardStopTimerCtx.Done():
}
}()

// If SoftStopTimeout is configured, start a timer that will cancel
// the work context (escalating to a hard stop) if producers don't
// finish in time. StopAndCancel also calls workCancel, in which case
// this timer is a harmless no-op because the context is already done.
// the work context if producers don't finish in time. Once the work
// context is cancelled, the optional hard stop timer starts.
if c.config.SoftStopTimeout > 0 {
softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() {
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout))
c.workCancel(rivercommon.ErrStop)
startHardStopTimer()
})
defer softStopTimer.Stop()
}

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices()...)
startstop.StopAllParallel(producersAsServices(producerList)...)
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped")
hardStopTimerCancel()

c.workCancel(rivercommon.ErrStop)

Expand Down Expand Up @@ -1238,12 +1296,14 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// complete before exiting. If the provided context is done before shutdown has
// completed, Stop will return immediately with the context's error.
//
// If SoftStopTimeout is configured, running job contexts will be automatically
// cancelled after the timeout elapses, escalating to a hard stop. This also
// If SoftStopTimeout is configured, jobs still running after the timeout
// elapses have their contexts cancelled. If HardStopTimeout is also configured,
// jobs still running after that second timeout are forced into an errored state
// so they can be retried immediately on the next client start. This also
// applies when stop is initiated by cancelling the context passed to Start.
//
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
// There's no need to call this method if shutdown has already been initiated by
// cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
Expand All @@ -1262,10 +1322,11 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {

// StopAndCancel shuts down the client and cancels all work in progress. It is a
// more aggressive stop than Stop because the contexts for any in-progress jobs
// are cancelled. However, it still waits for jobs to complete before returning,
// even though their contexts are cancelled. If the provided context is done
// before shutdown has completed, StopAndCancel will return immediately with the
// context's error.
// are cancelled immediately. If HardStopTimeout is configured, jobs that still
// remain running after the timeout are hard-stopped; otherwise, StopAndCancel
// waits for jobs to complete even though their contexts are cancelled. If the
// provided context is done before shutdown has completed, StopAndCancel will
// return immediately with the context's error.
//
// This can also be initiated by cancelling the context passed to Start. There is
// no need to call this method if the context passed to Start is cancelled
Expand All @@ -1277,7 +1338,7 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
// graceful stop semantics without requiring manual orchestration of Stop and
// StopAndCancel.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Soft stop started; cancelling all work")
c.workCancel(rivercommon.ErrStop)

shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
Expand Down
Loading
Loading