From dc4ce2f1a3c81f0b8d5731486f983c0d41c66283 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 19:30:26 -0700 Subject: [PATCH] perf(api): make play-count milestones incremental, single pass The p1/p2/p3 play milestones (250/1k/10k plays, verified artists only) each ran a non-incremental GROUP BY every tick: a Parallel Seq Scan of ~485k tracks producing ~1,867 rows in ~49s, times three tiers. That one processor was the dominant cost of IndexChallengesJob. Merge the three into a single PlayCountMilestonesProcessor that checkpoints plays.id and recomputes only the verified artists whose tracks were played since the checkpoint, deriving all three tiers in one pass. Tier eligibility gates on the live count reaching the previous tier's threshold, which is equivalent to Python's "previous milestone complete" check and removes the cross-tick cascade. Against prod (read-only EXPLAIN ANALYZE): the dirty scan is ~1.4s for a 200k-play catch-up window (far less per normal tick) and the bounded recompute is ~113ms for 50 users, versus 48.7s x 3 every tick before. Migration 0215 seeds the new checkpoint to max(plays.id) so prod starts "from now" rather than backfilling the whole plays table. Co-Authored-By: Claude Opus 4.7 --- ..._seed_play_count_milestones_checkpoint.sql | 25 +++ jobs/challenges/play_count_milestones.go | 186 +++++++++++------- jobs/challenges/play_count_milestones_test.go | 72 +++++-- jobs/index_challenges.go | 4 +- 4 files changed, 202 insertions(+), 85 deletions(-) create mode 100644 ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql diff --git a/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql b/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql new file mode 100644 index 00000000..cfa99140 --- /dev/null +++ b/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql @@ -0,0 +1,25 @@ +-- Seed the incremental checkpoint for the play-count milestone processor. +-- +-- The p1/p2/p3 play milestones (250/1k/10k plays, verified artists only) used to +-- run one non-incremental GROUP BY per tier every tick — a Parallel Seq Scan of +-- ~485k tracks producing ~1,867 rows in ~49s, times three. They were merged into +-- a single processor that checkpoints plays.id and recomputes only the verified +-- artists whose tracks were played since the checkpoint. See +-- jobs/challenges/play_count_milestones.go and jobs/challenges/incremental.go. +-- +-- A fresh checkpoint defaults to 0, which would walk the entire plays table from +-- the beginning the first time the job runs. We don't want that: the legacy +-- Python stack already populated user_challenges and the upserts are idempotent, +-- so a full historical re-derivation is pure redundant write load. Seed the +-- checkpoint to the current max(plays.id) so the processor starts "from now" and +-- only picks up new plays. +-- +-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a checkpoint the +-- running job has already advanced. The max(id) probe is index-only against the +-- plays primary key. +-- +-- Checkpoint name must match playCountMilestonesCheckpoint in the processor. + +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:play_count_milestones:last_play_id', COALESCE((SELECT max(id) FROM plays), 0) +ON CONFLICT (tablename) DO NOTHING; diff --git a/jobs/challenges/play_count_milestones.go b/jobs/challenges/play_count_milestones.go index 4f2b6f42..c2d75cb2 100644 --- a/jobs/challenges/play_count_milestones.go +++ b/jobs/challenges/play_count_milestones.go @@ -7,79 +7,121 @@ import ( "github.com/jackc/pgx/v5" ) -// PlayCountMilestone implements challenges p1/p2/p3 — the 250/1k/10k play -// milestones from 2025 onward, restricted to verified artists. +// PlayCountMilestonesProcessor implements challenges p1/p2/p3 — the 250/1k/10k +// play milestones from 2025 onward, restricted to verified artists. // Mirrors apps/packages/discovery-provider/src/challenges/play_count_milestone_challenge_base.py. // -// The play count is summed from aggregate_monthly_plays where timestamp >= 2025-01-01, -// joined to tracks the user owns. Each milestone has a "previous milestone" -// requirement: p2 requires p1 complete, p3 requires p2 complete (matches -// Python). -type PlayCountMilestone struct { - ID string - Threshold int32 // step_count override (we still load from challenges row) - PreviousMilestone string - SpecifierMilestone string // appended to specifier; matches Python's "hex:250"/"hex:1000"/"hex:10000" -} +// All three tiers derive from the same number: a verified artist's total 2025+ +// play count, summed from aggregate_monthly_plays over the tracks they own. A +// tier is eligible once the count reaches the previous tier's threshold (p2 +// needs >=250, p3 needs >=1000), matching Python's "previous milestone +// complete" gating — since a milestone completes exactly when the count crosses +// its step_count, gating on the live count is equivalent and doesn't need a +// second tick for the p1->p2->p3 cascade to propagate. +// +// Incremental: the old design ran one non-incremental GROUP BY per tier every +// tick — a Parallel Seq Scan of ~485k tracks producing ~1,867 rows in ~49s, +// times three. We instead checkpoint plays.id and recompute only the verified +// artists whose tracks were played since the checkpoint, in a single pass for +// all three tiers. See incremental.go. +type PlayCountMilestonesProcessor struct{} -// Per-milestone factory funcs. Each returns a Processor. -func NewPlayCount250Processor() Processor { - return &PlayCountMilestone{ID: "p1", SpecifierMilestone: "250"} -} -func NewPlayCount1000Processor() Processor { - return &PlayCountMilestone{ID: "p2", PreviousMilestone: "p1", SpecifierMilestone: "1000"} +func NewPlayCountMilestonesProcessor() Processor { return &PlayCountMilestonesProcessor{} } + +// ChallengeID returns p1 as a representative id; it is only used for error +// logging by the umbrella job. Reconcile handles all three tiers. +func (p *PlayCountMilestonesProcessor) ChallengeID() string { return "p1" } + +const playCountMilestonesCheckpoint = "challenges:play_count_milestones:last_play_id" + +// playCountTiers are the milestone challenges in ascending order. specifier is +// appended to the per-user specifier and matches Python's "hex:250" form. +var playCountTiers = []struct { + id string + specifier string +}{ + {id: "p1", specifier: "250"}, + {id: "p2", specifier: "1000"}, + {id: "p3", specifier: "10000"}, } -func NewPlayCount10000Processor() Processor { - return &PlayCountMilestone{ID: "p3", PreviousMilestone: "p2", SpecifierMilestone: "10000"} + +type loadedTier struct { + id string + specifier string + stepCount int32 + amount int32 } -func (p *PlayCountMilestone) ChallengeID() string { return p.ID } +// playCountMilestonesDirtySQL surfaces verified artists whose tracks were played +// since the checkpoint. plays is append-only with a monotonic serial id, so +// plays.id is the high-water mark (aggregate_monthly_plays is updated in place +// and carries no monotonic column). The join filters to verified, non-deactivated +// artists so we only ever recompute users who can actually earn the reward. +const playCountMilestonesDirtySQL = ` + SELECT t.owner_id, p.id + FROM plays p + JOIN tracks t ON t.track_id = p.play_item_id + AND t.is_current = true + AND t.is_delete = false + JOIN users u ON u.user_id = t.owner_id + AND u.is_current = true + AND u.is_verified = true + AND u.is_deactivated = false + WHERE p.id > $1 + ORDER BY p.id ASC + LIMIT $2 +` -func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error { - c, ok, err := LoadChallenge(ctx, tx, p.ID) +func (p *PlayCountMilestonesProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + tiers, err := p.loadTiers(ctx, tx) if err != nil { - return fmt.Errorf("load challenge: %w", err) + return err } - if !ok || !c.Active || c.StepCount == nil { + if len(tiers) == 0 { return nil } - stepCount := *c.StepCount - amount := c.AmountInt() + return reconcileIncrementalUsers(ctx, tx, playCountMilestonesCheckpoint, playCountMilestonesDirtySQL, + func(ctx context.Context, tx pgx.Tx, userIDs []int64) error { + return p.recompute(ctx, tx, userIDs, tiers) + }) +} - // Find verified artists with their total 2025+ play count. We use - // aggregate_monthly_plays which is cheap to scan. - // - // The SQL only takes the (optional) previous-milestone id as a - // parameter; stepCount is applied in Go after the scan. - prevFilter := "true" - var prevArgs []any - if p.PreviousMilestone != "" { - prevFilter = `EXISTS ( - SELECT 1 FROM user_challenges uc - WHERE uc.challenge_id = $1 - AND uc.user_id = u.user_id - AND uc.is_complete = true - )` - prevArgs = []any{p.PreviousMilestone} +// loadTiers loads the active milestone challenges from the challenges table, in +// ascending order. Each tier's threshold and amount come from its row. +func (p *PlayCountMilestonesProcessor) loadTiers(ctx context.Context, tx pgx.Tx) ([]loadedTier, error) { + var out []loadedTier + for _, t := range playCountTiers { + c, ok, err := LoadChallenge(ctx, tx, t.id) + if err != nil { + return nil, fmt.Errorf("load challenge %s: %w", t.id, err) + } + if !ok || !c.Active || c.StepCount == nil { + continue + } + out = append(out, loadedTier{ + id: t.id, + specifier: t.specifier, + stepCount: *c.StepCount, + amount: c.AmountInt(), + }) } + return out, nil +} - query := fmt.Sprintf(` - SELECT u.user_id, COALESCE(SUM(amp.count), 0)::int AS play_count - FROM users u - JOIN tracks t ON t.owner_id = u.user_id - AND t.is_current = true - AND t.is_delete = false +func (p *PlayCountMilestonesProcessor) recompute(ctx context.Context, tx pgx.Tx, userIDs []int64, tiers []loadedTier) error { + // Total 2025+ play count for just the affected verified artists. Bounded by + // the dirty set (a handful of users per tick), this is a cheap indexed + // aggregation rather than the old full-table GROUP BY. + rows, err := tx.Query(ctx, ` + SELECT t.owner_id, COALESCE(SUM(amp.count), 0)::int AS play_count + FROM tracks t JOIN aggregate_monthly_plays amp ON amp.play_item_id = t.track_id - AND amp.timestamp >= DATE '2025-01-01' - WHERE u.is_current = true - AND u.is_verified = true - AND u.is_deactivated = false - AND %s - GROUP BY u.user_id - HAVING COALESCE(SUM(amp.count), 0) > 0 - `, prevFilter) - - rows, err := tx.Query(ctx, query, prevArgs...) + AND amp.timestamp >= DATE '2025-01-01' + WHERE t.owner_id = ANY($1) + AND t.is_current = true + AND t.is_delete = false + GROUP BY t.owner_id + `, userIDs) if err != nil { return fmt.Errorf("scan play counts: %w", err) } @@ -102,15 +144,27 @@ func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error { } for _, r := range results { - report := r.playCount - if report > stepCount { - report = stepCount + if r.playCount <= 0 { + continue } - specifier := SpecifierFromUserID(r.userID) + ":" + p.SpecifierMilestone - if err := UpsertUserChallenge(ctx, tx, - p.ID, specifier, r.userID, report, stepCount, amount, - ); err != nil { - return fmt.Errorf("upsert: %w", err) + // Walk tiers ascending; each tier is eligible only once the count + // reaches the previous tier's threshold (the p1->p2->p3 gate). + prevThreshold := int32(0) + for _, tier := range tiers { + if r.playCount < prevThreshold { + break + } + report := r.playCount + if report > tier.stepCount { + report = tier.stepCount + } + specifier := SpecifierFromUserID(r.userID) + ":" + tier.specifier + if err := UpsertUserChallenge(ctx, tx, + tier.id, specifier, r.userID, report, tier.stepCount, tier.amount, + ); err != nil { + return fmt.Errorf("upsert %s: %w", tier.id, err) + } + prevThreshold = tier.stepCount } } return nil diff --git a/jobs/challenges/play_count_milestones_test.go b/jobs/challenges/play_count_milestones_test.go index 3572bd72..0582327e 100644 --- a/jobs/challenges/play_count_milestones_test.go +++ b/jobs/challenges/play_count_milestones_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestPlayCount250_RequiresVerified(t *testing.T) { +func TestPlayCountMilestones_RequiresVerified(t *testing.T) { pool := withChallengesDB(t) t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) database.Seed(pool, database.FixtureMap{ @@ -27,9 +27,15 @@ func TestPlayCount250_RequiresVerified(t *testing.T) { {"play_item_id": 4001, "timestamp": t2025, "count": 300, "country": ""}, {"play_item_id": 4011, "timestamp": t2025, "count": 300, "country": ""}, }, + // plays drive the incremental dirty scan; one per track is enough to + // mark its owner dirty. + "plays": { + {"id": 1, "user_id": 999, "play_item_id": 4001, "created_at": t2025}, + {"id": 2, "user_id": 999, "play_item_id": 4011, "created_at": t2025}, + }, }) - runProcessor(t, pool, NewPlayCount250Processor()) + runProcessor(t, pool, NewPlayCountMilestonesProcessor()) r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 400)+":250") require.True(t, ok, "verified user 400 should get a p1 row") @@ -41,29 +47,63 @@ func TestPlayCount250_RequiresVerified(t *testing.T) { assert.False(t, ok, "unverified user gets no row") } -func TestPlayCount1000_GatedOnPrevious(t *testing.T) { +// A single run reconciles all three tiers; eligibility for each tier is gated on +// the count reaching the previous tier's threshold. +func TestPlayCountMilestones_TierGating(t *testing.T) { pool := withChallengesDB(t) t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) database.Seed(pool, database.FixtureMap{ "blocks": {{"blockhash": "blk_pcm2", "number": 1}}, - "users": {{"user_id": 410, "wallet": "0x410", "is_verified": true}}, - "tracks": {{"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}}, + "users": { + {"user_id": 410, "wallet": "0x410", "is_verified": true}, // count 1500 + {"user_id": 420, "wallet": "0x420", "is_verified": true}, // count 300 + }, + "tracks": { + {"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}, + {"track_id": 4201, "owner_id": 420, "title": "T", "blocknumber": 1}, + }, "aggregate_monthly_plays": { {"play_item_id": 4101, "timestamp": t2025, "count": 1500, "country": ""}, + {"play_item_id": 4201, "timestamp": t2025, "count": 300, "country": ""}, + }, + "plays": { + {"id": 1, "user_id": 999, "play_item_id": 4101, "created_at": t2025}, + {"id": 2, "user_id": 999, "play_item_id": 4201, "created_at": t2025}, }, }) - // Without p1 completed, p2 should not create a row. - runProcessor(t, pool, NewPlayCount1000Processor()) - _, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") - assert.False(t, ok, "p2 gated on p1 completion") + runProcessor(t, pool, NewPlayCountMilestonesProcessor()) - // Complete p1 first. - runProcessor(t, pool, NewPlayCount250Processor()) - // Now p2 should land. - runProcessor(t, pool, NewPlayCount1000Processor()) - r, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") - if assert.True(t, ok) { - assert.True(t, r.IsComplete) + // User 410 (count 1500): p1 + p2 complete, p3 eligible but incomplete. + r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 410)+":250") + if assert.True(t, ok, "p1 row for 410") { + assert.True(t, r1.IsComplete) + } + r2, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") + if assert.True(t, ok, "p2 row for 410") { + assert.True(t, r2.IsComplete) + } + r3, ok := queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 410)+":10000") + if assert.True(t, ok, "p3 row for 410 (eligible at >=1000)") { + assert.False(t, r3.IsComplete) + // Reported progress is the raw count (a plays trigger bumps the + // aggregate by the seeded play, so it lands just over 1500), capped + // below the 10000 step. + assert.Greater(t, *r3.CurrentStepCount, int32(1000)) + assert.Less(t, *r3.CurrentStepCount, int32(10000)) + } + + // User 420 (count 300): p1 complete, p2 eligible but incomplete, p3 gated out. + r1b, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 420)+":250") + if assert.True(t, ok, "p1 row for 420") { + assert.True(t, r1b.IsComplete) + } + r2b, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 420)+":1000") + if assert.True(t, ok, "p2 row for 420 (eligible at >=250)") { + assert.False(t, r2b.IsComplete) + assert.Greater(t, *r2b.CurrentStepCount, int32(250)) + assert.Less(t, *r2b.CurrentStepCount, int32(1000)) } + _, ok = queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 420)+":10000") + assert.False(t, ok, "p3 gated out below 1000 plays") } diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go index 7ebaa7cf..6c5171ad 100644 --- a/jobs/index_challenges.go +++ b/jobs/index_challenges.go @@ -43,9 +43,7 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle &challenges.ProfileCompletionProcessor{}, &challenges.ProfileVerifiedProcessor{}, &challenges.ListenStreakProcessor{}, - challenges.NewPlayCount250Processor(), - challenges.NewPlayCount1000Processor(), - challenges.NewPlayCount10000Processor(), + challenges.NewPlayCountMilestonesProcessor(), challenges.NewTrendingTrackProcessor(), challenges.NewTrendingUndergroundProcessor(), challenges.NewTrendingPlaylistProcessor(),