Skip to content

perf(indexer): size jobs pool, per-cadence challenges, throttle aggregates#889

Open
raymondjacobson wants to merge 2 commits into
mainfrom
api/p2-parity-job-tuning
Open

perf(indexer): size jobs pool, per-cadence challenges, throttle aggregates#889
raymondjacobson wants to merge 2 commits into
mainfrom
api/p2-parity-job-tuning

Conversation

@raymondjacobson
Copy link
Copy Markdown
Member

@raymondjacobson raymondjacobson commented Jun 1, 2026

Summary

The discovery indexer's parity jobs (the periodic jobs in indexer/indexer.go startParityJobs plus the AggregatesCalculator, which replaced Python's celery beat) were saturating the Postgres primary's IO and competing with the ETL block-indexing loop, causing block-diff to oscillate. This PR addresses three contributors.

1. Connection-pool sizing (indexer/indexer.go, NewIndexer)

The single shared write pool set no MaxConns, so pgx defaulted to max(4, runtime.NumCPU()). That pool is shared by ~9 parity jobs, so on a small box they queued for connections behind each other and serialized work that should overlap.

  • Old: implicit pgx default, max(4, NumCPU()).
  • New: pin a floor of MaxConns = 20. We only raise the floor (if connConfig.MaxConns < 20), so a DB URL that already requests more via pool_max_conns is respected. 20 is a conservative ceiling well under Postgres max_connections. Mirrors the existing explicit-MaxConns pattern in solana/indexer/solana_indexer.go.

2. Per-cadence challenge scheduling (jobs/index_challenges.go, indexer/indexer.go)

Previously every challenge processor ran on one shared tick of the umbrella job. That forced a single cadence onto a mix of cheap and expensive work, so a slow processor delayed the cheap real-time ones and all the DB work bunched into one burst. The first cut at fixing this just slowed the whole job (30s → 5m), but that regressed near-real-time challenge completion — the thing it was supposed to preserve.

New approach: each challengeGroup runs on its own goroutine and interval.

  • Checkpoint-incremental processors (track_upload, profile_completion, cosign, listen_streak, audio_matching buyer/seller, referrals, mobile_install, etc.) → 30s. These only scan rows changed since a per-processor checkpoint, so an idle tick is an index range scan (~free) and a just-earned challenge is picked up on the next tick.
  • Bounded full-scan processors (profile_verified, the play-count milestones, tastemaker, comment_pin, first_weekly_comment, remix_contest_winner) → 2m.
  • Weekly trending reward processors (tt/tut/tp, which only mint rows on Friday UTC) → 10m.

Play-count milestones (p2 requires p1 complete, p3 requires p2) share one group so they still run in order within a tick. A per-group goroutine is inherently serial, so the Ticker drops overlapping ticks and the old sync.Mutex/isRunning re-entrancy guard is no longer needed. Startup is staggered (1s per group) so the groups don't all fire at once.

Net effect: the real-time challenges stay real-time, while the heavy/weekly ones stop hot-looping.

3. AggregatesCalculator pacing (indexer/aggregates_calculator.go)

The previous for {} loop kicked off the next full pass (~10 min each) the instant the prior one returned — a continuous hot loop. Now paced to a 10m cadence between pass starts, matching apps' update_aggregates celery beat (src/app.py). If a pass overruns, the next starts immediately (no negative sleep), so we never fall behind. The work is a single serial loop that can't overlap itself, so no mutex is needed.

The other fast jobs (UserListeningHistory 5s, ListenStreakReminder 10s, HourlyPlayCounts/PrunePlays 30s, etc.) already have re-entrancy guards and show no evidence of overrun, so they're left alone.

