From 0091483a8ae23e2361fa92610af5137c94b9fb43 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 13:34:24 -0700 Subject: [PATCH 1/2] perf(api): stop TrendingJob from continuously rewriting score tables TrendingJob ran on a 10s schedule but each run took minutes: it did two non-concurrent REFRESH MATERIALIZED VIEW (AccessExclusiveLock, blocking all readers including the block-indexing loop) and then a full DELETE+INSERT of the ~16GB track_trending_scores table plus 6 indexes. Effectively continuous, it IO-starved the indexer. Changes: - Refresh matviews with REFRESH MATERIALIZED VIEW CONCURRENTLY, falling back to a blocking refresh if the matview is unpopulated (fresh/test DB) or the unique index isn't present yet (pre-migration rolling deploy). - Migration 0214 promotes the track_id indexes on aggregate_interval_plays and trending_params to UNIQUE so CONCURRENTLY is possible (both matviews are one row per track). - Rewrite the score recompute as upsert-into-staging-temp-table + ON CONFLICT DO UPDATE (skip-unchanged) + anti-join prune, instead of DELETE-then-INSERT of every row each cycle. - Schedule TrendingJob hourly to match discovery's effective cadence (trending_refresh_seconds=3600); the vendored port had dropped that gate. sql/01_schema.sql picks up the unique indexes on the next `make test-schema` regen; the code falls back safely until then. Adds a prune-coverage test. Co-Authored-By: Claude Opus 4.7 --- .../0214_trending_matview_unique_indexes.sql | 27 +++ indexer/indexer.go | 8 +- jobs/index_trending.go | 217 +++++++++++++----- jobs/index_trending_test.go | 44 +++- 4 files changed, 240 insertions(+), 56 deletions(-) create mode 100644 ddl/migrations/0214_trending_matview_unique_indexes.sql diff --git a/ddl/migrations/0214_trending_matview_unique_indexes.sql b/ddl/migrations/0214_trending_matview_unique_indexes.sql new file mode 100644 index 00000000..2249bd8f --- /dev/null +++ b/ddl/migrations/0214_trending_matview_unique_indexes.sql @@ -0,0 +1,27 @@ +-- Promote the track_id indexes on the trending materialized views to UNIQUE. +-- +-- TrendingJob refreshes aggregate_interval_plays and trending_params on every +-- run. Today it uses a plain `REFRESH MATERIALIZED VIEW`, which takes an +-- ACCESS EXCLUSIVE lock on the matview for the duration of the (multi-minute, +-- 64GB-plays-scanning) rebuild — blocking every reader, including the +-- block-indexing loop's downstream queries. +-- +-- `REFRESH MATERIALIZED VIEW CONCURRENTLY` avoids that lock, but Postgres +-- requires a UNIQUE index with no WHERE clause on the matview to use it. Both +-- matviews are exactly one row per track (verified in prod: row count == +-- COUNT(DISTINCT track_id) for each), so a unique index on track_id is valid. +-- We replace the existing NON-unique track_id indexes in place rather than add +-- a second redundant index on the same column. +-- +-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT. Each statement autocommits +-- so the brief ACCESS EXCLUSIVE taken by DROP/CREATE INDEX is held only for the +-- (sub-second, ~1.4M-row) index build, not the whole file. IF NOT EXISTS keeps +-- the CREATE idempotent if the migration is re-applied. + +DROP INDEX IF EXISTS public.interval_play_track_id_idx; +CREATE UNIQUE INDEX IF NOT EXISTS interval_play_track_id_idx + ON public.aggregate_interval_plays USING btree (track_id); + +DROP INDEX IF EXISTS public.trending_params_track_id_idx; +CREATE UNIQUE INDEX IF NOT EXISTS trending_params_track_id_idx + ON public.trending_params USING btree (track_id); diff --git a/indexer/indexer.go b/indexer/indexer.go index 94721b5f..d49df2cd 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -148,8 +148,14 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUserListeningHistoryJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Second) + // Hourly to match discovery's effective cadence: apps' celery beat ticked + // index_trending every 10s but an internal gate (trending_refresh_seconds, + // default 3600) only recomputed once an hour. The vendored port dropped + // that gate, so a 10s schedule meant the multi-minute recompute ran + // continuously, IO-starving the block-indexing loop. Trending leaderboards + // don't need sub-hour freshness. jobs.NewTrendingJob(ci.Config, ci.pool). - ScheduleEvery(ctx, 10*time.Second) + ScheduleEvery(ctx, 1*time.Hour) jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Minute) diff --git a/jobs/index_trending.go b/jobs/index_trending.go index fb6ca87a..e3ece9df 100644 --- a/jobs/index_trending.go +++ b/jobs/index_trending.go @@ -18,9 +18,25 @@ import ( // On each run: // 1. Refresh aggregate_interval_plays MV (feeds week/month listen counts). // 2. Refresh trending_params MV (per-track inputs). -// 3. For each (entity_type, version), run a delete-then-bulk-insert into -// track_trending_scores / playlist_trending_scores using the strategy's -// SQL template. +// 3. For each (entity_type, version), reconcile track_trending_scores / +// playlist_trending_scores against this cycle's computed scores using an +// upsert (skip-unchanged) plus an anti-join delete of rows that no longer +// qualify. +// +// Cadence: discovery ran this hourly (default `trending_refresh_seconds = 3600` +// in apps' default_config.ini — its celery beat ticked every 10s but an +// internal time gate only recomputed once an hour). The vendored port dropped +// that gate, so it must be scheduled at the real cadence (see indexer's +// startParityJobs) rather than every tick. +// +// Write strategy: step 3 used to DELETE every row for a (type, version) and +// bulk-INSERT them all back on every run, rewriting the entire 16GB +// track_trending_scores table (and all six of its indexes) regardless of +// whether any score changed. Because the decay formula has day granularity, +// most per-track scores are stable between hourly runs, so we now stage scores +// in a temp table and UPSERT with a `score IS DISTINCT` guard — turning the +// common case into a near-zero-write no-op — then delete only the rows that +// dropped out of the result set. // // What is intentionally NOT ported here: // - Trending notifications (top-10 mover diff against the notification @@ -88,8 +104,8 @@ func (j *TrendingJob) run(ctx context.Context) error { start := time.Now() for _, mv := range []string{"aggregate_interval_plays", "trending_params"} { - if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil { - return fmt.Errorf("refresh %s: %w", mv, err) + if err := j.refreshMatview(ctx, mv); err != nil { + return err } } @@ -113,6 +129,35 @@ func (j *TrendingJob) run(ctx context.Context) error { return nil } +// refreshMatview refreshes a trending matview, preferring the CONCURRENTLY +// variant so the refresh doesn't take an ACCESS EXCLUSIVE lock that stalls +// every reader (the block-indexing loop included). +// +// CONCURRENTLY has two preconditions: the matview must already be populated, +// and it must carry a unique index (added by migration 0214). Both can be +// transiently false — right after a schema-only bootstrap the matview is +// created WITH NO DATA, and during a rolling deploy the new code can land +// before the index migration. In those windows we fall back to a blocking +// refresh so the job still makes progress; the next cycle uses CONCURRENTLY +// once the preconditions hold. +func (j *TrendingJob) refreshMatview(ctx context.Context, mv string) error { + mvStart := time.Now() + concurrent := true + if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+mv); err != nil { + j.logger.Warn("concurrent matview refresh failed; falling back to blocking refresh", + zap.String("matview", mv), zap.Error(err)) + concurrent = false + if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil { + return fmt.Errorf("refresh %s: %w", mv, err) + } + } + j.logger.Info("refreshed matview", + zap.String("matview", mv), + zap.Bool("concurrent", concurrent), + zap.Duration("duration", time.Since(mvStart))) + return nil +} + // trackScoreParams holds the scalar weights used by a tracks strategy. The // only behavior difference between pnagD and AnlGe is whether the trailing // multiplier is `karma` or `(1 + LOG(1 + karma))`. @@ -149,11 +194,22 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v } defer tx.Rollback(ctx) - if _, err := tx.Exec(ctx, - "DELETE FROM track_trending_scores WHERE type = $1 AND version = $2", - trendingType, version, - ); err != nil { - return err + // Stage this cycle's scores in a temp table, then reconcile against the + // live table (upsert changed/new rows, delete dropped-out rows). The score + // expressions below are byte-for-byte the discovery templates; only the + // write target changed (they now feed tmp_track_scores instead of a + // DELETE+INSERT of track_trending_scores), and the bind parameters were + // renumbered without gaps because the (constant) type/version no longer + // appear in the staged SELECT — type/version are applied at the upsert. + if _, err := tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_track_scores ( + track_id integer, + genre varchar, + time_range varchar, + score double precision + ) ON COMMIT DROP + `); err != nil { + return fmt.Errorf("create temp: %w", err) } // Week + month follow the same shape; we generate one query per range. @@ -168,38 +224,36 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v {"month", trendingMo, "aip.month_listen_counts", "tp.repost_month_count", "tp.save_month_count"}, } { q := fmt.Sprintf(` - INSERT INTO track_trending_scores - (track_id, genre, type, version, time_range, score, created_at) + INSERT INTO tmp_track_scores (track_id, genre, time_range, score) SELECT tp.track_id, tp.genre, - $1, $2, $3, + $1, CASE - WHEN tp.owner_follower_count < $4 THEN 0 + WHEN tp.owner_follower_count < $2 THEN 0 WHEN EXTRACT(DAYS FROM now() - ( CASE WHEN tp.release_date > now() THEN aip.created_at ELSE GREATEST(tp.release_date, aip.created_at) END - )) > $5 THEN GREATEST( - 1.0 / $6, - POW($6, GREATEST( + )) > $3 THEN GREATEST( + 1.0 / $4, + POW($4, GREATEST( -10, 1.0 - 1.0 * EXTRACT(DAYS FROM now() - ( CASE WHEN tp.release_date > now() THEN aip.created_at ELSE GREATEST(tp.release_date, aip.created_at) END - )) / $5 + )) / $3 )) ) * ( - $7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count + $5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count ) * %s ELSE ( - $7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count + $5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count ) * %s - END, - now() + END FROM trending_params tp INNER JOIN aggregate_interval_plays aip ON tp.track_id = aip.track_id `, @@ -207,7 +261,7 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v r.listenField, r.repostField, r.saveField, p.karmaExpr, ) if _, err := tx.Exec(ctx, q, - trendingType, version, r.name, + r.name, trendingY, r.days, trendingQ, trendingN, trendingF, trendingO, trendingR, trendingI, ); err != nil { @@ -217,15 +271,13 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v // All-time uses aggregate_plays.count rather than aggregate_interval_plays. q := fmt.Sprintf(` - INSERT INTO track_trending_scores - (track_id, genre, type, version, time_range, score, created_at) + INSERT INTO tmp_track_scores (track_id, genre, time_range, score) SELECT - tp.track_id, tp.genre, $1, $2, 'allTime', + tp.track_id, tp.genre, 'allTime', CASE - WHEN tp.owner_follower_count < $3 THEN 0 - ELSE ($4 * ap.count + $5 * tp.repost_count + $6 * tp.save_count) * %s - END, - now() + WHEN tp.owner_follower_count < $1 THEN 0 + ELSE ($2 * ap.count + $3 * tp.repost_count + $4 * tp.save_count) * %s + END FROM trending_params tp INNER JOIN aggregate_plays ap ON tp.track_id = ap.play_item_id INNER JOIN tracks t ON ap.play_item_id = t.track_id @@ -235,12 +287,41 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v AND t.stem_of IS NULL `, p.karmaExpr) if _, err := tx.Exec(ctx, q, - trendingType, version, trendingY, trendingN, trendingR, trendingI, ); err != nil { return fmt.Errorf("allTime range: %w", err) } + // Upsert new/changed rows; the WHERE guard skips rows whose score and genre + // are unchanged so a no-op cycle writes (and dirties indexes for) nothing. + if _, err := tx.Exec(ctx, ` + INSERT INTO track_trending_scores + (track_id, genre, type, version, time_range, score, created_at) + SELECT s.track_id, s.genre, $1, $2, s.time_range, s.score, now() + FROM tmp_track_scores s + ON CONFLICT (track_id, type, version, time_range) DO UPDATE + SET score = EXCLUDED.score, + genre = EXCLUDED.genre, + created_at = EXCLUDED.created_at + WHERE track_trending_scores.score IS DISTINCT FROM EXCLUDED.score + OR track_trending_scores.genre IS DISTINCT FROM EXCLUDED.genre + `, trendingType, version); err != nil { + return fmt.Errorf("upsert: %w", err) + } + + // Delete rows that no longer qualify this cycle (e.g. track deleted/unlisted + // or aged out of the interval window). + if _, err := tx.Exec(ctx, ` + DELETE FROM track_trending_scores t + WHERE t.type = $1 AND t.version = $2 + AND NOT EXISTS ( + SELECT 1 FROM tmp_track_scores s + WHERE s.track_id = t.track_id AND s.time_range = t.time_range + ) + `, trendingType, version); err != nil { + return fmt.Errorf("prune stale: %w", err) + } + return tx.Commit(ctx) } @@ -253,11 +334,19 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } defer tx.Rollback(ctx) - if _, err := tx.Exec(ctx, - "DELETE FROM playlist_trending_scores WHERE type = $1 AND version = $2", - trendingType, version, - ); err != nil { - return err + // Stage this cycle's scores, then upsert/prune (see computeTrendingTracks + // for the rationale). The per-range SELECT is the verbatim discovery + // template; only its write target changed, and the bind parameters were + // renumbered without gaps now that the (constant) type/version are applied + // at the upsert instead of in the staged SELECT. + if _, err := tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_playlist_scores ( + playlist_id integer, + time_range varchar, + score double precision + ) ON COMMIT DROP + `); err != nil { + return fmt.Errorf("create temp: %w", err) } // Each time range gets its own query because the interval (':week days' @@ -272,8 +361,7 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } { intervalLit := fmt.Sprintf("%d days", r.days) q := ` - INSERT INTO playlist_trending_scores - (playlist_id, type, version, time_range, score, created_at) + INSERT INTO tmp_playlist_scores (playlist_id, time_range, score) WITH saves_and_reposts AS ( SELECT user_id, repost_item_id AS item_id FROM reposts @@ -326,31 +414,30 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType ) SELECT p.playlist_id, - $2, $3, $4, + $2, CASE - WHEN au.follower_count < $5 THEN 0 + WHEN au.follower_count < $3 THEN 0 WHEN EXTRACT(DAYS FROM now() - ( CASE WHEN p.release_date > now() THEN p.created_at ELSE GREATEST(p.release_date, p.created_at) END - )) > $6 THEN GREATEST( - 1.0 / $7, - POW($7, GREATEST( + )) > $4 THEN GREATEST( + 1.0 / $5, + POW($5, GREATEST( -10, 1.0 - 1.0 * EXTRACT(DAYS FROM now() - ( CASE WHEN p.release_date > now() THEN p.created_at ELSE GREATEST(p.release_date, p.created_at) END - )) / $6 + )) / $4 )) ) * ( - $8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0) - + $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0) + $6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0) + + $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0) ) * COALESCE(k.karma, 1) ELSE ( - $8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0) - + $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0) + $6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0) + + $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0) ) * COALESCE(k.karma, 1) - END, - now() + END FROM playlists p INNER JOIN aggregate_user au ON p.playlist_owner_id = au.user_id LEFT JOIN aggregate_playlist ap ON p.playlist_id = ap.playlist_id @@ -363,8 +450,7 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType AND jsonb_array_length(p.playlist_contents->'track_ids') >= 3 ` if _, err := tx.Exec(ctx, q, - intervalLit, - trendingType, version, r.name, + intervalLit, r.name, trendingY, r.days, trendingQ, trendingN, trendingF, trendingO, trendingR, trendingI, ); err != nil { @@ -372,6 +458,31 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType } } + // Upsert new/changed rows; skip rows whose score is unchanged. + if _, err := tx.Exec(ctx, ` + INSERT INTO playlist_trending_scores + (playlist_id, type, version, time_range, score, created_at) + SELECT s.playlist_id, $1, $2, s.time_range, s.score, now() + FROM tmp_playlist_scores s + ON CONFLICT (playlist_id, type, version, time_range) DO UPDATE + SET score = EXCLUDED.score, + created_at = EXCLUDED.created_at + WHERE playlist_trending_scores.score IS DISTINCT FROM EXCLUDED.score + `, trendingType, version); err != nil { + return fmt.Errorf("upsert: %w", err) + } + + // Delete rows that no longer qualify this cycle. + if _, err := tx.Exec(ctx, ` + DELETE FROM playlist_trending_scores t + WHERE t.type = $1 AND t.version = $2 + AND NOT EXISTS ( + SELECT 1 FROM tmp_playlist_scores s + WHERE s.playlist_id = t.playlist_id AND s.time_range = t.time_range + ) + `, trendingType, version); err != nil { + return fmt.Errorf("prune stale: %w", err) + } + return tx.Commit(ctx) } - diff --git a/jobs/index_trending_test.go b/jobs/index_trending_test.go index 63a5600d..74159480 100644 --- a/jobs/index_trending_test.go +++ b/jobs/index_trending_test.go @@ -44,10 +44,50 @@ func TestTrendingJob_PopulatesScores(t *testing.T) { require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 100").Scan(&nTracks)) assert.GreaterOrEqual(t, nTracks, 9, "expected at least 9 score rows for the seeded track") - // Re-running should not duplicate (DELETE-then-INSERT per - // (type, version)). + // Re-running should not duplicate (upsert keyed on the PK). require.NoError(t, job.run(ctx)) var nAfter int require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 100").Scan(&nAfter)) assert.Equal(t, nTracks, nAfter, "second run must not duplicate rows") } + +// TestTrendingJob_PrunesStaleRows verifies the anti-join prune: a track that +// drops out of the trending source set (here, by being deleted) has its +// previously-written score rows removed on the next run. The old +// DELETE-then-INSERT approach got this for free; the upsert+prune rewrite has +// to do it explicitly, so it's worth a dedicated test. +func TestTrendingJob_PrunesStaleRows(t *testing.T) { + pool := database.CreateTestDatabase(t, "test_jobs") + defer pool.Close() + ctx := context.Background() + + now := time.Now() + database.Seed(pool, database.FixtureMap{ + "users": {{"user_id": 1, "wallet": "0x01", "name": "Alice"}}, + "tracks": {{ + "track_id": 200, "owner_id": 1, "title": "T", + "release_date": now.Add(-2 * 24 * time.Hour), + "created_at": now.Add(-2 * 24 * time.Hour), + }}, + "plays": { + {"id": 1, "user_id": 1, "play_item_id": 200, "created_at": now.Add(-1 * time.Hour)}, + }, + }) + + job := NewTrendingJob(newTestConfig(), pool) + require.NoError(t, job.run(ctx)) + + var nBefore int + require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nBefore)) + require.Greater(t, nBefore, 0, "expected score rows for the seeded track before deletion") + + // Remove the track from the trending source set, then re-run. The + // anti-join prune must clear its now-stale score rows. + _, err := pool.Exec(ctx, "UPDATE tracks SET is_delete = true WHERE track_id = 200") + require.NoError(t, err) + require.NoError(t, job.run(ctx)) + + var nAfter int + require.NoError(t, pool.QueryRow(ctx, "SELECT COUNT(*) FROM track_trending_scores WHERE track_id = 200").Scan(&nAfter)) + assert.Equal(t, 0, nAfter, "stale score rows must be pruned after the track leaves the source set") +} From f1ff6264ee97cf71e47d6977071c47da21e03a6d Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 15:03:53 -0700 Subject: [PATCH 2/2] docs(trending): trim verbose comments Condense the package doc, refreshMatview doc, migration 0214 header, and the indexer cadence comment to the essential rationale. Co-Authored-By: Claude Opus 4.7 --- .../0214_trending_matview_unique_indexes.sql | 27 +++------ indexer/indexer.go | 9 +-- jobs/index_trending.go | 55 +++++-------------- 3 files changed, 25 insertions(+), 66 deletions(-) diff --git a/ddl/migrations/0214_trending_matview_unique_indexes.sql b/ddl/migrations/0214_trending_matview_unique_indexes.sql index 2249bd8f..07870541 100644 --- a/ddl/migrations/0214_trending_matview_unique_indexes.sql +++ b/ddl/migrations/0214_trending_matview_unique_indexes.sql @@ -1,22 +1,13 @@ --- Promote the track_id indexes on the trending materialized views to UNIQUE. +-- Promote the track_id indexes on the trending matviews to UNIQUE so +-- TrendingJob can use REFRESH MATERIALIZED VIEW CONCURRENTLY (which avoids the +-- ACCESS EXCLUSIVE lock a plain refresh takes). CONCURRENTLY requires a UNIQUE +-- index with no WHERE clause; both matviews are one row per track (verified in +-- prod: row count == COUNT(DISTINCT track_id)), so this is valid. We replace +-- the existing non-unique indexes in place. -- --- TrendingJob refreshes aggregate_interval_plays and trending_params on every --- run. Today it uses a plain `REFRESH MATERIALIZED VIEW`, which takes an --- ACCESS EXCLUSIVE lock on the matview for the duration of the (multi-minute, --- 64GB-plays-scanning) rebuild — blocking every reader, including the --- block-indexing loop's downstream queries. --- --- `REFRESH MATERIALIZED VIEW CONCURRENTLY` avoids that lock, but Postgres --- requires a UNIQUE index with no WHERE clause on the matview to use it. Both --- matviews are exactly one row per track (verified in prod: row count == --- COUNT(DISTINCT track_id) for each), so a unique index on track_id is valid. --- We replace the existing NON-unique track_id indexes in place rather than add --- a second redundant index on the same column. --- --- NOTE: intentionally NOT wrapped in BEGIN/COMMIT. Each statement autocommits --- so the brief ACCESS EXCLUSIVE taken by DROP/CREATE INDEX is held only for the --- (sub-second, ~1.4M-row) index build, not the whole file. IF NOT EXISTS keeps --- the CREATE idempotent if the migration is re-applied. +-- Intentionally NOT wrapped in BEGIN/COMMIT: each statement autocommits so the +-- brief ACCESS EXCLUSIVE from DROP/CREATE INDEX is held only for the index +-- build. IF NOT EXISTS keeps re-application idempotent. DROP INDEX IF EXISTS public.interval_play_track_id_idx; CREATE UNIQUE INDEX IF NOT EXISTS interval_play_track_id_idx diff --git a/indexer/indexer.go b/indexer/indexer.go index d49df2cd..fef46e02 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -148,12 +148,9 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUserListeningHistoryJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Second) - // Hourly to match discovery's effective cadence: apps' celery beat ticked - // index_trending every 10s but an internal gate (trending_refresh_seconds, - // default 3600) only recomputed once an hour. The vendored port dropped - // that gate, so a 10s schedule meant the multi-minute recompute ran - // continuously, IO-starving the block-indexing loop. Trending leaderboards - // don't need sub-hour freshness. + // Hourly to match discovery's effective cadence (trending_refresh_seconds + // default 3600). The vendored port dropped that gate, so a 10s schedule ran + // the multi-minute recompute continuously and IO-starved the block loop. jobs.NewTrendingJob(ci.Config, ci.pool). ScheduleEvery(ctx, 1*time.Hour) diff --git a/jobs/index_trending.go b/jobs/index_trending.go index e3ece9df..0d318fd6 100644 --- a/jobs/index_trending.go +++ b/jobs/index_trending.go @@ -15,38 +15,15 @@ import ( // TrendingJob recomputes trending scores. Ports the score-computation half // of apps/packages/discovery-provider/src/tasks/index_trending.py. // -// On each run: -// 1. Refresh aggregate_interval_plays MV (feeds week/month listen counts). -// 2. Refresh trending_params MV (per-track inputs). -// 3. For each (entity_type, version), reconcile track_trending_scores / -// playlist_trending_scores against this cycle's computed scores using an -// upsert (skip-unchanged) plus an anti-join delete of rows that no longer -// qualify. +// On each run it refreshes the aggregate_interval_plays and trending_params +// matviews, then for each (entity_type, version) reconciles the *_trending_scores +// tables via a skip-unchanged upsert plus an anti-join delete of rows that no +// longer qualify (instead of the old DELETE-all + bulk-INSERT, which rewrote the +// whole 16GB table and its six indexes every run). // -// Cadence: discovery ran this hourly (default `trending_refresh_seconds = 3600` -// in apps' default_config.ini — its celery beat ticked every 10s but an -// internal time gate only recomputed once an hour). The vendored port dropped -// that gate, so it must be scheduled at the real cadence (see indexer's -// startParityJobs) rather than every tick. -// -// Write strategy: step 3 used to DELETE every row for a (type, version) and -// bulk-INSERT them all back on every run, rewriting the entire 16GB -// track_trending_scores table (and all six of its indexes) regardless of -// whether any score changed. Because the decay formula has day granularity, -// most per-track scores are stable between hourly runs, so we now stage scores -// in a temp table and UPSERT with a `score IS DISTINCT` guard — turning the -// common case into a near-zero-write no-op — then delete only the rows that -// dropped out of the result set. -// -// What is intentionally NOT ported here: -// - Trending notifications (top-10 mover diff against the notification -// table). Lands with the challenges/notifications work. -// - index_tastemaker (writes challenge events). Depends on the challenge -// bus, which is a separate effort. -// -// Trending score templates are copied verbatim from -// apps/packages/discovery-provider/src/trending_strategies/{pnagD,AnlGe}_*.py -// so the scoring stays bit-identical to discovery. +// Scheduled hourly to match discovery (default trending_refresh_seconds=3600; +// see indexer's startParityJobs). Score templates are copied verbatim from +// apps' trending_strategies/{pnagD,AnlGe}_*.py so scoring stays bit-identical. type TrendingJob struct { pool database.DbPool logger *zap.Logger @@ -129,17 +106,11 @@ func (j *TrendingJob) run(ctx context.Context) error { return nil } -// refreshMatview refreshes a trending matview, preferring the CONCURRENTLY -// variant so the refresh doesn't take an ACCESS EXCLUSIVE lock that stalls -// every reader (the block-indexing loop included). -// -// CONCURRENTLY has two preconditions: the matview must already be populated, -// and it must carry a unique index (added by migration 0214). Both can be -// transiently false — right after a schema-only bootstrap the matview is -// created WITH NO DATA, and during a rolling deploy the new code can land -// before the index migration. In those windows we fall back to a blocking -// refresh so the job still makes progress; the next cycle uses CONCURRENTLY -// once the preconditions hold. +// refreshMatview prefers REFRESH ... CONCURRENTLY so it doesn't take an ACCESS +// EXCLUSIVE lock that stalls every reader (the block-indexing loop included). +// CONCURRENTLY needs the matview populated and carrying a unique index (migration +// 0214); both can be transiently false (fresh schema-only DB, or pre-migration +// during a rolling deploy), so we fall back to a blocking refresh until they hold. func (j *TrendingJob) refreshMatview(ctx context.Context, mv string) error { mvStart := time.Now() concurrent := true