diff --git a/ddl/migrations/0214_trending_matview_unique_indexes.sql b/ddl/migrations/0214_trending_matview_unique_indexes.sql new file mode 100644 index 00000000..07870541 --- /dev/null +++ b/ddl/migrations/0214_trending_matview_unique_indexes.sql @@ -0,0 +1,18 @@ +-- Promote the track_id indexes on the trending matviews to UNIQUE so +-- TrendingJob can use REFRESH MATERIALIZED VIEW CONCURRENTLY (which avoids the +-- ACCESS EXCLUSIVE lock a plain refresh takes). CONCURRENTLY requires a UNIQUE +-- index with no WHERE clause; both matviews are one row per track (verified in +-- prod: row count == COUNT(DISTINCT track_id)), so this is valid. We replace +-- the existing non-unique indexes in place. +-- +-- Intentionally NOT wrapped in BEGIN/COMMIT: each statement autocommits so the +-- brief ACCESS EXCLUSIVE from DROP/CREATE INDEX is held only for the index +-- build. IF NOT EXISTS keeps re-application idempotent. + +DROP INDEX IF EXISTS public.interval_play_track_id_idx; +CREATE UNIQUE INDEX IF NOT EXISTS interval_play_track_id_idx + ON public.aggregate_interval_plays USING btree (track_id); + +DROP INDEX IF EXISTS public.trending_params_track_id_idx; +CREATE UNIQUE INDEX IF NOT EXISTS trending_params_track_id_idx + ON public.trending_params USING btree (track_id); diff --git a/indexer/indexer.go b/indexer/indexer.go index 94721b5f..fef46e02 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -148,8 +148,11 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUserListeningHistoryJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Second) + // Hourly to match discovery's effective cadence (trending_refresh_seconds + // default 3600). The vendored port dropped that gate, so a 10s schedule ran + // the multi-minute recompute continuously and IO-starved the block loop. jobs.NewTrendingJob(ci.Config, ci.pool). - ScheduleEvery(ctx, 10*time.Second) + ScheduleEvery(ctx, 1*time.Hour) jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Minute) diff --git a/jobs/index_trending.go b/jobs/index_trending.go index fb6ca87a..0d318fd6 100644 --- a/jobs/index_trending.go +++ b/jobs/index_trending.go @@ -15,22 +15,15 @@ import ( // TrendingJob recomputes trending scores. Ports the score-computation half // of apps/packages/discovery-provider/src/tasks/index_trending.py. // -// On each run: -// 1. Refresh aggregate_interval_plays MV (feeds week/month listen counts). -// 2. Refresh trending_params MV (per-track inputs). -// 3. For each (entity_type, version), run a delete-then-bulk-insert into -// track_trending_scores / playlist_trending_scores using the strategy's -// SQL template. +// On each run it refreshes the aggregate_interval_plays and trending_params +// matviews, then for each (entity_type, version) reconciles the *_trending_scores +// tables via a skip-unchanged upsert plus an anti-join delete of rows that no +// longer qualify (instead of the old DELETE-all + bulk-INSERT, which rewrote the +// whole 16GB table and its six indexes every run). // -// What is intentionally NOT ported here: -// - Trending notifications (top-10 mover diff against the notification -// table). Lands with the challenges/notifications work. -// - index_tastemaker (writes challenge events). Depends on the challenge -// bus, which is a separate effort. -// -// Trending score templates are copied verbatim from -// apps/packages/discovery-provider/src/trending_strategies/{pnagD,AnlGe}_*.py -// so the scoring stays bit-identical to discovery. +// Scheduled hourly to match discovery (default trending_refresh_seconds=3600; +// see indexer's startParityJobs). Score templates are copied verbatim from +// apps' trending_strategies/{pnagD,AnlGe}_*.py so scoring stays bit-identical. type TrendingJob struct { pool database.DbPool logger *zap.Logger @@ -88,8 +81,8 @@ func (j *TrendingJob) run(ctx context.Context) error { start := time.Now() for _, mv := range []string{"aggregate_interval_plays", "trending_params"} { - if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil { - return fmt.Errorf("refresh %s: %w", mv, err) + if err := j.refreshMatview(ctx, mv); err != nil { + return err } } @@ -113,6 +106,29 @@ func (j *TrendingJob) run(ctx context.Context) error { return nil } +// refreshMatview prefers REFRESH ... CONCURRENTLY so it doesn't take an ACCESS +// EXCLUSIVE lock that stalls every reader (the block-indexing loop included). +// CONCURRENTLY needs the matview populated and carrying a unique index (migration +// 0214); both can be transiently false (fresh schema-only DB, or pre-migration +// during a rolling deploy), so we fall back to a blocking refresh until they hold. +func (j *TrendingJob) refreshMatview(ctx context.Context, mv string) error { + mvStart := time.Now() + concurrent := true + if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+mv); err != nil { + j.logger.Warn("concurrent matview refresh failed; falling back to blocking refresh", + zap.String("matview", mv), zap.Error(err)) + concurrent = false + if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil { + return fmt.Errorf("refresh %s: %w", mv, err) + } + } + j.logger.Info("refreshed matview", + zap.String("matview", mv), + zap.Bool("concurrent", concurrent), + zap.Duration("duration", time.Since(mvStart))) + return nil +} + // trackScoreParams holds the scalar weights used by a tracks strategy. The // only behavior difference between pnagD and AnlGe is whether the trailing // multiplier is `karma` or `(1 + LOG(1 + karma))`. @@ -149,11 +165,22 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v } defer tx.Rollback(ctx) - if _, err := tx.Exec(ctx, - "DELETE FROM track_trending_scores WHERE type = $1 AND version = $2", - trendingType, version, - ); err != nil { - return err + // Stage this cycle's scores in a temp table, then reconcile against the + // live table (upsert changed/new rows, delete dropped-out rows). The score + // expressions below are byte-for-byte the discovery templates; only the + // write target changed (they now feed tmp_track_scores instead of a + // DELETE+INSERT of track_trending_scores), and the bind parameters were + // renumbered without gaps because the (constant) type/version no longer + // appear in the staged SELECT — type/version are applied at the upsert. + if _, err := tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_track_scores ( + track_id integer, + genre varchar, + time_range varchar, + score double precision + ) ON COMMIT DROP + `); err != nil { + return fmt.Errorf("create temp: %w", err) } // Week + month follow the same shape; we generate one query per range. @@ -168,38 +195,36 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v {"month", trendingMo, "aip.month_listen_counts", "tp.repost_month_count", "tp.save_month_count"}, } { q := fmt.Sprintf(` - INSERT INTO track_trending_scores - (track_id, genre, type, version, time_range, score, created_at) + INSERT INTO tmp_track_scores (track_id, genre, time_range, score) SELECT tp.track_id, tp.genre, - $1, $2, $3, + $1, CASE - WHEN tp.owner_follower_count < $4 THEN 0 + WHEN tp.owner_follower_count < $2 THEN 0 WHEN EXTRACT(DAYS FROM now() - ( CASE WHEN tp.release_date > now() THEN aip.created_at ELSE GREATEST(tp.release_date, aip.created_at) END - )) > $5 THEN GREATEST( - 1.0 / $6, - POW($6, GREATEST( + )) > $3 THEN GREATEST( + 1.0 / $4, + POW($4, GREATEST( -10, 1.0 - 1.0 * EXTRACT(DAYS FROM now() - ( CASE WHEN tp.release_date > now() THEN aip.created_at ELSE GREATEST(tp.release_date, aip.created_at) END - )) / $5 + )) / $3 )) ) * ( - $7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count + $5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count ) * %s ELSE ( - $7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count + $5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count ) * %s - END, - now() + END FROM trending_params tp INNER JOIN aggregate_interval_plays aip ON tp.track_id = aip.track_id `, @@ -207,7 +232,7 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v r.listenField, r.repostField, r.saveField, p.karmaExpr, ) if _, err := tx.Exec(ctx, q, - trendingType, version, r.name, + r.name, trendingY, r.days, trendingQ, trendingN, trendingF, trendingO, trendingR, trendingI, ); err != nil { @@ -217,15 +242,13 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v // All-time uses aggregate_plays.count rather than aggregate_interval_plays. q := fmt.Sprintf(` - INSERT INTO track_trending_scores - (track_id, genre, type, version, time_range, score, created_at) + INSERT INTO tmp_track_scores (track_id, genre, time_range, score) SELECT - tp.track_id, tp.genre, $1, $2, 'allTime', + tp.track_id, tp.genre, 'allTime', CASE - WHEN tp.owner_follower_count < $3 THEN 0 - ELSE ($4 * ap.count + $5 * tp.repost_count + $6 * tp.save_count) * %s - END, - now() + WHEN tp.owner_follower_count < $1 THEN 0 + ELSE ($2 * ap.count + $3 * tp.repost_count + $4 * tp.save_count) * %s + END FROM trending_params tp INNER JOIN aggregate_plays ap ON tp.track_id = ap.play_item_id INNER JOIN tracks t ON ap.play_item_id = t.track_id @@ -235,12 +258,41 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v AND t.stem_of IS NULL `, p.karmaExpr) if _, err := tx.Exec(ctx, q, - trendingType, version, trendingY, trendingN, trendingR, trendingI, ); err != nil { return fmt.Errorf("allTime range: %w", err) } + // Upsert new/changed rows; the WHERE guard skips rows whose score and genre + // are unchanged so a no-op cycle writes (and dirties indexes for) nothing. + if _, err := tx.Exec(ctx, ` + INSERT INTO track_trending_scores + (track_id, genre, type, version, time_range, score, created_at) + SELECT s.track_id, s.genre, $1, $2, s.time_range, s.score, now() + FROM tmp_track_scores s + ON CONFLICT (track_id, type, version, time_range) DO UPDATE + SET score = EXCLUDED.score, + genre = EXCLUDED.genre, + created_at = EXCLUDED.created_at + WHERE track_trending_scores.score IS DISTINCT FROM EXCLUDED.score + OR track_trending_scores.genre IS DISTINCT FROM EXCLUDED.genre + `, trendingType, version); err != nil { + return fmt.Errorf("upsert: %w", err) + } + + // Delete rows that no longer qualify this cycle (e.g. track deleted/unlisted + // or aged out of the interval window). + if _, err := tx.Exec(ctx, ` + DELETE FROM track_trending_scores t + WHERE t.type = $1 AND t.version = $2 + AND NOT EXISTS ( + SELECT 1 FROM tmp_track_scores s + WHERE s.track_id = t.track_id AND s.time_range = t.time_range + ) + `, trendingType, version); err != nil { + return fmt.Errorf("prune stale: %w", err) + } + return tx.Commit(ctx) } @@ -253,11 +305,19 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } defer tx.Rollback(ctx) - if _, err := tx.Exec(ctx, - "DELETE FROM playlist_trending_scores WHERE type = $1 AND version = $2", - trendingType, version, - ); err != nil { - return err + // Stage this cycle's scores, then upsert/prune (see computeTrendingTracks + // for the rationale). The per-range SELECT is the verbatim discovery + // template; only its write target changed, and the bind parameters were + // renumbered without gaps now that the (constant) type/version are applied + // at the upsert instead of in the staged SELECT. + if _, err := tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_playlist_scores ( + playlist_id integer, + time_range varchar, + score double precision + ) ON COMMIT DROP + `); err != nil { + return fmt.Errorf("create temp: %w", err) } // Each time range gets its own query because the interval (':week days' @@ -272,8 +332,7 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } { intervalLit := fmt.Sprintf("%d days", r.days) q := ` - INSERT INTO playlist_trending_scores - (playlist_id, type, version, time_range, score, created_at) + INSERT INTO tmp_playlist_scores (playlist_id, time_range, score) WITH saves_and_reposts AS ( SELECT user_id, repost_item_id AS item_id FROM reposts @@ -326,31 +385,30 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType ) SELECT p.playlist_id, - $2, $3, $4, + $2, CASE - WHEN au.follower_count < $5 THEN 0 + WHEN au.follower_count < $3 THEN 0 WHEN EXTRACT(DAYS FROM now() - ( CASE WHEN p.release_date > now() THEN p.created_at ELSE GREATEST(p.release_date, p.created_at) END - )) > $6 THEN GREATEST( - 1.0 / $7, - POW($7, GREATEST( + )) > $4 THEN GREATEST( + 1.0 / $5, + POW($5, GREATEST( -10, 1.0 - 1.0 * EXTRACT(DAYS FROM now() - ( CASE WHEN p.release_date > now() THEN p.created_at ELSE GREATEST(p.release_date, p.created_at) END - )) / $6 + )) / $4 )) ) * ( - $8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0) - + $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0) + $6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0) + + $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0) ) * COALESCE(k.karma, 1) ELSE ( - $8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0) - + $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0) + $6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0) + + $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0) ) * COALESCE(k.karma, 1) - END, - now() + END FROM playlists p INNER JOIN aggregate_user au ON p.playlist_owner_id = au.user_id LEFT JOIN aggregate_playlist ap ON p.playlist_id = ap.playlist_id @@ -363,8 +421,7 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType AND jsonb_array_length(p.playlist_contents->'track_ids') >= 3 ` if _, err := tx.Exec(ctx, q, - intervalLit, - trendingType, version, r.name, + intervalLit, r.name, trendingY, r.days, trendingQ, trendingN, trendingF, trendingO, trendingR, trendingI, ); err != nil { @@ -372,6 +429,31 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } } + // Upsert new/changed rows; skip rows whose score is unchanged. + if _, err := tx.Exec(ctx, ` + INSERT INTO playlist_trending_scores + (playlist_id, type, version, time_range, score, created_at) + SELECT s.playlist_id, $1, $2, s.time_range, s.score, now() + FROM tmp_playlist_scores s + ON CONFLICT (playlist_id, type, version, time_range) DO UPDATE + SET score = EXCLUDED.score, + created_at = EXCLUDED.created_at + WHERE playlist_trending_scores.score IS DISTINCT FROM EXCLUDED.score + `, trendingType, version); err != nil { + return fmt.Errorf("upsert: %w", err) + } + + // Delete rows that no longer qualify this cycle. + if _, err := tx.Exec(ctx, ` + DELETE FROM playlist_trending_scores t + WHERE t.type = $1 AND t.version = $2 + AND NOT EXISTS ( + SELECT 1 FROM tmp_playlist_scores s + WHERE s.playlist_id = t.playlist_id AND s.time_range = t.time_range + ) + `, trendingType, version); err != nil { + return fmt.Errorf("prune stale: %w", err) + } + return tx.Commit(ctx) } - diff --git a/jobs/index_trending_test.go b/jobs/index_trending_test.go index 63a5600d..74159480 100644 --- a/jobs/index_trending_test.go +++ b/jobs/index_trending_test.go @@ -44,10 +44,50 @@ func TestTrendingJob_PopulatesScores(t *testing.T) { require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 100").Scan(&nTracks)) assert.GreaterOrEqual(t, nTracks, 9, "expected at least 9 score rows for the seeded track") - // Re-running should not duplicate (DELETE-then-INSERT per - // (type, version)). + // Re-running should not duplicate (upsert keyed on the PK). require.NoError(t, job.run(ctx)) var nAfter int require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 100").Scan(&nAfter)) assert.Equal(t, nTracks, nAfter, "second run must not duplicate rows") } + +// TestTrendingJob_PrunesStaleRows verifies the anti-join prune: a track that +// drops out of the trending source set (here, by being deleted) has its +// previously-written score rows removed on the next run. The old +// DELETE-then-INSERT approach got this for free; the upsert+prune rewrite has +// to do it explicitly, so it's worth a dedicated test. +func TestTrendingJob_PrunesStaleRows(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + ctx := context.Background() + + now := time.Now() + database.Seed(pool, database.FixtureMap{ + "users": {{"user_id": 1, "wallet": "0x01", "name": "Alice"}}, + "tracks": {{ + "track_id": 200, "owner_id": 1, "title": "T", + "release_date": now.Add(-2 * 24 * time.Hour), + "created_at": now.Add(-2 * 24 * time.Hour), + }}, + "plays": { + {"id": 1, "user_id": 1, "play_item_id": 200, "created_at": now.Add(-1 * time.Hour)}, + }, + }) + + job := NewTrendingJob(newTestConfig(), pool) + require.NoError(t, job.run(ctx)) + + var nBefore int + require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nBefore)) + require.Greater(t, nBefore, 0, "expected score rows for the seeded track before deletion") + + // Remove the track from the trending source set, then re-run. The + // anti-join prune must clear its now-stale score rows. + _, err := pool.Exec(ctx, "UPDATE tracks SET is_delete = true WHERE track_id = 200") + require.NoError(t, err) + require.NoError(t, job.run(ctx)) + + var nAfter int + require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nAfter)) + assert.Equal(t, 0, nAfter, "stale score rows must be pruned after the track leaves the source set") +}