Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions database/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,8 +824,10 @@ func Seed(pool *pgxpool.Pool, fixtures FixtureMap) {
// because map key iteration order is randomized...
// explicitly do the "entity" tables first
// so that data dependencies exist before attempting to do saves, follows, etc.
// (also do aggregates first so we can override the ones the entities autocreate)
entityTables := []string{"blocks", "aggregate_user", "aggregate_track", "aggregate_playlist", "users", "tracks", "playlists", "sol_token_account_balances", "chat", "chat_member", "chat_message", "chat_blast", "sol_user_balances", "chat_blocked_users", "chat_permissions"}
// (also do aggregates first so we can override the ones the entities autocreate;
// aggregate_monthly_plays must precede plays so the handle_play trigger upserts
// the seeded row instead of racing a duplicate insert against it)
entityTables := []string{"blocks", "aggregate_user", "aggregate_track", "aggregate_playlist", "users", "tracks", "playlists", "aggregate_monthly_plays", "sol_token_account_balances", "chat", "chat_member", "chat_message", "chat_blast", "sol_user_balances", "chat_blocked_users", "chat_permissions"}
for _, tableName := range entityTables {
if rows, ok := fixtures[tableName]; ok {
SeedTable(pool, tableName, rows)
Expand Down
20 changes: 7 additions & 13 deletions ddl/functions/handle_follow.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ begin
insert into aggregate_user (user_id) values (new.followee_user_id) on conflict do nothing;
insert into aggregate_user (user_id) values (new.follower_user_id) on conflict do nothing;

-- increment or decrement?
if new.is_delete then
delta := -1;
else
delta := 1;
end if;
-- transition-aware delta (active = not is_delete); 0 on no-op re-delivery
delta := (case when new.is_delete then 0 else 1 end)
- (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end);

update aggregate_user
set following_count = following_count + delta
Expand Down Expand Up @@ -80,10 +77,7 @@ exception
end;
$$ language plpgsql;

do $$ begin
create trigger on_follow
after insert on follows
for each row execute procedure handle_follow();
exception
when others then null;
end $$;
drop trigger if exists on_follow on follows;
create trigger on_follow
after insert or update on follows
for each row execute procedure handle_follow();
52 changes: 12 additions & 40 deletions ddl/functions/handle_repost.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,20 @@ begin
where ap.playlist_id = new.repost_item_id;
end if;

-- increment or decrement?
if new.is_delete then
delta := -1;
else
delta := 1;
end if;
-- transition-aware delta (active = not is_delete); 0 on no-op re-delivery
delta := (case when new.is_delete then 0 else 1 end)
- (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end);

-- update agg user
update aggregate_user
set repost_count = (
select count(*)
from reposts r
where r.is_current is true
and r.is_delete is false
and r.user_id = new.user_id
)
update aggregate_user
set repost_count = repost_count + delta
where user_id = new.user_id;

-- update agg track or playlist
if new.repost_type = 'track' then
milestone_name := 'TRACK_REPOST_COUNT';
update aggregate_track
set repost_count = (
select count(*)
from reposts r
where
r.is_current is true
and r.is_delete is false
and r.repost_type = new.repost_type
and r.repost_item_id = new.repost_item_id
)
update aggregate_track
set repost_count = repost_count + delta
where track_id = new.repost_item_id
returning repost_count into new_val;
if new.is_delete IS FALSE then
Expand All @@ -72,15 +55,7 @@ begin
else
milestone_name := 'PLAYLIST_REPOST_COUNT';
update aggregate_playlist
set repost_count = (
select count(*)
from reposts r
where
r.is_current is true
and r.is_delete is false
and r.repost_type = new.repost_type
and r.repost_item_id = new.repost_item_id
)
set repost_count = repost_count + delta
where playlist_id = new.repost_item_id
returning repost_count into new_val;

Expand Down Expand Up @@ -243,10 +218,7 @@ end;
$$ language plpgsql;


do $$ begin
create trigger on_repost
after insert on reposts
for each row execute procedure handle_repost();
exception
when others then null;
end $$;
drop trigger if exists on_repost on reposts;
create trigger on_repost
after insert or update on reposts
for each row execute procedure handle_repost();
55 changes: 13 additions & 42 deletions ddl/functions/handle_save.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,58 +63,32 @@ begin
) into is_purchased;
end if;

-- increment or decrement?
if new.is_delete then
delta := -1;
else
delta := 1;
end if;
-- transition-aware delta (active = not is_delete); 0 on no-op re-delivery
delta := (case when new.is_delete then 0 else 1 end)
- (case when tg_op = 'UPDATE' and old.is_delete is false then 1 else 0 end);

-- update agg track or playlist
if new.save_type = 'track' then
milestone_name := 'TRACK_SAVE_COUNT';

update aggregate_track
set save_count = (
select count(*)
from saves r
where
r.is_current is true
and r.is_delete is false
and r.save_type = new.save_type
and r.save_item_id = new.save_item_id
)
update aggregate_track
set save_count = save_count + delta
where track_id = new.save_item_id
returning save_count into new_val;

-- update agg user
update aggregate_user
set track_save_count = (
select count(*)
from saves r
where r.is_current is true
and r.is_delete is false
and r.user_id = new.user_id
and r.save_type = new.save_type
)
update aggregate_user
set track_save_count = track_save_count + delta
where user_id = new.user_id;

if new.is_delete IS FALSE then
select tracks.owner_id, tracks.remix_of into owner_user_id, track_remix_of from tracks where is_current and track_id = new.save_item_id;
end if;
else
milestone_name := 'PLAYLIST_SAVE_COUNT';

update aggregate_playlist
set save_count = (
select count(*)
from saves r
where
r.is_current is true
and r.is_delete is false
and r.save_type = new.save_type
and r.save_item_id = new.save_item_id
)
set save_count = save_count + delta
where playlist_id = new.save_item_id
returning save_count into new_val;

Expand Down Expand Up @@ -284,10 +258,7 @@ end;
$$ language plpgsql;


do $$ begin
create trigger on_save
after insert on saves
for each row execute procedure handle_save();
exception
when others then null;
end $$;
drop trigger if exists on_save on saves;
create trigger on_save
after insert or update on saves
for each row execute procedure handle_save();
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
connectrpc.com/connect v1.18.1
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326
github.com/aquasecurity/esquery v0.2.0
github.com/axiomhq/axiom-go v0.23.0
github.com/axiomhq/hyperloglog v0.2.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455 h1:O/gjjTOPwlbXVwgdYqPMt6gsAq65p5wUl/qXudws6L0=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602021941-ea6c925d2455/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455 h1:yhAeLKeuIxLb8EKfXV1Y2xNO3V9vgkbKO/NFIqTzD08=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602021941-ea6c925d2455/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326 h1:7pUQd5PwFFPltQ/2jRWOFllLTI0NZ7zeoOj/BBh9eUo=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326 h1:/HLzhNWnzRxFUOd+AtxyLUb23lEb/6LUbawOaRhtIOM=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
Expand Down
7 changes: 7 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) {
jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 5*time.Minute)

// Drift backstop: recompute aggregate counts from source tables to correct
// any hot-path trigger delta divergence. 10m matches discovery's
// update_aggregates celery cadence. Column-disjoint from the score-only
// AggregatesCalculator, so they can run concurrently.
jobs.NewReconcileAggregatesJob(ci.Config, ci.pool).
ScheduleEvery(ctx, 10*time.Minute)

// Reconcile derived challenge state from source tables. Per-challenge
// scanners live in api/jobs/challenges/.
jobs.NewIndexChallengesJob(ci.Config, ci.pool).
Expand Down
Loading
Loading