The single biggest offender — the TrendingJob — is fixed in a separate PR (#890) and intentionally untouched here. Both PRs edit startParityJobs; minor merge conflict expected, whoever merges second rebases.

Test plan

  • go build ./...
  • go vet ./...
  • gofmt -l on changed files (clean)
  • go test ./jobs/... ./indexer/... (all packages pass)
  • Watch challenge-completion latency + indexer block-diff after deploy

🤖 Generated with Claude Code

The shared write pool used no explicit MaxConns, so pgx defaulted to
max(4, NumCPU()) and the ~9 parity jobs starved each other for
connections. Two jobs were also scheduled far more frequently than they
can finish, so they ran back-to-back and IO-starved the ETL block loop.

- Pin the parity-jobs write pool MaxConns floor to 20 (was an implicit
  4/NumCPU default), mirroring solana/indexer's explicit-MaxConns
  pattern; only raises the floor so a DB URL pool_max_conns is honored.
- IndexChallenges: 30s -> 5m tick (full reconcile observed ~330-360s).
  Re-entrancy guard already present in jobs/index_challenges.go.
- AggregatesCalculator: pace the previously unbounded `for {}` hot loop
  to a 10m cadence (matching apps' update_aggregates beat); serial loop
  can't overlap itself so no mutex needed.

The TrendingJob fix ships in a separate PR.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Replace the single shared tick (where one slow processor delayed the cheap
real-time ones and all DB work bunched into one burst) with per-group
scheduling. Each challengeGroup runs on its own goroutine and interval:

- checkpoint-incremental processors (track_upload, profile_completion,
  cosign, listen_streak, audio_matching, referrals, etc.) at 30s, so a
  just-earned challenge is picked up on the next tick — idle ticks are
  index range scans, ~free.
- bounded full-scan processors (profile_verified, play-count milestones,
  tastemaker, comment_pin, first_weekly_comment, remix_contest_winner) at 2m.
- weekly trending reward processors at 10m.

Play-count milestones (p2 requires p1 complete, p3 requires p2) share one
group so they still run in order within a tick. A per-group goroutine is
inherently serial, so the Ticker drops overlapping ticks and no re-entrancy
mutex is needed. Startup is staggered so groups don't all fire at once.

Supersedes the earlier 5m shared-tick change: that fixed the IO bunching by
slowing everything, regressing near-real-time challenge completion; this keeps
the real-time challenges real-time without the heavy ones hot-looping.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@raymondjacobson raymondjacobson changed the title perf(indexer): size jobs pool + guard/throttle overrunning parity jobs perf(indexer): size jobs pool, per-cadence challenges, throttle aggregates Jun 1, 2026
raymondjacobson added a commit that referenced this pull request Jun 1, 2026
…890)

## Summary

`TrendingJob` was the dominant source of IO starvation on the indexer.
It ran on a **10s schedule**, but each run took minutes and did two
expensive things on every cycle:

1. Two **non-concurrent** `REFRESH MATERIALIZED VIEW` calls
(`aggregate_interval_plays`, `trending_params`). A plain refresh takes
an `ACCESS EXCLUSIVE` lock for the duration of the (multi-minute,
plays-table-scanning) rebuild — blocking *every* reader, including the
block-indexing loop's downstream queries.
2. A full **DELETE-then-INSERT** of the ~16GB `track_trending_scores`
table (and the analogous playlist table), rewriting ~12.7M rows × 6
indexes per cycle.

On a 10s schedule with multi-minute runtimes, this ran effectively
continuously and IO-starved block indexing.

## Changes

- **Concurrent matview refresh.** Use `REFRESH MATERIALIZED VIEW
CONCURRENTLY`, with a graceful fallback to a blocking refresh when the
matview is unpopulated (fresh/test DB → SQLSTATE `0A000`) or the
required unique index isn't present yet (pre-migration window of a
rolling deploy → SQLSTATE `55000`). This avoids a hard ordering
dependency on the migration.
- **Migration `0214`** promotes the `track_id` indexes on
`aggregate_interval_plays` and `trending_params` to `UNIQUE` (a
prerequisite for `CONCURRENTLY`). Verified in prod that both matviews
are exactly one row per track (`COUNT(*) == COUNT(DISTINCT track_id)`),
so the unique constraint is valid. Statements are intentionally not
wrapped in a transaction so each index build autocommits.
- **Upsert + prune instead of delete + insert.** Scores are staged into
an `ON COMMIT DROP` temp table, then merged with `INSERT ... ON CONFLICT
(...) DO UPDATE ... WHERE col IS DISTINCT FROM EXCLUDED.col`
(skip-unchanged), followed by an anti-join `DELETE ... WHERE NOT EXISTS`
to prune rows that left the source set. This stops the per-cycle
full-table churn that bloated the table and its indexes.
- **Hourly cadence.** Schedule `TrendingJob` every `1h` instead of
`10s`, matching discovery's *effective* cadence: apps' celery beat
ticked `index_trending` every 10s but an internal gate
(`trending_refresh_seconds`, default `3600`) only recomputed once an
hour. The vendored ETL port had dropped that gate.

