Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ddl/migrations/0214_trending_matview_unique_indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Promote the track_id indexes on the trending matviews to UNIQUE so
-- TrendingJob can use REFRESH MATERIALIZED VIEW CONCURRENTLY (which avoids the
-- ACCESS EXCLUSIVE lock a plain refresh takes). CONCURRENTLY requires a UNIQUE
-- index with no WHERE clause; both matviews are one row per track (verified in
-- prod: row count == COUNT(DISTINCT track_id)), so this is valid. We replace
-- the existing non-unique indexes in place.
--
-- Intentionally NOT wrapped in BEGIN/COMMIT: each statement autocommits, so no
-- lock is carried from one statement to the next. DROP INDEX takes a brief
-- ACCESS EXCLUSIVE lock on the matview; CREATE INDEX then holds a SHARE lock
-- for the duration of the build, which blocks refreshes but not readers.
--
-- Consequences of running outside a transaction:
--   * Between each DROP and its CREATE the matview has no track_id index, so a
--     CONCURRENTLY refresh issued in that window fails unless another
--     qualifying unique index exists.
--   * If CREATE UNIQUE INDEX fails (e.g. a duplicate track_id appears), the old
--     non-unique index is already gone and is not restored automatically.
--
-- Re-application is idempotent in its end state but is not a no-op: DROP INDEX
-- IF EXISTS also removes the unique index left by a previous run, which is then
-- rebuilt. IF NOT EXISTS only guards against a same-named index that survived
-- the DROP.

-- aggregate_interval_plays: swap the non-unique track_id index for a unique one.
DROP INDEX IF EXISTS public.interval_play_track_id_idx;
CREATE UNIQUE INDEX IF NOT EXISTS interval_play_track_id_idx
ON public.aggregate_interval_plays USING btree (track_id);

-- trending_params: same swap.
DROP INDEX IF EXISTS public.trending_params_track_id_idx;
CREATE UNIQUE INDEX IF NOT EXISTS trending_params_track_id_idx
ON public.trending_params USING btree (track_id);
5 changes: 4 additions & 1 deletion indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) {
jobs.NewUserListeningHistoryJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 5*time.Second)

// Hourly to match discovery's effective cadence (trending_refresh_seconds
// default 3600). The vendored port dropped that gate, so a 10s schedule ran
// the multi-minute recompute continuously and IO-starved the block loop.
jobs.NewTrendingJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 10*time.Second)
ScheduleEvery(ctx, 1*time.Hour)

jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 5*time.Minute)
Expand Down
212 changes: 147 additions & 65 deletions jobs/index_trending.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,15 @@ import (
// TrendingJob recomputes trending scores. Ports the score-computation half
// of apps/packages/discovery-provider/src/tasks/index_trending.py.
//
// On each run:
// 1. Refresh aggregate_interval_plays MV (feeds week/month listen counts).
// 2. Refresh trending_params MV (per-track inputs).
// 3. For each (entity_type, version), run a delete-then-bulk-insert into
// track_trending_scores / playlist_trending_scores using the strategy's
// SQL template.
// On each run it refreshes the aggregate_interval_plays and trending_params
// matviews, then for each (entity_type, version) reconciles the *_trending_scores
// tables via a skip-unchanged upsert plus an anti-join delete of rows that no
// longer qualify (instead of the old DELETE-all + bulk-INSERT, which rewrote the
// whole 16GB table and its six indexes every run).
//
// What is intentionally NOT ported here:
// - Trending notifications (top-10 mover diff against the notification
// table). Lands with the challenges/notifications work.
// - index_tastemaker (writes challenge events). Depends on the challenge
// bus, which is a separate effort.
//
// Trending score templates are copied verbatim from
// apps/packages/discovery-provider/src/trending_strategies/{pnagD,AnlGe}_*.py
// so the scoring stays bit-identical to discovery.
// Scheduled hourly to match discovery (default trending_refresh_seconds=3600;
// see indexer's startParityJobs). Score templates are copied verbatim from
// apps' trending_strategies/{pnagD,AnlGe}_*.py so scoring stays bit-identical.
type TrendingJob struct {
pool database.DbPool
logger *zap.Logger
Expand Down Expand Up @@ -88,8 +81,8 @@ func (j *TrendingJob) run(ctx context.Context) error {
start := time.Now()

for _, mv := range []string{"aggregate_interval_plays", "trending_params"} {
if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil {
return fmt.Errorf("refresh %s: %w", mv, err)
if err := j.refreshMatview(ctx, mv); err != nil {
return err
}
}

Expand All @@ -113,6 +106,29 @@ func (j *TrendingJob) run(ctx context.Context) error {
return nil
}

// refreshMatview refreshes the materialized view mv and logs how long it took
// and whether the concurrent path was used.
//
// It prefers REFRESH ... CONCURRENTLY so it doesn't take an ACCESS EXCLUSIVE
// lock that stalls every reader (the block-indexing loop included).
// CONCURRENTLY needs the matview populated and carrying a unique index (migration
// 0214); both can be transiently false (fresh schema-only DB, or pre-migration
// during a rolling deploy), so we fall back to a blocking refresh until they hold.
//
// The fallback is skipped when ctx is already cancelled or past its deadline:
// in that case the concurrent attempt failed because the caller gave up, not
// because a prerequisite is missing, and a second (blocking) refresh on the
// same dead context would only add a misleading warning.
//
// mv is concatenated into the statement unquoted, so it must be a trusted,
// hard-coded identifier and never caller-supplied input.
//
// Returns a wrapped error if the refresh ultimately fails.
func (j *TrendingJob) refreshMatview(ctx context.Context, mv string) error {
	mvStart := time.Now()
	concurrent := true
	if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+mv); err != nil {
		// Don't escalate to a blocking refresh when the context itself is done.
		if ctx.Err() != nil {
			return fmt.Errorf("refresh %s: %w", mv, err)
		}
		j.logger.Warn("concurrent matview refresh failed; falling back to blocking refresh",
			zap.String("matview", mv), zap.Error(err))
		concurrent = false
		if _, err := j.pool.Exec(ctx, "REFRESH MATERIALIZED VIEW "+mv); err != nil {
			return fmt.Errorf("refresh %s: %w", mv, err)
		}
	}
	j.logger.Info("refreshed matview",
		zap.String("matview", mv),
		zap.Bool("concurrent", concurrent),
		zap.Duration("duration", time.Since(mvStart)))
	return nil
}

// trackScoreParams holds the scalar weights used by a tracks strategy. The
// only behavior difference between pnagD and AnlGe is whether the trailing
// multiplier is `karma` or `(1 + LOG(1 + karma))`.
Expand Down Expand Up @@ -149,11 +165,22 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
}
defer tx.Rollback(ctx)

if _, err := tx.Exec(ctx,
"DELETE FROM track_trending_scores WHERE type = $1 AND version = $2",
trendingType, version,
); err != nil {
return err
// Stage this cycle's scores in a temp table, then reconcile against the
// live table (upsert changed/new rows, delete dropped-out rows). The score
// expressions below match the discovery templates except for two mechanical
// changes: they write to tmp_track_scores instead of a DELETE+INSERT of
// track_trending_scores, and the bind parameters are renumbered without gaps
// because the (constant) type/version no longer appear in the staged SELECT.
// type/version are applied at the upsert instead.
if _, err := tx.Exec(ctx, `
CREATE TEMP TABLE tmp_track_scores (
track_id integer,
genre varchar,
time_range varchar,
score double precision
) ON COMMIT DROP
`); err != nil {
return fmt.Errorf("create temp: %w", err)
}

// Week + month follow the same shape; we generate one query per range.
Expand All @@ -168,46 +195,44 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
{"month", trendingMo, "aip.month_listen_counts", "tp.repost_month_count", "tp.save_month_count"},
} {
q := fmt.Sprintf(`
INSERT INTO track_trending_scores
(track_id, genre, type, version, time_range, score, created_at)
INSERT INTO tmp_track_scores (track_id, genre, time_range, score)
SELECT
tp.track_id,
tp.genre,
$1, $2, $3,
$1,
CASE
WHEN tp.owner_follower_count < $4 THEN 0
WHEN tp.owner_follower_count < $2 THEN 0
WHEN EXTRACT(DAYS FROM now() - (
CASE
WHEN tp.release_date > now() THEN aip.created_at
ELSE GREATEST(tp.release_date, aip.created_at)
END
)) > $5 THEN GREATEST(
1.0 / $6,
POW($6, GREATEST(
)) > $3 THEN GREATEST(
1.0 / $4,
POW($4, GREATEST(
-10,
1.0 - 1.0 * EXTRACT(DAYS FROM now() - (
CASE
WHEN tp.release_date > now() THEN aip.created_at
ELSE GREATEST(tp.release_date, aip.created_at)
END
)) / $5
)) / $3
))
) * (
$7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count
$5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count
) * %s
ELSE (
$7 * %s + $8 * %s + $9 * %s + $10 * tp.repost_count + $11 * tp.save_count
$5 * %s + $6 * %s + $7 * %s + $8 * tp.repost_count + $9 * tp.save_count
) * %s
END,
now()
END
FROM trending_params tp
INNER JOIN aggregate_interval_plays aip ON tp.track_id = aip.track_id
`,
r.listenField, r.repostField, r.saveField, p.karmaExpr,
r.listenField, r.repostField, r.saveField, p.karmaExpr,
)
if _, err := tx.Exec(ctx, q,
trendingType, version, r.name,
r.name,
trendingY, r.days, trendingQ,
trendingN, trendingF, trendingO, trendingR, trendingI,
); err != nil {
Expand All @@ -217,15 +242,13 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v

// All-time uses aggregate_plays.count rather than aggregate_interval_plays.
q := fmt.Sprintf(`
INSERT INTO track_trending_scores
(track_id, genre, type, version, time_range, score, created_at)
INSERT INTO tmp_track_scores (track_id, genre, time_range, score)
SELECT
tp.track_id, tp.genre, $1, $2, 'allTime',
tp.track_id, tp.genre, 'allTime',
CASE
WHEN tp.owner_follower_count < $3 THEN 0
ELSE ($4 * ap.count + $5 * tp.repost_count + $6 * tp.save_count) * %s
END,
now()
WHEN tp.owner_follower_count < $1 THEN 0
ELSE ($2 * ap.count + $3 * tp.repost_count + $4 * tp.save_count) * %s
END
FROM trending_params tp
INNER JOIN aggregate_plays ap ON tp.track_id = ap.play_item_id
INNER JOIN tracks t ON ap.play_item_id = t.track_id
Expand All @@ -235,12 +258,41 @@ func (j *TrendingJob) computeTrendingTracks(ctx context.Context, trendingType, v
AND t.stem_of IS NULL
`, p.karmaExpr)
if _, err := tx.Exec(ctx, q,
trendingType, version,
trendingY, trendingN, trendingR, trendingI,
); err != nil {
return fmt.Errorf("allTime range: %w", err)
}

// Upsert new/changed rows; the WHERE guard skips rows whose score and genre
// are unchanged so a no-op cycle writes (and dirties indexes for) nothing.
if _, err := tx.Exec(ctx, `
INSERT INTO track_trending_scores
(track_id, genre, type, version, time_range, score, created_at)
SELECT s.track_id, s.genre, $1, $2, s.time_range, s.score, now()
FROM tmp_track_scores s
ON CONFLICT (track_id, type, version, time_range) DO UPDATE
SET score = EXCLUDED.score,
genre = EXCLUDED.genre,
created_at = EXCLUDED.created_at
WHERE track_trending_scores.score IS DISTINCT FROM EXCLUDED.score
OR track_trending_scores.genre IS DISTINCT FROM EXCLUDED.genre
`, trendingType, version); err != nil {
return fmt.Errorf("upsert: %w", err)
}

// Delete rows that no longer qualify this cycle (e.g. track deleted/unlisted
// or aged out of the interval window).
if _, err := tx.Exec(ctx, `
DELETE FROM track_trending_scores t
WHERE t.type = $1 AND t.version = $2
AND NOT EXISTS (
SELECT 1 FROM tmp_track_scores s
WHERE s.track_id = t.track_id AND s.time_range = t.time_range
)
`, trendingType, version); err != nil {
return fmt.Errorf("prune stale: %w", err)
}

return tx.Commit(ctx)
}

Expand All @@ -253,11 +305,19 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType
}
defer tx.Rollback(ctx)

if _, err := tx.Exec(ctx,
"DELETE FROM playlist_trending_scores WHERE type = $1 AND version = $2",
trendingType, version,
); err != nil {
return err
// Stage this cycle's scores, then upsert/prune (see computeTrendingTracks
// for the rationale). The per-range SELECT is the verbatim discovery
// template; only its write target changed, and the bind parameters were
// renumbered without gaps now that the (constant) type/version are applied
// at the upsert instead of in the staged SELECT.
if _, err := tx.Exec(ctx, `
CREATE TEMP TABLE tmp_playlist_scores (
playlist_id integer,
time_range varchar,
score double precision
) ON COMMIT DROP
`); err != nil {
return fmt.Errorf("create temp: %w", err)
}

// Each time range gets its own query because the interval (':week days'
Expand All @@ -272,8 +332,7 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType
} {
intervalLit := fmt.Sprintf("%d days", r.days)
q := `
INSERT INTO playlist_trending_scores
(playlist_id, type, version, time_range, score, created_at)
INSERT INTO tmp_playlist_scores (playlist_id, time_range, score)
WITH saves_and_reposts AS (
SELECT user_id, repost_item_id AS item_id
FROM reposts
Expand Down Expand Up @@ -326,31 +385,30 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType
)
SELECT
p.playlist_id,
$2, $3, $4,
$2,
CASE
WHEN au.follower_count < $5 THEN 0
WHEN au.follower_count < $3 THEN 0
WHEN EXTRACT(DAYS FROM now() - (
CASE WHEN p.release_date > now() THEN p.created_at
ELSE GREATEST(p.release_date, p.created_at) END
)) > $6 THEN GREATEST(
1.0 / $7,
POW($7, GREATEST(
)) > $4 THEN GREATEST(
1.0 / $5,
POW($5, GREATEST(
-10,
1.0 - 1.0 * EXTRACT(DAYS FROM now() - (
CASE WHEN p.release_date > now() THEN p.created_at
ELSE GREATEST(p.release_date, p.created_at) END
)) / $6
)) / $4
))
) * (
$8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0)
+ $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0)
$6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0)
+ $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0)
) * COALESCE(k.karma, 1)
ELSE (
$8 * 1 + $9 * COALESCE(rp.week_count, 0) + $10 * COALESCE(s.week_count, 0)
+ $11 * COALESCE(ap.repost_count, 0) + $12 * COALESCE(ap.save_count, 0)
$6 * 1 + $7 * COALESCE(rp.week_count, 0) + $8 * COALESCE(s.week_count, 0)
+ $9 * COALESCE(ap.repost_count, 0) + $10 * COALESCE(ap.save_count, 0)
) * COALESCE(k.karma, 1)
END,
now()
END
FROM playlists p
INNER JOIN aggregate_user au ON p.playlist_owner_id = au.user_id
LEFT JOIN aggregate_playlist ap ON p.playlist_id = ap.playlist_id
Expand All @@ -363,15 +421,39 @@ func (j *TrendingJob) computeTrendingPlaylists(ctx context.Context, trendingType
AND jsonb_array_length(p.playlist_contents->'track_ids') >= 3
`
if _, err := tx.Exec(ctx, q,
intervalLit,
trendingType, version, r.name,
intervalLit, r.name,
trendingY, r.days, trendingQ,
trendingN, trendingF, trendingO, trendingR, trendingI,
); err != nil {
return fmt.Errorf("%s range: %w", r.name, err)
}
}

// Upsert new/changed rows; skip rows whose score is unchanged.
if _, err := tx.Exec(ctx, `
INSERT INTO playlist_trending_scores
(playlist_id, type, version, time_range, score, created_at)
SELECT s.playlist_id, $1, $2, s.time_range, s.score, now()
FROM tmp_playlist_scores s
ON CONFLICT (playlist_id, type, version, time_range) DO UPDATE
SET score = EXCLUDED.score,
created_at = EXCLUDED.created_at
WHERE playlist_trending_scores.score IS DISTINCT FROM EXCLUDED.score
`, trendingType, version); err != nil {
return fmt.Errorf("upsert: %w", err)
}

// Delete rows that no longer qualify this cycle.
if _, err := tx.Exec(ctx, `
DELETE FROM playlist_trending_scores t
WHERE t.type = $1 AND t.version = $2
AND NOT EXISTS (
SELECT 1 FROM tmp_playlist_scores s
WHERE s.playlist_id = t.playlist_id AND s.time_range = t.time_range
)
`, trendingType, version); err != nil {
return fmt.Errorf("prune stale: %w", err)
}

return tx.Commit(ctx)
}

Loading
Loading