diff --git a/indexer/aggregates_calculator.go b/indexer/aggregates_calculator.go index 5a54551e..40d0a035 100644 --- a/indexer/aggregates_calculator.go +++ b/indexer/aggregates_calculator.go @@ -43,17 +43,43 @@ func NewAggregatesCalculator(config config.Config) *AggregatesCalculator { } } +// aggregatesPassInterval paces successive full passes of the aggregates +// calculator. A full pass was observed taking ~10 min in production; the old +// `for {}` loop kicked off the next pass the instant the previous one returned, +// so it ran continuously and IO-starved the block-indexing loop. apps' legacy +// `update_aggregates` celery task ran on a fixed 10m beat (src/app.py +// beat_schedule), so we pace to the same cadence: wait until 10m has elapsed +// since the previous pass *started* before beginning the next. Because the work +// is a single serial loop, a pass can never overlap itself, so no mutex guard +// is needed — the pacing alone is what was missing. +const aggregatesPassInterval = 10 * time.Minute + func (a *AggregatesCalculator) Start(ctx context.Context) error { a.logger.Info("Starting aggregates calculator") go logging.SyncOnTicks(ctx, a.logger, time.Second*10) - // This job runs in a continous loop until the context is cancelled. + // This job runs in a continuous loop until the context is cancelled, paced + // at aggregatesPassInterval between the *starts* of consecutive passes. If a + // pass overruns the interval the next one starts immediately (no negative + // sleep), so we never fall behind — we just stop hot-looping when passes are + // fast. for { + passStart := time.Now() + a.updateAggregatesJob.Run(ctx) + select { case <-ctx.Done(): a.logger.Info("Shutting down aggregates calculator") return ctx.Err() default: - a.updateAggregatesJob.Run(ctx) + } + + if wait := aggregatesPassInterval - time.Since(passStart); wait > 0 { + select { + case <-ctx.Done(): + a.logger.Info("Shutting down aggregates calculator") + return ctx.Err() + case <-time.After(wait): + } } } } diff --git a/indexer/indexer.go b/indexer/indexer.go index 94721b5f..8828a33e 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -40,6 +40,23 @@ func NewIndexer(cfg config.Config) *CoreIndexer { if err != nil { panic(fmt.Errorf("error parsing database URL: %w", err)) } + // This single write pool is shared by ~9 parity jobs scheduled in + // startParityJobs (HourlyPlayCounts, PrunePlays, UserListeningHistory, + // Trending, UpdateDelistStatuses, IndexChallenges, EngagementNotifications, + // ListenStreakReminder, RemixContestNotifications). pgx defaults MaxConns to + // max(4, runtime.NumCPU()), so on a small box that floor leaves jobs queueing + // for connections behind each other and serializing work that should overlap + // (a single job holding a tx for minutes — e.g. IndexChallenges — can starve + // the rest). Pin a higher floor so the jobs don't fight over a handful of + // connections. Mirrors the explicit-MaxConns pattern in + // solana/indexer/solana_indexer.go. 20 is a conservative ceiling, well under + // Postgres max_connections while giving each concurrent job room. We only + // raise the floor, so a DB URL that already asks for more via pool_max_conns + // is respected. + const defaultJobsPoolMaxConns = 20 + if connConfig.MaxConns < defaultJobsPoolMaxConns { + connConfig.MaxConns = defaultJobsPoolMaxConns + } pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) if err != nil { panic(fmt.Errorf("error connecting to database: %w", err)) @@ -156,8 +173,16 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { // Reconcile derived challenge state from source tables. Per-challenge // scanners live in api/jobs/challenges/. + // + // Running every processor on one shared tick forced a single cadence on a + // mix of cheap checkpoint-incremental processors (near-real-time, idle ticks + // ~free) and a few bounded full-scan ones — a slow processor delayed the + // fast ones and all the DB work bunched into one burst. Schedule() instead + // runs each cadence group on its own goroutine and interval (see + // jobs/index_challenges.go), so the real-time challenges stay real-time + // without the heavy ones hot-looping. jobs.NewIndexChallengesJob(ci.Config, ci.pool). - ScheduleEvery(ctx, 30*time.Second) + Schedule(ctx) // Time-based notifications that the legacy Python beat produced. Unlike // the event-driven notifications (handled by DB triggers), these fire on diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go index 7ebaa7cf..89e1f9e9 100644 --- a/jobs/index_challenges.go +++ b/jobs/index_challenges.go @@ -2,8 +2,6 @@ package jobs import ( "context" - "fmt" - "sync" "time" "api.audius.co/config" @@ -13,120 +11,156 @@ import ( "go.uber.org/zap" ) -// IndexChallengesJob runs all registered challenge processors on a tick. -// Each processor runs inside its own pgx transaction; failures in one -// don't stop the others. +// IndexChallengesJob reconciles derived challenge state from source tables. // // Mirrors apps' index_challenges celery task in role but not implementation: // where apps drains a Redis queue of dispatched events, we reconcile derived -// state from source tables. See package docs in jobs/challenges/processor.go -// for the rationale. +// state from source tables. See package docs in jobs/challenges/processor.go. +// +// Scheduling model: rather than one tick that runs every processor back-to-back +// (which made a slow processor delay the cheap real-time ones and bunched all +// the DB work into one burst), each challengeGroup runs on its own cadence in +// its own goroutine. Most groups hold a single processor; processors with an +// ordering dependency (the play-count milestones, where p2 requires p1) share a +// group so they run in sequence within one tick. Independent goroutines plus a +// small startup stagger keep the groups from stomping on each other. type IndexChallengesJob struct { - pool database.DbPool - logger *zap.Logger - processors []challenges.Processor + pool database.DbPool + logger *zap.Logger + groups []challengeGroup +} - mutex sync.Mutex - isRunning bool +// challengeGroup is one or more processors that run together on a shared +// cadence. Processors in a group run sequentially in registration order (so an +// intra-group dependency like play-count p1→p2→p3 is honored); separate groups +// run concurrently on their own goroutines. +type challengeGroup struct { + name string + interval time.Duration + procs []challenges.Processor } -// NewIndexChallengesJob constructs the umbrella job with Phase 1 processors -// pre-wired. +const ( + // Checkpoint-incremental processors: each tick is an index range scan over + // rows changed since the last checkpoint, so an idle tick is nearly free and + // a just-earned challenge is picked up on the next tick. Run near-real-time. + challengeFastInterval = 30 * time.Second + + // Bounded full-scan processors: rescan a constrained source set (verified + // artists, top-N trending, current contest events) each tick. Not free, but + // cheap enough that a couple of minutes keeps them timely without hot-looping. + challengeMediumInterval = 2 * time.Minute + + // Trending reward processors only mint rows once per week (Friday UTC); off + // that window each tick is a guard check, so a slow cadence is plenty. + challengeSlowInterval = 10 * time.Minute +) + +// NewIndexChallengesJob constructs the umbrella job with all processors wired +// into cadence groups. func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChallengesJob { - return &IndexChallengesJob{ - pool: pool, - logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"), - processors: []challenges.Processor{ - // Phase 1 - &challenges.TrackUploadProcessor{}, - &challenges.FirstPlaylistProcessor{}, - &challenges.ProfileCompletionProcessor{}, - &challenges.ProfileVerifiedProcessor{}, - &challenges.ListenStreakProcessor{}, + groups := []challengeGroup{ + // Fast / checkpoint-incremental — one group per processor at 30s. + {"track_upload", challengeFastInterval, []challenges.Processor{&challenges.TrackUploadProcessor{}}}, + {"first_playlist", challengeFastInterval, []challenges.Processor{&challenges.FirstPlaylistProcessor{}}}, + {"profile_completion", challengeFastInterval, []challenges.Processor{&challenges.ProfileCompletionProcessor{}}}, + {"listen_streak", challengeFastInterval, []challenges.Processor{&challenges.ListenStreakProcessor{}}}, + {"cosign", challengeFastInterval, []challenges.Processor{&challenges.CosignProcessor{}}}, + {"audio_matching_buyer", challengeFastInterval, []challenges.Processor{challenges.NewAudioMatchingBuyerProcessor()}}, + {"audio_matching_seller", challengeFastInterval, []challenges.Processor{challenges.NewAudioMatchingSellerProcessor()}}, + {"mobile_install", challengeFastInterval, []challenges.Processor{&challenges.MobileInstallProcessor{}}}, + {"referral", challengeFastInterval, []challenges.Processor{challenges.NewReferralProcessor()}}, + {"verified_referral", challengeFastInterval, []challenges.Processor{challenges.NewVerifiedReferralProcessor()}}, + {"referred", challengeFastInterval, []challenges.Processor{&challenges.ReferredProcessor{}}}, + + // Medium / bounded full-scan — 2m each. + {"profile_verified", challengeMediumInterval, []challenges.Processor{&challenges.ProfileVerifiedProcessor{}}}, + {"first_weekly_comment", challengeMediumInterval, []challenges.Processor{&challenges.FirstWeeklyCommentProcessor{}}}, + {"comment_pin", challengeMediumInterval, []challenges.Processor{&challenges.CommentPinProcessor{}}}, + {"tastemaker", challengeMediumInterval, []challenges.Processor{&challenges.TastemakerProcessor{}}}, + {"remix_contest_winner", challengeMediumInterval, []challenges.Processor{&challenges.RemixContestWinnerProcessor{}}}, + // Play-count milestones share a group: p2 requires p1 complete and p3 + // requires p2, so they must run in order within a single tick. + {"play_count_milestones", challengeMediumInterval, []challenges.Processor{ challenges.NewPlayCount250Processor(), challenges.NewPlayCount1000Processor(), challenges.NewPlayCount10000Processor(), - challenges.NewTrendingTrackProcessor(), - challenges.NewTrendingUndergroundProcessor(), - challenges.NewTrendingPlaylistProcessor(), - // Phase 2 - &challenges.FirstWeeklyCommentProcessor{}, - &challenges.CommentPinProcessor{}, - &challenges.CosignProcessor{}, - &challenges.TastemakerProcessor{}, - &challenges.RemixContestWinnerProcessor{}, - challenges.NewAudioMatchingBuyerProcessor(), - challenges.NewAudioMatchingSellerProcessor(), - // Phase 3 (signal-driven) - &challenges.MobileInstallProcessor{}, - challenges.NewReferralProcessor(), - challenges.NewVerifiedReferralProcessor(), - &challenges.ReferredProcessor{}, - }, + }}, + + // Slow / weekly reward processors — 10m each. + {"trending_track", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingTrackProcessor()}}, + {"trending_underground", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingUndergroundProcessor()}}, + {"trending_playlist", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingPlaylistProcessor()}}, + } + + return &IndexChallengesJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"), + groups: groups, } } -// ScheduleEvery runs the job every `interval` until the context is cancelled. -func (j *IndexChallengesJob) ScheduleEvery(ctx context.Context, interval time.Duration) *IndexChallengesJob { - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - j.Run(ctx) - case <-ctx.Done(): - j.logger.Info("Job shutting down") - return - } - } - }() +// Schedule launches one goroutine per cadence group, each running its +// processors on the group's interval until the context is cancelled. Returns +// the job so callers can chain. +func (j *IndexChallengesJob) Schedule(ctx context.Context) *IndexChallengesJob { + for i := range j.groups { + // Stagger startup so the groups don't all fire their first run at once. + offset := time.Duration(i) * time.Second + go j.runGroupLoop(ctx, j.groups[i], offset) + } return j } -// Run executes the job once. -func (j *IndexChallengesJob) Run(ctx context.Context) { - if err := j.run(ctx); err != nil { - j.logger.Error("Job run failed", zap.Error(err)) +// runGroupLoop runs one group: an initial (staggered) pass, then a pass on each +// tick. Because a group runs in a single goroutine, its passes can never +// overlap — if a pass overruns the interval the Ticker drops the missed ticks. +func (j *IndexChallengesJob) runGroupLoop(ctx context.Context, g challengeGroup, offset time.Duration) { + select { + case <-ctx.Done(): + return + case <-time.After(offset): } -} -func (j *IndexChallengesJob) run(ctx context.Context) error { - j.mutex.Lock() - if j.isRunning { - j.mutex.Unlock() - return fmt.Errorf("job is already running") + j.runGroup(ctx, g) + + ticker := time.NewTicker(g.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + j.logger.Info("Challenge group shutting down", zap.String("group", g.name)) + return + case <-ticker.C: + j.runGroup(ctx, g) + } } - j.isRunning = true - j.mutex.Unlock() - defer func() { - j.mutex.Lock() - j.isRunning = false - j.mutex.Unlock() - }() +} +// runGroup runs every processor in a group in order, each in its own +// transaction. A failure in one processor doesn't stop the others. +func (j *IndexChallengesJob) runGroup(ctx context.Context, g challengeGroup) { start := time.Now() - var anyErr error - for _, p := range j.processors { + for _, p := range g.procs { if err := j.runProcessor(ctx, p); err != nil { j.logger.Error("processor failed", + zap.String("group", g.name), zap.String("challenge_id", p.ChallengeID()), zap.Error(err)) - anyErr = err // Continue — one bad processor shouldn't kill the rest. } } - j.logger.Info("Reconciled challenges", - zap.Int("processors", len(j.processors)), + j.logger.Info("Reconciled challenge group", + zap.String("group", g.name), + zap.Int("processors", len(g.procs)), zap.Duration("duration", time.Since(start))) - return anyErr } // runProcessor runs a single processor in its own transaction. func (j *IndexChallengesJob) runProcessor(ctx context.Context, p challenges.Processor) error { tx, err := j.pool.Begin(ctx) if err != nil { - return fmt.Errorf("begin tx: %w", err) + return err } defer tx.Rollback(ctx) if err := p.Reconcile(ctx, tx); err != nil {