## Notes

- `sql/01_schema.sql` (the pg_dump used to build the test template) will
pick up the unique indexes on the next `make test-schema` regen. Until
then, the code's CONCURRENTLY→blocking fallback keeps tests green.
- **Overlaps with #889** on `indexer/indexer.go` — both edit
`startParityJobs`. Minor merge conflict expected; whoever merges second
rebases. #889 leaves `TrendingJob` untouched, so the two changes are
complementary.

## Test plan

- [x] `go test ./jobs/...` — existing `TestTrendingJob_PopulatesScores`
passes; new `TestTrendingJob_PrunesStaleRows` covers the anti-join prune
(seed track, run, delete track, re-run, assert rows pruned).
- [x] `go vet ./jobs/...` clean; touched files gofmt-clean.
- [ ] Watch indexer IO / block lag after deploy to confirm starvation is
resolved.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
raymondjacobson added a commit that referenced this pull request Jun 2, 2026
## Summary

The p1/p2/p3 play milestones (250/1k/10k plays, verified artists only)
each ran a non-incremental `GROUP BY` every tick — a Parallel Seq Scan
of ~485k tracks producing ~1,867 rows in **~49s, times three tiers**.
Measuring `pg_stat_statements`, this one processor was the dominant cost
of `IndexChallengesJob` and the reason a per-cadence "fast/medium/slow"
job taxonomy looked tempting (see #889). Fix the processor instead and
the single serial loop stays simple.

- Merge `NewPlayCount250/1000/10000Processor` into one
`PlayCountMilestonesProcessor` that checkpoints `plays.id` and
recomputes only the verified artists whose tracks were played since the
checkpoint (the `reconcileIncrementalUsers` dirty-set pattern), deriving
all three tiers in a single pass.
- Tier eligibility gates on the live count reaching the previous tier's
threshold (p2 ≥250, p3 ≥1000). A milestone completes exactly when the
count crosses its `step_count`, so this is equivalent to Python's
"previous milestone complete" check — and it removes the old cross-tick
cascade (p2 no longer has to wait a tick for p1 to commit).
- Migration `0215` seeds the new checkpoint to `max(plays.id)` so prod
starts "from now" rather than backfilling the whole `plays` table
(mirrors `0208`).
- `IndexChallengesJob` keeps its single serial loop — three
registrations collapse to one.

`aggregate_monthly_plays` has no monotonic column (updated in place), so
`plays.id` is the high-water mark; the dirty scan joins through
`tracks`/`users` to surface only verified, non-deactivated artists.

## Measurements (read-only `EXPLAIN ANALYZE` on the write primary)

| | before | after |
|---|---|---|
| dirty scan (200k-play catch-up window, LIMIT 5000) | — | **~1.4s**
(far less per normal 30s tick) |
| recompute (50 sampled artists — larger than a typical dirty set) | — |
**~113ms** |
| per tick | 48.7s × 3 tiers, every tick | proportional to new plays |

## Test plan

- [x] `go build ./...`, `go vet ./jobs/...`
- [x] `go test ./jobs/challenges/` — updated
`play_count_milestones_test.go` to seed `plays` rows (drive the dirty
scan) and assert the merged single-pass tier gating: verified-only,
p1→p2→p3 thresholds in one run
- [ ] After deploy: confirm `IndexChallengesJob` "Reconciled challenges"
duration drops and milestone `user_challenges` rows keep landing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant