Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions ddl/migrations/0215_seed_play_count_milestones_checkpoint.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Seed the incremental checkpoint for the play-count milestone processor.
--
-- The p1/p2/p3 play milestones (250/1k/10k plays, verified artists only) used to
-- run one non-incremental GROUP BY per tier every tick — a Parallel Seq Scan of
-- ~485k tracks producing ~1,867 rows in ~49s, times three. They were merged into
-- a single processor that checkpoints plays.id and recomputes only the verified
-- artists whose tracks were played since the checkpoint. See
-- jobs/challenges/play_count_milestones.go and jobs/challenges/incremental.go.
--
-- A fresh checkpoint defaults to 0, which would walk the entire plays table from
-- the beginning the first time the job runs. We don't want that: the legacy
-- Python stack already populated user_challenges and the upserts are idempotent,
-- so a full historical re-derivation is pure redundant write load. Seed the
-- checkpoint to the current max(plays.id) so the processor starts "from now" and
-- only picks up new plays.
--
-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a checkpoint the
-- running job has already advanced. The max(id) probe is index-only against the
-- plays primary key.
--
-- Checkpoint name must match playCountMilestonesCheckpoint in the processor.

-- Insert one row: the checkpoint name and the current highest plays.id (0 when
-- plays is empty). An existing row with the same tablename is left untouched.
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
VALUES (
    'challenges:play_count_milestones:last_play_id',
    COALESCE((SELECT max(id) FROM plays), 0)
)
ON CONFLICT (tablename) DO NOTHING;
186 changes: 120 additions & 66 deletions jobs/challenges/play_count_milestones.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,121 @@ import (
"github.com/jackc/pgx/v5"
)

// PlayCountMilestone implements challenges p1/p2/p3 — the 250/1k/10k play
// milestones from 2025 onward, restricted to verified artists.
// PlayCountMilestonesProcessor implements challenges p1/p2/p3 — the 250/1k/10k
// play milestones from 2025 onward, restricted to verified artists.
// Mirrors apps/packages/discovery-provider/src/challenges/play_count_milestone_challenge_base.py.
//
// The play count is summed from aggregate_monthly_plays where timestamp >= 2025-01-01,
// joined to tracks the user owns. Each milestone has a "previous milestone"
// requirement: p2 requires p1 complete, p3 requires p2 complete (matches
// Python).
type PlayCountMilestone struct {
ID string
Threshold int32 // step_count override (we still load from challenges row)
PreviousMilestone string
SpecifierMilestone string // appended to specifier; matches Python's "hex:250"/"hex:1000"/"hex:10000"
}
// All three tiers derive from the same number: a verified artist's total 2025+
// play count, summed from aggregate_monthly_plays over the tracks they own. A
// tier is eligible once the count reaches the previous tier's threshold (p2
// needs >=250, p3 needs >=1000), matching Python's "previous milestone
// complete" gating — since a milestone completes exactly when the count crosses
// its step_count, gating on the live count is equivalent and doesn't need a
// second tick for the p1->p2->p3 cascade to propagate.
//
// Incremental: the old design ran one non-incremental GROUP BY per tier every
// tick — a Parallel Seq Scan of ~485k tracks producing ~1,867 rows in ~49s,
// times three. We instead checkpoint plays.id and recompute only the verified
// artists whose tracks were played since the checkpoint, in a single pass for
// all three tiers. See incremental.go.
type PlayCountMilestonesProcessor struct{}

