From 5eeeb59053242cac64e706f0bb33680663c9d961 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 20:02:33 -0700 Subject: [PATCH 1/3] perf(api): delta-based social aggregate counts + reconciliation backstop handle_repost/handle_save now maintain aggregate counts with O(1) deltas instead of recounting with count(*) on every social write (consistency with handle_follow). The delta is transition-aware (active = not is_delete) so it is correct for upsert-in-place toggles and idempotent (delta 0) on re-delivery. All three triggers move to AFTER INSERT OR UPDATE to support the entity_manager upsert. Adds ReconcileAggregatesJob, a low-priority drift backstop (every 10m, matching discovery's update_aggregates celery cadence) that recomputes the count columns from source tables. Column-disjoint from the score-only AggregatesCalculator. NOTE: requires the go-openaudio entity_manager social-upsert change to be deployed FIRST. The AFTER UPDATE triggers only behave correctly once the demote-then-insert writes are replaced by in-place upserts. Co-Authored-By: Claude Opus 4.7 --- ddl/functions/handle_follow.sql | 20 +-- ddl/functions/handle_repost.sql | 52 ++---- ddl/functions/handle_save.sql | 55 ++----- indexer/indexer.go | 7 + jobs/reconcile_aggregates.go | 275 ++++++++++++++++++++++++++++++++ 5 files changed, 314 insertions(+), 95 deletions(-) create mode 100644 jobs/reconcile_aggregates.go diff --git a/ddl/functions/handle_follow.sql b/ddl/functions/handle_follow.sql index c2fdf01e..35057de8 100644 --- a/ddl/functions/handle_follow.sql +++ b/ddl/functions/handle_follow.sql @@ -8,12 +8,9 @@ begin insert into aggregate_user (user_id) values (new.followee_user_id) on conflict do nothing; insert into aggregate_user (user_id) values (new.follower_user_id) on conflict do nothing; - -- increment or decrement? - if new.is_delete then - delta := -1; - else - delta := 1; - end if; + -- transition-aware delta (active = not is_delete); 0 on no-op re-delivery + delta := (case when new.is_delete then 0 else 1 end) + - (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end); update aggregate_user set following_count = following_count + delta @@ -80,10 +77,7 @@ exception end; $$ language plpgsql; -do $$ begin - create trigger on_follow - after insert on follows - for each row execute procedure handle_follow(); -exception - when others then null; -end $$; +drop trigger if exists on_follow on follows; +create trigger on_follow +after insert or update on follows +for each row execute procedure handle_follow(); diff --git a/ddl/functions/handle_repost.sql b/ddl/functions/handle_repost.sql index e0136426..364d1c4f 100644 --- a/ddl/functions/handle_repost.sql +++ b/ddl/functions/handle_repost.sql @@ -33,37 +33,20 @@ begin where ap.playlist_id = new.repost_item_id; end if; - -- increment or decrement? - if new.is_delete then - delta := -1; - else - delta := 1; - end if; + -- transition-aware delta (active = not is_delete); 0 on no-op re-delivery + delta := (case when new.is_delete then 0 else 1 end) + - (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end); -- update agg user - update aggregate_user - set repost_count = ( - select count(*) - from reposts r - where r.is_current is true - and r.is_delete is false - and r.user_id = new.user_id - ) + update aggregate_user + set repost_count = repost_count + delta where user_id = new.user_id; -- update agg track or playlist if new.repost_type = 'track' then milestone_name := 'TRACK_REPOST_COUNT'; - update aggregate_track - set repost_count = ( - select count(*) - from reposts r - where - r.is_current is true - and r.is_delete is false - and r.repost_type = new.repost_type - and r.repost_item_id = new.repost_item_id - ) + update aggregate_track + set repost_count = repost_count + delta where track_id = new.repost_item_id returning repost_count into new_val; if new.is_delete IS FALSE then @@ -72,15 +55,7 @@ begin else milestone_name := 'PLAYLIST_REPOST_COUNT'; update aggregate_playlist - set repost_count = ( - select count(*) - from reposts r - where - r.is_current is true - and r.is_delete is false - and r.repost_type = new.repost_type - and r.repost_item_id = new.repost_item_id - ) + set repost_count = repost_count + delta where playlist_id = new.repost_item_id returning repost_count into new_val; @@ -243,10 +218,7 @@ end; $$ language plpgsql; -do $$ begin - create trigger on_repost - after insert on reposts - for each row execute procedure handle_repost(); -exception - when others then null; -end $$; \ No newline at end of file +drop trigger if exists on_repost on reposts; +create trigger on_repost +after insert or update on reposts +for each row execute procedure handle_repost(); \ No newline at end of file diff --git a/ddl/functions/handle_save.sql b/ddl/functions/handle_save.sql index 21afc0f8..3119fda3 100644 --- a/ddl/functions/handle_save.sql +++ b/ddl/functions/handle_save.sql @@ -63,42 +63,24 @@ begin ) into is_purchased; end if; - -- increment or decrement? - if new.is_delete then - delta := -1; - else - delta := 1; - end if; + -- transition-aware delta (active = not is_delete); 0 on no-op re-delivery + delta := (case when new.is_delete then 0 else 1 end) + - (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end); -- update agg track or playlist if new.save_type = 'track' then milestone_name := 'TRACK_SAVE_COUNT'; - update aggregate_track - set save_count = ( - select count(*) - from saves r - where - r.is_current is true - and r.is_delete is false - and r.save_type = new.save_type - and r.save_item_id = new.save_item_id - ) + update aggregate_track + set save_count = save_count + delta where track_id = new.save_item_id returning save_count into new_val; -- update agg user - update aggregate_user - set track_save_count = ( - select count(*) - from saves r - where r.is_current is true - and r.is_delete is false - and r.user_id = new.user_id - and r.save_type = new.save_type - ) + update aggregate_user + set track_save_count = track_save_count + delta where user_id = new.user_id; - + if new.is_delete IS FALSE then select tracks.owner_id, tracks.remix_of into owner_user_id, track_remix_of from tracks where is_current and track_id = new.save_item_id; end if; @@ -106,15 +88,7 @@ begin milestone_name := 'PLAYLIST_SAVE_COUNT'; update aggregate_playlist - set save_count = ( - select count(*) - from saves r - where - r.is_current is true - and r.is_delete is false - and r.save_type = new.save_type - and r.save_item_id = new.save_item_id - ) + set save_count = save_count + delta where playlist_id = new.save_item_id returning save_count into new_val; @@ -284,10 +258,7 @@ end; $$ language plpgsql; -do $$ begin - create trigger on_save - after insert on saves - for each row execute procedure handle_save(); -exception - when others then null; -end $$; +drop trigger if exists on_save on saves; +create trigger on_save +after insert or update on saves +for each row execute procedure handle_save(); diff --git a/indexer/indexer.go b/indexer/indexer.go index fef46e02..0291434e 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -157,6 +157,13 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Minute) + // Drift backstop: recompute aggregate counts from source tables to correct + // any hot-path trigger delta divergence. 10m matches discovery's + // update_aggregates celery cadence. Column-disjoint from the score-only + // AggregatesCalculator, so they can run concurrently. + jobs.NewReconcileAggregatesJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 10*time.Minute) + // Reconcile derived challenge state from source tables. Per-challenge // scanners live in api/jobs/challenges/. jobs.NewIndexChallengesJob(ci.Config, ci.pool). diff --git a/jobs/reconcile_aggregates.go b/jobs/reconcile_aggregates.go new file mode 100644 index 00000000..1e9cd14c --- /dev/null +++ b/jobs/reconcile_aggregates.go @@ -0,0 +1,275 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "go.uber.org/zap" +) + +// ReconcileAggregatesJob is the drift backstop for aggregate counts. The +// per-action triggers (handle_repost/handle_save/handle_follow) maintain the +// counts on the hot path with O(1) deltas; this job periodically recomputes +// them from the source tables and corrects any divergence. Ported from +// discovery-provider's update_aggregates.py (celery, every 10 minutes). +// +// It only writes the count columns and is column-disjoint from +// UpdateAggregatesJob, which owns aggregate_user.score. +type ReconcileAggregatesJob struct { + pool database.DbPool + logger *zap.Logger + + mutex sync.Mutex + isRunning bool +} + +func NewReconcileAggregatesJob(cfg config.Config, pool database.DbPool) *ReconcileAggregatesJob { + return &ReconcileAggregatesJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("ReconcileAggregatesJob"), + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *ReconcileAggregatesJob) ScheduleEvery(ctx context.Context, interval time.Duration) *ReconcileAggregatesJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *ReconcileAggregatesJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *ReconcileAggregatesJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + for _, step := range []struct { + name string + query string + }{ + {"aggregate_user", reconcileAggregateUserQuery}, + {"aggregate_track", reconcileAggregateTrackQuery}, + {"aggregate_playlist", reconcileAggregatePlaylistQuery}, + } { + res, err := j.pool.Exec(ctx, step.query) + if err != nil { + return fmt.Errorf("reconcile %s: %w", step.name, err) + } + if n := res.RowsAffected(); n > 0 { + j.logger.Info("Reconciled aggregates", zap.String("table", step.name), zap.Int64("corrected", n)) + } + } + return nil +} + +const reconcileAggregateUserQuery = ` +with user_repost as ( + select user_id, count(*) as repost_count + from reposts r + where r.is_current is true and r.is_delete is false + group by user_id +), +user_save as ( + select user_id, count(*) as track_save_count + from saves s + where s.is_current is true and s.is_delete is false and s.save_type = 'track' + group by user_id +), +user_following as ( + select follower_user_id as user_id, count(*) as following_count + from follows + where is_current = true and is_delete = false + group by follower_user_id +), +user_follower as ( + select followee_user_id as user_id, count(*) as follower_count + from follows + where is_current = true and is_delete = false + group by followee_user_id +), +user_album as ( + select playlist_owner_id as user_id, count(*) as album_count + from playlists p + where p.is_album is true and p.is_current is true and p.is_delete is false and p.is_private is false + group by playlist_owner_id +), +user_playlist as ( + select playlist_owner_id as user_id, count(*) as playlist_count + from playlists p + where p.is_album is false and p.is_current is true and p.is_delete is false and p.is_private is false + group by playlist_owner_id +), +user_track as ( + select owner_id as user_id, count(*) as track_count + from tracks t + where t.is_current is true and t.is_delete is false and t.is_unlisted is false + and t.is_available is true and t.stem_of is null + group by owner_id +), +genre_counts as ( + select owner_id as user_id, genre, count(*) as count, max(created_at) as latest_upload + from tracks t + where t.is_current is true and t.is_delete is false and t.is_unlisted is false + and t.is_available is true and t.stem_of is null + group by genre, owner_id +), +ranked_genres as ( + select user_id, genre, count, + rank() over (partition by user_id order by count desc, latest_upload desc) as genre_rank + from genre_counts +), +new_aggregate_user as ( + select + ap.user_id, + coalesce(ut.track_count, 0) as track_count, + coalesce(up.playlist_count, 0) as playlist_count, + coalesce(ua.album_count, 0) as album_count, + coalesce(ufollower.follower_count, 0) as follower_count, + coalesce(ufollowing.following_count, 0) as following_count, + coalesce(ur.repost_count, 0) as repost_count, + coalesce(us.track_save_count, 0) as track_save_count, + rg.genre as dominant_genre, + rg.count as dominant_genre_count + from aggregate_user ap + left join user_track ut on ap.user_id = ut.user_id + left join user_playlist up on ap.user_id = up.user_id + left join user_album ua on ap.user_id = ua.user_id + left join user_follower ufollower on ap.user_id = ufollower.user_id + left join user_following ufollowing on ap.user_id = ufollowing.user_id + left join user_save us on ap.user_id = us.user_id + left join user_repost ur on ap.user_id = ur.user_id + left join ranked_genres rg on ap.user_id = rg.user_id and rg.genre_rank = 1 +) +update aggregate_user au +set + track_count = nau.track_count, + playlist_count = nau.playlist_count, + album_count = nau.album_count, + follower_count = nau.follower_count, + following_count = nau.following_count, + repost_count = nau.repost_count, + track_save_count = nau.track_save_count, + dominant_genre = nau.dominant_genre, + dominant_genre_count = nau.dominant_genre_count +from new_aggregate_user nau +where au.user_id = nau.user_id + and ( + au.track_count != nau.track_count + or au.playlist_count != nau.playlist_count + or au.album_count != nau.album_count + or au.follower_count != nau.follower_count + or au.following_count != nau.following_count + or au.repost_count != nau.repost_count + or au.track_save_count != nau.track_save_count + or au.dominant_genre is distinct from nau.dominant_genre + or au.dominant_genre_count is distinct from nau.dominant_genre_count + ); +` + +const reconcileAggregateTrackQuery = ` +with track_saves as ( + select save_item_id, count(*) as save_count + from saves s + where s.is_current is true and s.is_delete is false and s.save_type = 'track' + group by save_item_id +), +track_reposts as ( + select repost_item_id, count(*) as repost_count + from reposts r + where r.is_current is true and r.is_delete is false and r.repost_type = 'track' + group by repost_item_id +), +track_comments as ( + select entity_id as comment_entity_id, count(*) as comment_count + from comments c + where c.is_delete is false and c.is_visible is true and c.entity_type = 'Track' + group by comment_entity_id +), +new_aggregate_track as ( + select + ap.track_id, + coalesce(ps.save_count, 0) as save_count, + coalesce(pr.repost_count, 0) as repost_count, + coalesce(pc.comment_count, 0) as comment_count + from aggregate_track ap + left join track_saves ps on ap.track_id = ps.save_item_id + left join track_reposts pr on ap.track_id = pr.repost_item_id + left join track_comments pc on ap.track_id = pc.comment_entity_id +) +update aggregate_track at +set + save_count = nat.save_count, + repost_count = nat.repost_count, + comment_count = nat.comment_count +from new_aggregate_track nat +where at.track_id = nat.track_id + and ( + at.save_count != nat.save_count + or at.repost_count != nat.repost_count + or at.comment_count != nat.comment_count + ); +` + +const reconcileAggregatePlaylistQuery = ` +with playlist_saves as ( + select save_item_id, count(*) as save_count + from saves s + where s.is_current is true and s.is_delete is false + and (s.save_type = 'playlist' or s.save_type = 'album') + group by save_item_id +), +playlist_reposts as ( + select repost_item_id, count(*) as repost_count + from reposts r + where r.is_current is true and r.is_delete is false + and (r.repost_type = 'playlist' or r.repost_type = 'album') + group by repost_item_id +), +new_aggregate_playlist as ( + select + ap.playlist_id, + coalesce(ps.save_count, 0) as save_count, + coalesce(pr.repost_count, 0) as repost_count + from aggregate_playlist ap + left join playlist_saves ps on ap.playlist_id = ps.save_item_id + left join playlist_reposts pr on ap.playlist_id = pr.repost_item_id +) +update aggregate_playlist ap +set + save_count = nap.save_count, + repost_count = nap.repost_count +from new_aggregate_playlist nap +where ap.playlist_id = nap.playlist_id + and (ap.save_count != nap.save_count or ap.repost_count != nap.repost_count); +` From 468212b839a5912b869f604b3798774e1a00e8e0 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 21:29:34 -0700 Subject: [PATCH 2/3] chore(api): bump go-openaudio/etl to include in-place social upserts Packages OpenAudio/go-openaudio#331 (merged), which switches the etl indexer from demote-then-insert to upsert-in-place for reposts/saves/ follows/subscriptions. The delta-based aggregate triggers in this PR require that behavior to track is_delete transitions correctly. Co-Authored-By: Claude Opus 4.7 --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 00a4af3a..d217b54e 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( connectrpc.com/connect v1.18.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 - github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455 - github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455 + github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326 + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326 github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 diff --git a/go.sum b/go.sum index 803b361f..b6e7f0a7 100644 --- a/go.sum +++ b/go.sum @@ -20,10 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455 h1:O/gjjTOPwlbXVwgdYqPMt6gsAq65p5wUl/qXudws6L0= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455 h1:yhAeLKeuIxLb8EKfXV1Y2xNO3V9vgkbKO/NFIqTzD08= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326 h1:7pUQd5PwFFPltQ/2jRWOFllLTI0NZ7zeoOj/BBh9eUo= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326 h1:/HLzhNWnzRxFUOd+AtxyLUb23lEb/6LUbawOaRhtIOM= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= From 3fac9fe2cae42054fcaa896c6d6acc9c7a4e5340 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 1 Jun 2026 21:38:48 -0700 Subject: [PATCH 3/3] test(api): fix flaky play-milestone seed ordering aggregate_monthly_plays was seeded in the randomized map-order phase of Seed(), so it raced the plays fixtures. When plays seeded first, the handle_play trigger created the aggregate_monthly_plays row and the explicit fixture insert then collided (aggregate_monthly_plays_pkey). Seed it in the deterministic entity phase, before plays, so the trigger upserts the seeded row instead. Co-Authored-By: Claude Opus 4.7 --- database/seed.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/database/seed.go b/database/seed.go index e3d5b27c..8563580d 100644 --- a/database/seed.go +++ b/database/seed.go @@ -824,8 +824,10 @@ func Seed(pool *pgxpool.Pool, fixtures FixtureMap) { // because map key iteration order is randomized... // explicitly do the "entity" tables first // so that data dependencies exist before attempting to do saves, follows, etc. - // (also do aggregates first so we can override the ones the entities autocreate) - entityTables := []string{"blocks", "aggregate_user", "aggregate_track", "aggregate_playlist", "users", "tracks", "playlists", "sol_token_account_balances", "chat", "chat_member", "chat_message", "chat_blast", "sol_user_balances", "chat_blocked_users", "chat_permissions"} + // (also do aggregates first so we can override the ones the entities autocreate; + // aggregate_monthly_plays must precede plays so the handle_play trigger upserts + // the seeded row instead of racing a duplicate insert against it) + entityTables := []string{"blocks", "aggregate_user", "aggregate_track", "aggregate_playlist", "users", "tracks", "playlists", "aggregate_monthly_plays", "sol_token_account_balances", "chat", "chat_member", "chat_message", "chat_blast", "sol_user_balances", "chat_blocked_users", "chat_permissions"} for _, tableName := range entityTables { if rows, ok := fixtures[tableName]; ok { SeedTable(pool, tableName, rows)