Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions indexer/aggregates_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,43 @@ func NewAggregatesCalculator(config config.Config) *AggregatesCalculator {
}
}

// aggregatesPassInterval is the minimum time between the *starts* of two
// consecutive full passes of the aggregates calculator.
//
// Background: a full pass was observed taking ~10 min in production, and the
// old `for {}` loop kicked off the next pass the instant the previous one
// returned, so it ran continuously and IO-starved the block-indexing loop.
// apps' legacy `update_aggregates` celery task ran on a fixed 10m beat
// (src/app.py beat_schedule), so we pace to the same cadence: wait until 10m
// has elapsed since the previous pass started before beginning the next.
// Because the work is a single serial loop, a pass can never overlap itself,
// so no mutex guard is needed — the pacing alone is what was missing.
const aggregatesPassInterval = 10 * time.Minute

// Start runs the aggregates job in a loop until ctx is cancelled, then returns
// ctx.Err().
//
// Passes are paced at aggregatesPassInterval between the *starts* of
// consecutive passes. If a pass overruns the interval the next one starts
// immediately (no negative sleep), so we never fall behind — we just stop
// hot-looping when passes are fast. Exactly one pass runs per loop iteration.
func (a *AggregatesCalculator) Start(ctx context.Context) error {
	a.logger.Info("Starting aggregates calculator")
	go logging.SyncOnTicks(ctx, a.logger, time.Second*10)

	for {
		// Check before every pass so a cancelled context never starts new work.
		if err := ctx.Err(); err != nil {
			a.logger.Info("Shutting down aggregates calculator")
			return err
		}

		passStart := time.Now()
		a.updateAggregatesJob.Run(ctx)

		// Sleep out whatever is left of the interval; an overrunning pass
		// leaves nothing to wait for and loops straight back to the check above.
		wait := aggregatesPassInterval - time.Since(passStart)
		if wait <= 0 {
			continue
		}
		select {
		case <-ctx.Done():
			a.logger.Info("Shutting down aggregates calculator")
			return ctx.Err()
		case <-time.After(wait):
		}
	}
}
Expand Down
27 changes: 26 additions & 1 deletion indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ func NewIndexer(cfg config.Config) *CoreIndexer {
if err != nil {
panic(fmt.Errorf("error parsing database URL: %w", err))
}
// This single write pool is shared by ~9 parity jobs scheduled in
// startParityJobs (HourlyPlayCounts, PrunePlays, UserListeningHistory,
// Trending, UpdateDelistStatuses, IndexChallenges, EngagementNotifications,
// ListenStreakReminder, RemixContestNotifications). pgx defaults MaxConns to
// max(4, runtime.NumCPU()), so on a small box the pool can be as small as 4
// connections. That leaves jobs queueing for connections behind each other
// and serializing work that should overlap (a single job holding a tx for
// minutes — e.g. IndexChallenges — can starve the rest). Raise MaxConns to at
// least 20 so the jobs don't fight over a handful of connections. Mirrors the
// explicit-MaxConns pattern in solana/indexer/solana_indexer.go. 20 is a
// conservative pool size: well under Postgres max_connections while giving
// each concurrent job room. We only ever raise the value, so a DB URL that
// already asks for more via pool_max_conns is respected.
const defaultJobsPoolMaxConns = 20
if connConfig.MaxConns < defaultJobsPoolMaxConns {
connConfig.MaxConns = defaultJobsPoolMaxConns
}
pool, err := pgxpool.NewWithConfig(context.Background(), connConfig)
if err != nil {
panic(fmt.Errorf("error connecting to database: %w", err))
Expand Down Expand Up @@ -156,8 +173,16 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) {

// Reconcile derived challenge state from source tables. Per-challenge
// scanners live in api/jobs/challenges/.
//
// Running every processor on one shared tick forced a single cadence on a
// mix of cheap checkpoint-incremental processors (near-real-time, idle ticks
// ~free) and a few bounded full-scan ones — a slow processor delayed the
// fast ones and all the DB work bunched into one burst. Schedule() instead
// runs each cadence group on its own goroutine and interval (see
// jobs/index_challenges.go), so the real-time challenges stay real-time
// without the heavy ones hot-looping.
jobs.NewIndexChallengesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 30*time.Second)
Schedule(ctx)

// Time-based notifications that the legacy Python beat produced. Unlike
// the event-driven notifications (handled by DB triggers), these fire on
Expand Down
194 changes: 114 additions & 80 deletions jobs/index_challenges.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package jobs

import (
"context"
"fmt"
"sync"
"time"

"api.audius.co/config"
Expand All @@ -13,120 +11,156 @@ import (
"go.uber.org/zap"
)

// IndexChallengesJob reconciles derived challenge state from source tables.
//
// Mirrors apps' index_challenges celery task in role but not implementation:
// where apps drains a Redis queue of dispatched events, we reconcile derived
// state from source tables. See package docs in jobs/challenges/processor.go.
//
// Scheduling model: rather than one tick that runs every processor back-to-back
// (which made a slow processor delay the cheap real-time ones and bunched all
// the DB work into one burst), each challengeGroup runs on its own cadence in
// its own goroutine. Most groups hold a single processor; processors with an
// ordering dependency (the play-count milestones, where p2 requires p1) share a
// group so they run in sequence within one tick. Independent goroutines plus a
// small startup stagger keep the groups from stomping on each other.
type IndexChallengesJob struct {
	// pool is the database pool each processor transaction is started from.
	pool database.DbPool
	// logger is the job's named logger.
	logger *zap.Logger
	// groups are the cadence groups launched by Schedule.
	groups []challengeGroup
}

mutex sync.Mutex
isRunning bool
// challengeGroup is one or more processors that run together on a shared
// cadence. Processors in a group run sequentially in registration order (so an
// intra-group dependency like play-count p1→p2→p3 is honored); separate groups
// run concurrently on their own goroutines.
type challengeGroup struct {
name string
interval time.Duration
procs []challenges.Processor
}

// Cadences assigned to the challenge groups, from most to least frequent.
const (
// Checkpoint-incremental processors: each tick is an index range scan over
// rows changed since the last checkpoint, so an idle tick is nearly free and
// a just-earned challenge is picked up on the next tick. Run near-real-time.
challengeFastInterval = 30 * time.Second

// Bounded full-scan processors: rescan a constrained source set (verified
// artists, top-N trending, current contest events) each tick. Not free, but
// cheap enough that a couple of minutes keeps them timely without hot-looping.
challengeMediumInterval = 2 * time.Minute

// Trending reward processors only mint rows once per week (Friday UTC); off
// that window each tick is a guard check, so a slow cadence is plenty.
challengeSlowInterval = 10 * time.Minute
)

// NewIndexChallengesJob constructs the umbrella job with all processors wired
// into cadence groups.
func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChallengesJob {
return &IndexChallengesJob{
pool: pool,
logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"),
processors: []challenges.Processor{
// Phase 1
&challenges.TrackUploadProcessor{},
&challenges.FirstPlaylistProcessor{},
&challenges.ProfileCompletionProcessor{},
&challenges.ProfileVerifiedProcessor{},
&challenges.ListenStreakProcessor{},
groups := []challengeGroup{
// Fast / checkpoint-incremental — one group per processor at 30s.
{"track_upload", challengeFastInterval, []challenges.Processor{&challenges.TrackUploadProcessor{}}},
{"first_playlist", challengeFastInterval, []challenges.Processor{&challenges.FirstPlaylistProcessor{}}},
{"profile_completion", challengeFastInterval, []challenges.Processor{&challenges.ProfileCompletionProcessor{}}},
{"listen_streak", challengeFastInterval, []challenges.Processor{&challenges.ListenStreakProcessor{}}},
{"cosign", challengeFastInterval, []challenges.Processor{&challenges.CosignProcessor{}}},
{"audio_matching_buyer", challengeFastInterval, []challenges.Processor{challenges.NewAudioMatchingBuyerProcessor()}},
{"audio_matching_seller", challengeFastInterval, []challenges.Processor{challenges.NewAudioMatchingSellerProcessor()}},
{"mobile_install", challengeFastInterval, []challenges.Processor{&challenges.MobileInstallProcessor{}}},
{"referral", challengeFastInterval, []challenges.Processor{challenges.NewReferralProcessor()}},
{"verified_referral", challengeFastInterval, []challenges.Processor{challenges.NewVerifiedReferralProcessor()}},
{"referred", challengeFastInterval, []challenges.Processor{&challenges.ReferredProcessor{}}},

// Medium / bounded full-scan — 2m each.
{"profile_verified", challengeMediumInterval, []challenges.Processor{&challenges.ProfileVerifiedProcessor{}}},
{"first_weekly_comment", challengeMediumInterval, []challenges.Processor{&challenges.FirstWeeklyCommentProcessor{}}},
{"comment_pin", challengeMediumInterval, []challenges.Processor{&challenges.CommentPinProcessor{}}},
{"tastemaker", challengeMediumInterval, []challenges.Processor{&challenges.TastemakerProcessor{}}},
{"remix_contest_winner", challengeMediumInterval, []challenges.Processor{&challenges.RemixContestWinnerProcessor{}}},
// Play-count milestones share a group: p2 requires p1 complete and p3
// requires p2, so they must run in order within a single tick.
{"play_count_milestones", challengeMediumInterval, []challenges.Processor{
challenges.NewPlayCount250Processor(),
challenges.NewPlayCount1000Processor(),
challenges.NewPlayCount10000Processor(),
challenges.NewTrendingTrackProcessor(),
challenges.NewTrendingUndergroundProcessor(),
challenges.NewTrendingPlaylistProcessor(),
// Phase 2
&challenges.FirstWeeklyCommentProcessor{},
&challenges.CommentPinProcessor{},
&challenges.CosignProcessor{},
&challenges.TastemakerProcessor{},
&challenges.RemixContestWinnerProcessor{},
challenges.NewAudioMatchingBuyerProcessor(),
challenges.NewAudioMatchingSellerProcessor(),
// Phase 3 (signal-driven)
&challenges.MobileInstallProcessor{},
challenges.NewReferralProcessor(),
challenges.NewVerifiedReferralProcessor(),
&challenges.ReferredProcessor{},
},
}},

// Slow / weekly reward processors — 10m each.
{"trending_track", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingTrackProcessor()}},
{"trending_underground", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingUndergroundProcessor()}},
{"trending_playlist", challengeSlowInterval, []challenges.Processor{challenges.NewTrendingPlaylistProcessor()}},
}

return &IndexChallengesJob{
pool: pool,
logger: logging.NewZapLogger(cfg).Named("IndexChallengesJob"),
groups: groups,
}
}

// ScheduleEvery runs the job every `interval` until the context is cancelled.
func (j *IndexChallengesJob) ScheduleEvery(ctx context.Context, interval time.Duration) *IndexChallengesJob {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
j.Run(ctx)
case <-ctx.Done():
j.logger.Info("Job shutting down")
return
}
}
}()
// Schedule launches one goroutine per cadence group, each running its
// processors on the group's interval until the context is cancelled. Returns
// the job so callers can chain.
func (j *IndexChallengesJob) Schedule(ctx context.Context) *IndexChallengesJob {
for i := range j.groups {
// Stagger startup so the groups don't all fire their first run at once.
offset := time.Duration(i) * time.Second
go j.runGroupLoop(ctx, j.groups[i], offset)
}
return j
}

