diff --git a/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql b/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql new file mode 100644 index 00000000..cfa99140 --- /dev/null +++ b/ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql @@ -0,0 +1,25 @@ +-- Seed the incremental checkpoint for the play-count milestone processor. +-- +-- The p1/p2/p3 play milestones (250/1k/10k plays, verified artists only) used to +-- run one non-incremental GROUP BY per tier every tick — a Parallel Seq Scan of +-- ~485k tracks producing ~1,867 rows in ~49s, times three. They were merged into +-- a single processor that checkpoints plays.id and recomputes only the verified +-- artists whose tracks were played since the checkpoint. See +-- jobs/challenges/play_count_milestones.go and jobs/challenges/incremental.go. +-- +-- A fresh checkpoint defaults to 0, which would walk the entire plays table from +-- the beginning the first time the job runs. We don't want that: the legacy +-- Python stack already populated user_challenges and the upserts are idempotent, +-- so a full historical re-derivation is pure redundant write load. Seed the +-- checkpoint to the current max(plays.id) so the processor starts "from now" and +-- only picks up new plays. +-- +-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a checkpoint the +-- running job has already advanced. The max(id) probe is index-only against the +-- plays primary key. +-- +-- Checkpoint name must match playCountMilestonesCheckpoint in the processor. 
+ +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:play_count_milestones:last_play_id', COALESCE((SELECT max(id) FROM plays), 0) +ON CONFLICT (tablename) DO NOTHING; diff --git a/jobs/challenges/play_count_milestones.go b/jobs/challenges/play_count_milestones.go index 4f2b6f42..c2d75cb2 100644 --- a/jobs/challenges/play_count_milestones.go +++ b/jobs/challenges/play_count_milestones.go @@ -7,79 +7,121 @@ import ( "github.com/jackc/pgx/v5" ) -// PlayCountMilestone implements challenges p1/p2/p3 — the 250/1k/10k play -// milestones from 2025 onward, restricted to verified artists. +// PlayCountMilestonesProcessor implements challenges p1/p2/p3 — the 250/1k/10k +// play milestones from 2025 onward, restricted to verified artists. // Mirrors apps/packages/discovery-provider/src/challenges/play_count_milestone_challenge_base.py. // -// The play count is summed from aggregate_monthly_plays where timestamp >= 2025-01-01, -// joined to tracks the user owns. Each milestone has a "previous milestone" -// requirement: p2 requires p1 complete, p3 requires p2 complete (matches -// Python). -type PlayCountMilestone struct { - ID string - Threshold int32 // step_count override (we still load from challenges row) - PreviousMilestone string - SpecifierMilestone string // appended to specifier; matches Python's "hex:250"/"hex:1000"/"hex:10000" -} +// All three tiers derive from the same number: a verified artist's total 2025+ +// play count, summed from aggregate_monthly_plays over the tracks they own. A +// tier is eligible once the count reaches the previous tier's threshold (p2 +// needs >=250, p3 needs >=1000), matching Python's "previous milestone +// complete" gating — since a milestone completes exactly when the count crosses +// its step_count, gating on the live count is equivalent and doesn't need a +// second tick for the p1->p2->p3 cascade to propagate. 
+// +// Incremental: the old design ran one non-incremental GROUP BY per tier every +// tick — a Parallel Seq Scan of ~485k tracks producing ~1,867 rows in ~49s, +// times three. We instead checkpoint plays.id and recompute only the verified +// artists whose tracks were played since the checkpoint, in a single pass for +// all three tiers. See incremental.go. +type PlayCountMilestonesProcessor struct{} -// Per-milestone factory funcs. Each returns a Processor. -func NewPlayCount250Processor() Processor { - return &PlayCountMilestone{ID: "p1", SpecifierMilestone: "250"} -} -func NewPlayCount1000Processor() Processor { - return &PlayCountMilestone{ID: "p2", PreviousMilestone: "p1", SpecifierMilestone: "1000"} +func NewPlayCountMilestonesProcessor() Processor { return &PlayCountMilestonesProcessor{} } + +// ChallengeID returns p1 as a representative id; it is only used for error +// logging by the umbrella job. Reconcile handles all three tiers. +func (p *PlayCountMilestonesProcessor) ChallengeID() string { return "p1" } + +const playCountMilestonesCheckpoint = "challenges:play_count_milestones:last_play_id" + +// playCountTiers are the milestone challenges in ascending order. specifier is +// appended to the per-user specifier and matches Python's "hex:250" form. Thresholds and reward amounts are not listed here; loadTiers reads them from each tier's challenges row. +var playCountTiers = []struct { + id string + specifier string +}{ + {id: "p1", specifier: "250"}, + {id: "p2", specifier: "1000"}, + {id: "p3", specifier: "10000"}, } -func NewPlayCount10000Processor() Processor { - return &PlayCountMilestone{ID: "p3", PreviousMilestone: "p2", SpecifierMilestone: "10000"} + +type loadedTier struct { + id string + specifier string + stepCount int32 + amount int32 } -func (p *PlayCountMilestone) ChallengeID() string { return p.ID } +// playCountMilestonesDirtySQL surfaces verified artists whose tracks were played +// since the checkpoint. 
plays is append-only with a monotonic serial id, so +// plays.id is the high-water mark (aggregate_monthly_plays is updated in place +// and carries no monotonic column). The join filters to verified, non-deactivated +// artists so we only ever recompute users who can actually earn the reward. $1 is the exclusive lower bound on plays.id and $2 caps the batch; rows come back in ascending plays.id order. NOTE(review): plays on tracks of non-verified owners produce no rows here, so if reconcileIncrementalUsers advances the checkpoint only from returned rows, such plays would be rescanned every tick — verify against incremental.go. +const playCountMilestonesDirtySQL = ` + SELECT t.owner_id, p.id + FROM plays p + JOIN tracks t ON t.track_id = p.play_item_id + AND t.is_current = true + AND t.is_delete = false + JOIN users u ON u.user_id = t.owner_id + AND u.is_current = true + AND u.is_verified = true + AND u.is_deactivated = false + WHERE p.id > $1 + ORDER BY p.id ASC + LIMIT $2 +` -func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error { - c, ok, err := LoadChallenge(ctx, tx, p.ID) +func (p *PlayCountMilestonesProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { + tiers, err := p.loadTiers(ctx, tx) if err != nil { - return fmt.Errorf("load challenge: %w", err) + return err } - if !ok || !c.Active || c.StepCount == nil { + if len(tiers) == 0 { return nil } - stepCount := *c.StepCount - amount := c.AmountInt() + return reconcileIncrementalUsers(ctx, tx, playCountMilestonesCheckpoint, playCountMilestonesDirtySQL, + func(ctx context.Context, tx pgx.Tx, userIDs []int64) error { + return p.recompute(ctx, tx, userIDs, tiers) + }) +} - // Find verified artists with their total 2025+ play count. We use - // aggregate_monthly_plays which is cheap to scan. - // - // The SQL only takes the (optional) previous-milestone id as a - // parameter; stepCount is applied in Go after the scan. - prevFilter := "true" - var prevArgs []any - if p.PreviousMilestone != "" { - prevFilter = `EXISTS ( - SELECT 1 FROM user_challenges uc - WHERE uc.challenge_id = $1 - AND uc.user_id = u.user_id - AND uc.is_complete = true - )` - prevArgs = []any{p.PreviousMilestone} +// loadTiers loads the active milestone challenges from the challenges table, in +// ascending order. 
Each tier's threshold and amount come from its row. +func (p *PlayCountMilestonesProcessor) loadTiers(ctx context.Context, tx pgx.Tx) ([]loadedTier, error) { + var out []loadedTier + for _, t := range playCountTiers { + c, ok, err := LoadChallenge(ctx, tx, t.id) + if err != nil { + return nil, fmt.Errorf("load challenge %s: %w", t.id, err) + } + if !ok || !c.Active || c.StepCount == nil { + continue + } + out = append(out, loadedTier{ + id: t.id, + specifier: t.specifier, + stepCount: *c.StepCount, + amount: c.AmountInt(), + }) } + return out, nil +} - query := fmt.Sprintf(` - SELECT u.user_id, COALESCE(SUM(amp.count), 0)::int AS play_count - FROM users u - JOIN tracks t ON t.owner_id = u.user_id - AND t.is_current = true - AND t.is_delete = false +func (p *PlayCountMilestonesProcessor) recompute(ctx context.Context, tx pgx.Tx, userIDs []int64, tiers []loadedTier) error { + // Total 2025+ play count for just the affected verified artists. Bounded by + // the dirty set (a handful of users per tick), this is a cheap indexed + // aggregation rather than the old full-table GROUP BY. This query does not re-check is_verified; it relies on userIDs having been filtered upstream (playCountMilestonesDirtySQL joins on verified, non-deactivated users). + rows, err := tx.Query(ctx, ` + SELECT t.owner_id, COALESCE(SUM(amp.count), 0)::int AS play_count + FROM tracks t JOIN aggregate_monthly_plays amp ON amp.play_item_id = t.track_id - AND amp.timestamp >= DATE '2025-01-01' - WHERE u.is_current = true - AND u.is_verified = true - AND u.is_deactivated = false - AND %s - GROUP BY u.user_id - HAVING COALESCE(SUM(amp.count), 0) > 0 - `, prevFilter) - - rows, err := tx.Query(ctx, query, prevArgs...) 
+ AND amp.timestamp >= DATE '2025-01-01' + WHERE t.owner_id = ANY($1) + AND t.is_current = true + AND t.is_delete = false + GROUP BY t.owner_id + `, userIDs) if err != nil { return fmt.Errorf("scan play counts: %w", err) } @@ -102,15 +144,27 @@ func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error { } for _, r := range results { - report := r.playCount - if report > stepCount { - report = stepCount + if r.playCount <= 0 { + continue } - specifier := SpecifierFromUserID(r.userID) + ":" + p.SpecifierMilestone - if err := UpsertUserChallenge(ctx, tx, - p.ID, specifier, r.userID, report, stepCount, amount, - ); err != nil { - return fmt.Errorf("upsert: %w", err) + // Walk tiers ascending; each tier is eligible only once the count + // reaches the previous tier's threshold (the p1->p2->p3 gate). NOTE(review): prevThreshold comes from the previous entry of tiers, and loadTiers drops inactive tiers, so with a lower tier inactive the next tier is gated on an earlier threshold (or 0) — confirm this is intended. + prevThreshold := int32(0) + for _, tier := range tiers { + if r.playCount < prevThreshold { + break + } + report := r.playCount + if report > tier.stepCount { + report = tier.stepCount + } + specifier := SpecifierFromUserID(r.userID) + ":" + tier.specifier + if err := UpsertUserChallenge(ctx, tx, + tier.id, specifier, r.userID, report, tier.stepCount, tier.amount, + ); err != nil { + return fmt.Errorf("upsert %s: %w", tier.id, err) + } + prevThreshold = tier.stepCount } } return nil diff --git a/jobs/challenges/play_count_milestones_test.go b/jobs/challenges/play_count_milestones_test.go index 3572bd72..0582327e 100644 --- a/jobs/challenges/play_count_milestones_test.go +++ b/jobs/challenges/play_count_milestones_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestPlayCount250_RequiresVerified(t *testing.T) { +func TestPlayCountMilestones_RequiresVerified(t *testing.T) { pool := withChallengesDB(t) t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) database.Seed(pool, database.FixtureMap{ @@ -27,9 +27,15 @@ func TestPlayCount250_RequiresVerified(t *testing.T) { {"play_item_id": 4001, "timestamp": t2025, "count": 300, 
"country": ""}, {"play_item_id": 4011, "timestamp": t2025, "count": 300, "country": ""}, }, + // plays drive the incremental dirty scan; one per track is enough to + // mark its owner dirty. + "plays": { + {"id": 1, "user_id": 999, "play_item_id": 4001, "created_at": t2025}, + {"id": 2, "user_id": 999, "play_item_id": 4011, "created_at": t2025}, + }, }) - runProcessor(t, pool, NewPlayCount250Processor()) + runProcessor(t, pool, NewPlayCountMilestonesProcessor()) r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 400)+":250") require.True(t, ok, "verified user 400 should get a p1 row") @@ -41,29 +47,63 @@ func TestPlayCount250_RequiresVerified(t *testing.T) { assert.False(t, ok, "unverified user gets no row") } -func TestPlayCount1000_GatedOnPrevious(t *testing.T) { +// A single run reconciles all three tiers; eligibility for each tier is gated on +// the count reaching the previous tier's threshold. +func TestPlayCountMilestones_TierGating(t *testing.T) { pool := withChallengesDB(t) t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC) database.Seed(pool, database.FixtureMap{ "blocks": {{"blockhash": "blk_pcm2", "number": 1}}, - "users": {{"user_id": 410, "wallet": "0x410", "is_verified": true}}, - "tracks": {{"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}}, + "users": { + {"user_id": 410, "wallet": "0x410", "is_verified": true}, // count 1500 + {"user_id": 420, "wallet": "0x420", "is_verified": true}, // count 300 + }, + "tracks": { + {"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}, + {"track_id": 4201, "owner_id": 420, "title": "T", "blocknumber": 1}, + }, "aggregate_monthly_plays": { {"play_item_id": 4101, "timestamp": t2025, "count": 1500, "country": ""}, + {"play_item_id": 4201, "timestamp": t2025, "count": 300, "country": ""}, + }, + "plays": { + {"id": 1, "user_id": 999, "play_item_id": 4101, "created_at": t2025}, + {"id": 2, "user_id": 999, "play_item_id": 4201, "created_at": t2025}, }, }) - // Without p1 
completed, p2 should not create a row. - runProcessor(t, pool, NewPlayCount1000Processor()) - _, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") - assert.False(t, ok, "p2 gated on p1 completion") + runProcessor(t, pool, NewPlayCountMilestonesProcessor()) - // Complete p1 first. - runProcessor(t, pool, NewPlayCount250Processor()) - // Now p2 should land. - runProcessor(t, pool, NewPlayCount1000Processor()) - r, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") - if assert.True(t, ok) { - assert.True(t, r.IsComplete) + // User 410 (count 1500): p1 + p2 complete, p3 eligible but incomplete. + r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 410)+":250") + if assert.True(t, ok, "p1 row for 410") { + assert.True(t, r1.IsComplete) + } + r2, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000") + if assert.True(t, ok, "p2 row for 410") { + assert.True(t, r2.IsComplete) + } + r3, ok := queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 410)+":10000") + if assert.True(t, ok, "p3 row for 410 (eligible at >=1000)") { + assert.False(t, r3.IsComplete) + // Reported progress is the raw count (a plays trigger bumps the + // aggregate by the seeded play, so it lands just over 1500); it stays + // below the 10000 step, so the step_count cap does not apply. + assert.Greater(t, *r3.CurrentStepCount, int32(1000)) + assert.Less(t, *r3.CurrentStepCount, int32(10000)) + } + + // User 420 (count 300): p1 complete, p2 eligible but incomplete, p3 gated out. 
+ r1b, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 420)+":250") + if assert.True(t, ok, "p1 row for 420") { + assert.True(t, r1b.IsComplete) + } + r2b, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 420)+":1000") + if assert.True(t, ok, "p2 row for 420 (eligible at >=250)") { + assert.False(t, r2b.IsComplete) + assert.Greater(t, *r2b.CurrentStepCount, int32(250)) + assert.Less(t, *r2b.CurrentStepCount, int32(1000)) } + _, ok = queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 420)+":10000") + assert.False(t, ok, "p3 gated out below 1000 plays") } diff --git a/jobs/index_challenges.go b/jobs/index_challenges.go index 7ebaa7cf..6c5171ad 100644 --- a/jobs/index_challenges.go +++ b/jobs/index_challenges.go @@ -43,9 +43,7 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle &challenges.ProfileCompletionProcessor{}, &challenges.ProfileVerifiedProcessor{}, &challenges.ListenStreakProcessor{}, - challenges.NewPlayCount250Processor(), - challenges.NewPlayCount1000Processor(), - challenges.NewPlayCount10000Processor(), + challenges.NewPlayCountMilestonesProcessor(), challenges.NewTrendingTrackProcessor(), challenges.NewTrendingUndergroundProcessor(), challenges.NewTrendingPlaylistProcessor(),