// Per-milestone factory funcs. Each returns a Processor.
func NewPlayCount250Processor() Processor {
return &PlayCountMilestone{ID: "p1", SpecifierMilestone: "250"}
}
func NewPlayCount1000Processor() Processor {
return &PlayCountMilestone{ID: "p2", PreviousMilestone: "p1", SpecifierMilestone: "1000"}
// NewPlayCountMilestonesProcessor returns a freshly allocated
// PlayCountMilestonesProcessor as a Processor.
func NewPlayCountMilestonesProcessor() Processor {
	return new(PlayCountMilestonesProcessor)
}

// ChallengeID reports "p1" as a stand-in id for this processor. The umbrella
// job uses it only for error logging; Reconcile itself handles all three tiers.
func (p *PlayCountMilestonesProcessor) ChallengeID() string {
	const representativeID = "p1"
	return representativeID
}

const playCountMilestonesCheckpoint = "challenges:play_count_milestones:last_play_id"

// playCountTiers lists the milestone challenges from lowest to highest. Each
// entry pairs a challenge id with the suffix appended to the per-user
// specifier, which matches Python's "hex:250" form.
var playCountTiers = []struct {
	id, specifier string
}{
	{"p1", "250"},
	{"p2", "1000"},
	{"p3", "10000"},
}
func NewPlayCount10000Processor() Processor {
return &PlayCountMilestone{ID: "p3", PreviousMilestone: "p2", SpecifierMilestone: "10000"}

// loadedTier is one milestone tier together with the values needed to upsert
// its user_challenges rows.
type loadedTier struct {
	// id is the challenge id; specifier is the suffix appended to the
	// per-user specifier.
	id, specifier string
	// stepCount is the tier's threshold and amount its payout value, both
	// taken from the tier's challenges row.
	stepCount, amount int32
}

func (p *PlayCountMilestone) ChallengeID() string { return p.ID }
// playCountMilestonesDirtySQL surfaces verified artists whose tracks were played
// since the checkpoint. plays is append-only with a monotonic serial id, so
// plays.id is the high-water mark (aggregate_monthly_plays is updated in place
// and carries no monotonic column). The join filters to verified, non-deactivated
// artists so we only ever recompute users who can actually earn the reward.
//
// Parameters: $1 is the exclusive lower bound on plays.id (rows with id > $1
// are considered); $2 is the maximum number of rows to return.
//
// Result: one (owner_id, plays.id) row per matching play, ordered by plays.id
// ascending. An owner appears once per matching play, so the same owner_id
// can repeat within a batch.
//
// NOTE(review): plays of deleted/non-current tracks or of unverified or
// deactivated owners are dropped by the inner joins and never appear in the
// result. How the checkpoint advances past a run of such plays is decided by
// the caller (reconcileIncrementalUsers), which is not visible here — confirm.
const playCountMilestonesDirtySQL = `
SELECT t.owner_id, p.id
FROM plays p
JOIN tracks t ON t.track_id = p.play_item_id
AND t.is_current = true
AND t.is_delete = false
JOIN users u ON u.user_id = t.owner_id
AND u.is_current = true
AND u.is_verified = true
AND u.is_deactivated = false
WHERE p.id > $1
ORDER BY p.id ASC
LIMIT $2
`

func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ID)
func (p *PlayCountMilestonesProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
tiers, err := p.loadTiers(ctx, tx)
if err != nil {
return fmt.Errorf("load challenge: %w", err)
return err
}
if !ok || !c.Active || c.StepCount == nil {
if len(tiers) == 0 {
return nil
}
stepCount := *c.StepCount
amount := c.AmountInt()
return reconcileIncrementalUsers(ctx, tx, playCountMilestonesCheckpoint, playCountMilestonesDirtySQL,
func(ctx context.Context, tx pgx.Tx, userIDs []int64) error {
return p.recompute(ctx, tx, userIDs, tiers)
})
}

// Find verified artists with their total 2025+ play count. We use
// aggregate_monthly_plays which is cheap to scan.
//
// The SQL only takes the (optional) previous-milestone id as a
// parameter; stepCount is applied in Go after the scan.
prevFilter := "true"
var prevArgs []any
if p.PreviousMilestone != "" {
prevFilter = `EXISTS (
SELECT 1 FROM user_challenges uc
WHERE uc.challenge_id = $1
AND uc.user_id = u.user_id
AND uc.is_complete = true
)`
prevArgs = []any{p.PreviousMilestone}
// loadTiers looks up every entry of playCountTiers in the challenges table and
// returns the usable ones in the same ascending order. A tier is dropped when
// its row is missing, inactive, or has no step count. For the tiers that
// remain, the threshold and amount are read from the row.
//
// It returns a nil slice when no tier is usable, and a wrapped error as soon
// as any lookup fails.
func (p *PlayCountMilestonesProcessor) loadTiers(ctx context.Context, tx pgx.Tx) ([]loadedTier, error) {
	var active []loadedTier
	for _, def := range playCountTiers {
		challenge, found, err := LoadChallenge(ctx, tx, def.id)
		if err != nil {
			return nil, fmt.Errorf("load challenge %s: %w", def.id, err)
		}
		// Short-circuit order matters: challenge is only inspected once the
		// lookup reported a hit.
		usable := found && challenge.Active && challenge.StepCount != nil
		if !usable {
			continue
		}
		tier := loadedTier{
			id:        def.id,
			specifier: def.specifier,
			stepCount: *challenge.StepCount,
			amount:    challenge.AmountInt(),
		}
		active = append(active, tier)
	}
	return active, nil
}

query := fmt.Sprintf(`
SELECT u.user_id, COALESCE(SUM(amp.count), 0)::int AS play_count
FROM users u
JOIN tracks t ON t.owner_id = u.user_id
AND t.is_current = true
AND t.is_delete = false
func (p *PlayCountMilestonesProcessor) recompute(ctx context.Context, tx pgx.Tx, userIDs []int64, tiers []loadedTier) error {
// Total 2025+ play count for just the affected verified artists. Bounded by
// the dirty set (a handful of users per tick), this is a cheap indexed
// aggregation rather than the old full-table GROUP BY.
rows, err := tx.Query(ctx, `
SELECT t.owner_id, COALESCE(SUM(amp.count), 0)::int AS play_count
FROM tracks t
JOIN aggregate_monthly_plays amp ON amp.play_item_id = t.track_id
AND amp.timestamp >= DATE '2025-01-01'
WHERE u.is_current = true
AND u.is_verified = true
AND u.is_deactivated = false
AND %s
GROUP BY u.user_id
HAVING COALESCE(SUM(amp.count), 0) > 0
`, prevFilter)

rows, err := tx.Query(ctx, query, prevArgs...)
AND amp.timestamp >= DATE '2025-01-01'
WHERE t.owner_id = ANY($1)
AND t.is_current = true
AND t.is_delete = false
GROUP BY t.owner_id
`, userIDs)
if err != nil {
return fmt.Errorf("scan play counts: %w", err)
}
Expand All @@ -102,15 +144,27 @@ func (p *PlayCountMilestone) Reconcile(ctx context.Context, tx pgx.Tx) error {
}

for _, r := range results {
report := r.playCount
if report > stepCount {
report = stepCount
if r.playCount <= 0 {
continue
}
specifier := SpecifierFromUserID(r.userID) + ":" + p.SpecifierMilestone
if err := UpsertUserChallenge(ctx, tx,
p.ID, specifier, r.userID, report, stepCount, amount,
); err != nil {
return fmt.Errorf("upsert: %w", err)
// Walk tiers ascending; each tier is eligible only once the count
// reaches the previous tier's threshold (the p1->p2->p3 gate).
prevThreshold := int32(0)
for _, tier := range tiers {
if r.playCount < prevThreshold {
break
}
report := r.playCount
if report > tier.stepCount {
report = tier.stepCount
}
specifier := SpecifierFromUserID(r.userID) + ":" + tier.specifier
if err := UpsertUserChallenge(ctx, tx,
tier.id, specifier, r.userID, report, tier.stepCount, tier.amount,
); err != nil {
return fmt.Errorf("upsert %s: %w", tier.id, err)
}
prevThreshold = tier.stepCount
}
}
return nil
Expand Down
72 changes: 56 additions & 16 deletions jobs/challenges/play_count_milestones_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestPlayCount250_RequiresVerified(t *testing.T) {
func TestPlayCountMilestones_RequiresVerified(t *testing.T) {
pool := withChallengesDB(t)
t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC)
database.Seed(pool, database.FixtureMap{
Expand All @@ -27,9 +27,15 @@ func TestPlayCount250_RequiresVerified(t *testing.T) {
{"play_item_id": 4001, "timestamp": t2025, "count": 300, "country": ""},
{"play_item_id": 4011, "timestamp": t2025, "count": 300, "country": ""},
},
// plays drive the incremental dirty scan; one per track is enough to
// mark its owner dirty.
"plays": {
{"id": 1, "user_id": 999, "play_item_id": 4001, "created_at": t2025},
{"id": 2, "user_id": 999, "play_item_id": 4011, "created_at": t2025},
},
})

runProcessor(t, pool, NewPlayCount250Processor())
runProcessor(t, pool, NewPlayCountMilestonesProcessor())

r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 400)+":250")
require.True(t, ok, "verified user 400 should get a p1 row")
Expand All @@ -41,29 +47,63 @@ func TestPlayCount250_RequiresVerified(t *testing.T) {
assert.False(t, ok, "unverified user gets no row")
}

func TestPlayCount1000_GatedOnPrevious(t *testing.T) {
// A single run reconciles all three tiers; eligibility for each tier is gated on
// the count reaching the previous tier's threshold.
func TestPlayCountMilestones_TierGating(t *testing.T) {
pool := withChallengesDB(t)
t2025 := time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC)
database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_pcm2", "number": 1}},
"users": {{"user_id": 410, "wallet": "0x410", "is_verified": true}},
"tracks": {{"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1}},
"users": {
{"user_id": 410, "wallet": "0x410", "is_verified": true}, // count 1500
{"user_id": 420, "wallet": "0x420", "is_verified": true}, // count 300
},
"tracks": {
{"track_id": 4101, "owner_id": 410, "title": "T", "blocknumber": 1},
{"track_id": 4201, "owner_id": 420, "title": "T", "blocknumber": 1},
},
"aggregate_monthly_plays": {
{"play_item_id": 4101, "timestamp": t2025, "count": 1500, "country": ""},
{"play_item_id": 4201, "timestamp": t2025, "count": 300, "country": ""},
},
"plays": {
{"id": 1, "user_id": 999, "play_item_id": 4101, "created_at": t2025},
{"id": 2, "user_id": 999, "play_item_id": 4201, "created_at": t2025},
},
})

// Without p1 completed, p2 should not create a row.
runProcessor(t, pool, NewPlayCount1000Processor())
_, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000")
assert.False(t, ok, "p2 gated on p1 completion")
runProcessor(t, pool, NewPlayCountMilestonesProcessor())

// Complete p1 first.
runProcessor(t, pool, NewPlayCount250Processor())
// Now p2 should land.
runProcessor(t, pool, NewPlayCount1000Processor())
r, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000")
if assert.True(t, ok) {
assert.True(t, r.IsComplete)
// User 410 (count 1500): p1 + p2 complete, p3 eligible but incomplete.
r1, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 410)+":250")
if assert.True(t, ok, "p1 row for 410") {
assert.True(t, r1.IsComplete)
}
r2, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 410)+":1000")
if assert.True(t, ok, "p2 row for 410") {
assert.True(t, r2.IsComplete)
}
r3, ok := queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 410)+":10000")
if assert.True(t, ok, "p3 row for 410 (eligible at >=1000)") {
assert.False(t, r3.IsComplete)
// Reported progress is the raw count (a plays trigger bumps the
// aggregate by the seeded play, so it lands just over 1500). That is
// still under the 10000 step, so the cap does not apply.
assert.Greater(t, *r3.CurrentStepCount, int32(1000))
assert.Less(t, *r3.CurrentStepCount, int32(10000))
}

// User 420 (count 300): p1 complete, p2 eligible but incomplete, p3 gated out.
r1b, ok := queryUserChallenge(t, pool, "p1", fmt.Sprintf("%x", 420)+":250")
if assert.True(t, ok, "p1 row for 420") {
assert.True(t, r1b.IsComplete)
}
r2b, ok := queryUserChallenge(t, pool, "p2", fmt.Sprintf("%x", 420)+":1000")
if assert.True(t, ok, "p2 row for 420 (eligible at >=250)") {
assert.False(t, r2b.IsComplete)
assert.Greater(t, *r2b.CurrentStepCount, int32(250))
assert.Less(t, *r2b.CurrentStepCount, int32(1000))
}
_, ok = queryUserChallenge(t, pool, "p3", fmt.Sprintf("%x", 420)+":10000")
assert.False(t, ok, "p3 gated out below 1000 plays")
}
4 changes: 1 addition & 3 deletions jobs/index_challenges.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func NewIndexChallengesJob(cfg config.Config, pool database.DbPool) *IndexChalle
&challenges.ProfileCompletionProcessor{},
&challenges.ProfileVerifiedProcessor{},
&challenges.ListenStreakProcessor{},
challenges.NewPlayCount250Processor(),
challenges.NewPlayCount1000Processor(),
challenges.NewPlayCount10000Processor(),
challenges.NewPlayCountMilestonesProcessor(),
challenges.NewTrendingTrackProcessor(),
challenges.NewTrendingUndergroundProcessor(),
challenges.NewTrendingPlaylistProcessor(),
Expand Down
Loading