// Run executes the job once.
func (j *IndexChallengesJob) Run(ctx context.Context) {
if err := j.run(ctx); err != nil {
j.logger.Error("Job run failed", zap.Error(err))
// runGroupLoop runs one group: an initial (staggered) pass, then a pass on each
// tick. Because a group runs in a single goroutine, its passes can never
// overlap — if a pass overruns the interval the Ticker drops the missed ticks.
func (j *IndexChallengesJob) runGroupLoop(ctx context.Context, g challengeGroup, offset time.Duration) {
select {
case <-ctx.Done():
return
case <-time.After(offset):
}
}

func (j *IndexChallengesJob) run(ctx context.Context) error {
j.mutex.Lock()
if j.isRunning {
j.mutex.Unlock()
return fmt.Errorf("job is already running")
j.runGroup(ctx, g)

ticker := time.NewTicker(g.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
j.logger.Info("Challenge group shutting down", zap.String("group", g.name))
return
case <-ticker.C:
j.runGroup(ctx, g)
}
}
j.isRunning = true
j.mutex.Unlock()
defer func() {
j.mutex.Lock()
j.isRunning = false
j.mutex.Unlock()
}()
}

// runGroup runs every processor in a group in order, each in its own
// transaction. A failure in one processor doesn't stop the others.
func (j *IndexChallengesJob) runGroup(ctx context.Context, g challengeGroup) {
start := time.Now()
var anyErr error
for _, p := range j.processors {
for _, p := range g.procs {
if err := j.runProcessor(ctx, p); err != nil {
j.logger.Error("processor failed",
zap.String("group", g.name),
zap.String("challenge_id", p.ChallengeID()),
zap.Error(err))
anyErr = err
// Continue — one bad processor shouldn't kill the rest.
}
}
j.logger.Info("Reconciled challenges",
zap.Int("processors", len(j.processors)),
j.logger.Info("Reconciled challenge group",
zap.String("group", g.name),
zap.Int("processors", len(g.procs)),
zap.Duration("duration", time.Since(start)))
return anyErr
}

// runProcessor runs a single processor in its own transaction.
func (j *IndexChallengesJob) runProcessor(ctx context.Context, p challenges.Processor) error {
tx, err := j.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
return err
}
defer tx.Rollback(ctx)
if err := p.Reconcile(ctx, tx); err != nil {
Expand Down
Loading