diff --git a/go.mod b/go.mod index d217b54e..6b960b67 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( connectrpc.com/connect v1.18.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 - github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326 - github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326 + github.com/OpenAudio/go-openaudio v1.3.1-0.20260602052835-dca79f030639 + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602052835-dca79f030639 github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 diff --git a/go.sum b/go.sum index b6e7f0a7..7ceb7123 100644 --- a/go.sum +++ b/go.sum @@ -20,10 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326 h1:7pUQd5PwFFPltQ/2jRWOFllLTI0NZ7zeoOj/BBh9eUo= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326 h1:/HLzhNWnzRxFUOd+AtxyLUb23lEb/6LUbawOaRhtIOM= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602042514-5ed068b34326/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260602052835-dca79f030639 h1:vhhEm9AW1KKfZiR1+pXdLwbWKaEa6MBNTRv8xvWsd9c= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260602052835-dca79f030639/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602052835-dca79f030639 h1:lWFDHYYA4lXQbt8bbRlJf9vVRyw/ZCrNFzkugbq+oVc= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260602052835-dca79f030639/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/indexer/indexer.go b/indexer/indexer.go index 0291434e..0216c471 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -180,6 +180,12 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewRemixContestNotificationsJob(ci.Config, ci.pool). ScheduleEvery(ctx, 30*time.Second) + + // Backfill missing track bpm / musical_key from content-node audio + // analyses. Mirrors apps' repair_audio_analyses celery task, whose beat + // schedule ran every 3 minutes. Needs the SDK for content-node discovery. + jobs.NewRepairAudioAnalysesJob(ci.Config, ci.pool, ci.openAudioSDK). + ScheduleEvery(ctx, 3*time.Minute) } func (ci *CoreIndexer) Close() { diff --git a/jobs/musical_key.go b/jobs/musical_key.go new file mode 100644 index 00000000..8cf1d33c --- /dev/null +++ b/jobs/musical_key.go @@ -0,0 +1,26 @@ +package jobs + +// validMusicalKeys mirrors the apps indexer's MusicalKey enum +// (discovery-provider src/tasks/metadata.py). Note: flats only, no sharps — +// "C# minor" is NOT a valid key; the equivalent is "D flat minor". +var validMusicalKeys = map[string]bool{ + "A major": true, "A minor": true, + "B flat major": true, "B flat minor": true, + "B major": true, "B minor": true, + "C major": true, "C minor": true, + "D flat major": true, "D flat minor": true, + "D major": true, "D minor": true, + "E flat major": true, "E flat minor": true, + "E major": true, "E minor": true, + "F major": true, "F minor": true, + "G flat major": true, "G flat minor": true, + "G major": true, "G minor": true, + "A flat major": true, "A flat minor": true, + "Silence": true, +} + +// isValidMusicalKey reports whether s is one of the recognized musical keys. +// Mirrors apps' is_valid_musical_key (src/tasks/metadata.py). +func isValidMusicalKey(s string) bool { + return validMusicalKeys[s] +} diff --git a/jobs/repair_audio_analyses.go b/jobs/repair_audio_analyses.go new file mode 100644 index 00000000..ec64524a --- /dev/null +++ b/jobs/repair_audio_analyses.go @@ -0,0 +1,399 @@ +package jobs + +import ( + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + connect "connectrpc.com/connect" + ethv1 "github.com/OpenAudio/go-openaudio/pkg/api/eth/v1" + "github.com/OpenAudio/go-openaudio/pkg/sdk" + "go.uber.org/zap" +) + +// RepairAudioAnalysesJob backfills missing track bpm / musical_key by polling +// content nodes for their stored audio-analysis results. It is a direct port +// of apps' src/tasks/repair_audio_analyses.py celery task. +// +// The entity-manager indexer only records bpm / musical_key when the uploader +// supplies them in track metadata (or marks them custom). Tracks whose +// analysis finished after indexing — or which were indexed before the indexer +// learned to persist these fields — are left with NULL bpm / musical_key. This +// recurring job reconciles those gaps from the authoritative source: the +// content node that performed the analysis. +// +// Each pass: +// 1. Selects up to repairBatchSize current tracks missing a non-custom bpm or +// musical_key, with fewer than 3 prior analysis errors and a streamable +// track_cid (newest first). +// 2. Picks up to repairMaxNodes random registered content nodes. +// 3. For each track, queries a node for the analysis (modern uploads endpoint +// when audio_upload_id is set, else the legacy blob-analysis endpoint), +// falling back to the next node on transport error. +// 4. Fills in any missing+valid bpm / musical_key and syncs the stored error +// count, committing per track. +type RepairAudioAnalysesJob struct { + pool database.DbPool + logger *zap.Logger + sdk *sdk.OpenAudioSDK + httpClient *http.Client + + mutex sync.Mutex + isRunning bool +} + +const ( + // repairBatchSize mirrors apps' BATCH_SIZE. + repairBatchSize = 1000 + // repairMaxNodes mirrors apps' random.sample(endpoints, min(5, ...)). + repairMaxNodes = 5 + // repairNodeTimeout mirrors apps' requests.get(..., timeout=5). + repairNodeTimeout = 5 * time.Second + // repairMaxErrorCount mirrors apps' audio_analysis_error_count < 3 filter. + repairMaxErrorCount = 3 +) + +func NewRepairAudioAnalysesJob(cfg config.Config, pool database.DbPool, oaSDK *sdk.OpenAudioSDK) *RepairAudioAnalysesJob { + return &RepairAudioAnalysesJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("RepairAudioAnalysesJob"), + sdk: oaSDK, + httpClient: &http.Client{Timeout: repairNodeTimeout}, + } +} + +// ScheduleEvery runs the job every `interval` until the context is cancelled. +func (j *RepairAudioAnalysesJob) ScheduleEvery(ctx context.Context, interval time.Duration) *RepairAudioAnalysesJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +// Run executes the job once. +func (j *RepairAudioAnalysesJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *RepairAudioAnalysesJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + // apps guards with a redis lock + non-blocking acquire; the in-process + // mutex is the equivalent here (single indexer process). + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + tracks, err := j.queryTracks(ctx) + if err != nil { + return fmt.Errorf("query tracks: %w", err) + } + if len(tracks) == 0 { + return nil + } + + nodes, err := j.selectContentNodes(ctx) + if err != nil { + return fmt.Errorf("select content nodes: %w", err) + } + if len(nodes) == 0 { + j.logger.Warn("no content nodes available; skipping pass") + return nil + } + + updated := 0 + for _, t := range tracks { + ok, err := j.repairTrack(ctx, t, nodes) + if err != nil { + j.logger.Error("repairing track failed", + zap.Int64("track_id", t.TrackID), zap.Error(err)) + continue + } + if ok { + updated++ + } + } + + j.logger.Info("Repaired audio analyses", + zap.Int("candidates", len(tracks)), + zap.Int("updated", updated), + zap.Int64("last_track_id", tracks[len(tracks)-1].TrackID)) + return nil +} + +type repairTrack struct { + TrackID int64 + TrackCID string + AudioUploadID string + Bpm *float64 + IsCustomBpm bool + MusicalKey *string + IsCustomMusicalKey bool + ErrorCount int +} + +// queryTracks mirrors apps' query_tracks: current tracks missing a non-custom +// bpm or musical_key, error count < 3, streamable (track_cid not null), and not +// a podcast/audiobook, newest first. +func (j *RepairAudioAnalysesJob) queryTracks(ctx context.Context) ([]repairTrack, error) { + rows, err := j.pool.Query(ctx, ` + SELECT track_id, track_cid, COALESCE(audio_upload_id, ''), + bpm, COALESCE(is_custom_bpm, false), + musical_key, COALESCE(is_custom_musical_key, false), + audio_analysis_error_count + FROM tracks + WHERE is_current = true + AND ( + (musical_key IS NULL AND is_custom_musical_key = false) + OR (bpm IS NULL AND is_custom_bpm = false) + ) + AND audio_analysis_error_count < $1 + AND track_cid IS NOT NULL + AND genre NOT IN ('Podcasts', 'Podcast', 'Audiobooks') + ORDER BY created_at DESC + LIMIT $2 + `, repairMaxErrorCount, repairBatchSize) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []repairTrack + for rows.Next() { + var t repairTrack + if err := rows.Scan( + &t.TrackID, &t.TrackCID, &t.AudioUploadID, + &t.Bpm, &t.IsCustomBpm, + &t.MusicalKey, &t.IsCustomMusicalKey, + &t.ErrorCount, + ); err != nil { + return nil, err + } + out = append(out, t) + } + return out, rows.Err() +} + +// selectContentNodes mirrors apps' select_content_nodes: take up to +// repairMaxNodes random registered content-node endpoints (lowercased, trailing +// slash trimmed). apps samples from redis-cached *healthy* nodes; we don't have +// that cache here, so we sample from all registered content nodes and rely on +// the per-request timeout + next-node fallback to skip unhealthy ones. +func (j *RepairAudioAnalysesJob) selectContentNodes(ctx context.Context) ([]string, error) { + resp, err := j.sdk.Eth.GetRegisteredEndpoints(ctx, connect.NewRequest(ðv1.GetRegisteredEndpointsRequest{})) + if err != nil { + return nil, err + } + if resp == nil || resp.Msg == nil { + return nil, fmt.Errorf("GetRegisteredEndpoints returned nil response") + } + + var endpoints []string + for _, node := range resp.Msg.Endpoints { + // Content nodes serve the audio-analysis endpoints (mediorum storage). + if node.ServiceType != "content-node" { + continue + } + ep := strings.TrimRight(strings.ToLower(strings.TrimSpace(node.Endpoint)), "/") + if ep != "" { + endpoints = append(endpoints, ep) + } + } + + rand.Shuffle(len(endpoints), func(a, b int) { + endpoints[a], endpoints[b] = endpoints[b], endpoints[a] + }) + if len(endpoints) > repairMaxNodes { + endpoints = endpoints[:repairMaxNodes] + } + return endpoints, nil +} + +// analysisResult is the shape served by mediorum for both the modern uploads +// endpoint and the legacy blob-analysis endpoint. The legacy python content +// node used capitalized keys (Key/BPM); we accept both for parity. +type analysisResult struct { + BPM float64 `json:"bpm"` + BPMCap float64 `json:"BPM"` + Key string `json:"key"` + KeyCap string `json:"Key"` +} + +func (a analysisResult) bpm() float64 { + if a.BPM != 0 { + return a.BPM + } + return a.BPMCap +} + +func (a analysisResult) key() string { + if a.Key != "" { + return a.Key + } + return a.KeyCap +} + +// modernAnalysis matches mediorum's /uploads/:id payload. +type modernAnalysis struct { + Results *analysisResult `json:"audio_analysis_results"` + ErrorCount int `json:"audio_analysis_error_count"` +} + +// legacyAnalysis matches mediorum's /tracks/legacy/:cid/analysis payload. +type legacyAnalysis struct { + Results *analysisResult `json:"results"` + ErrorCount int `json:"error_count"` +} + +// repairTrack queries content nodes for one track's analysis and applies any +// missing+valid bpm / musical_key plus the latest error count. Mirrors the +// per-track body of apps' repair(). Returns true when the track was updated. +func (j *RepairAudioAnalysesJob) repairTrack(ctx context.Context, t repairTrack, nodes []string) (bool, error) { + if t.TrackCID == "" { + // Only analyze streamable tracks. + return false, nil + } + legacy := t.AudioUploadID == "" + + for _, node := range nodes { + var endpoint string + if legacy { + endpoint = fmt.Sprintf("%s/tracks/legacy/%s/analysis", node, t.TrackCID) + } else { + endpoint = fmt.Sprintf("%s/uploads/%s", node, t.AudioUploadID) + } + + result, errorCount, ok := j.fetchAnalysis(ctx, endpoint, legacy) + if !ok { + // Transport/non-2xx error: fall back to the next node. + continue + } + + var key string + var bpm float64 + if result != nil { + key = result.key() + bpm = result.bpm() + } + + // Build the update: fill missing+valid fields and sync error count. + setMusicalKey := key != "" && t.MusicalKey == nil && !t.IsCustomMusicalKey && isValidMusicalKey(key) + setBpm := bpm != 0 && t.Bpm == nil && !t.IsCustomBpm && isValidBpm(bpm) + setErrorCount := errorCount != t.ErrorCount + + if setMusicalKey || setBpm || setErrorCount { + if err := j.applyUpdate(ctx, t.TrackID, setMusicalKey, key, setBpm, bpm, setErrorCount, errorCount); err != nil { + return false, fmt.Errorf("update track %d: %w", t.TrackID, err) + } + } + + if errorCount >= repairMaxErrorCount { + j.logger.Warn("track failed audio analysis >= 3 times", + zap.Int64("track_id", t.TrackID), + zap.String("track_cid", t.TrackCID), + zap.String("audio_upload_id", t.AudioUploadID)) + } + + // apps breaks after the first node that responds (success or empty). + return setMusicalKey || setBpm || setErrorCount, nil + } + + return false, nil +} + +// fetchAnalysis GETs the endpoint and parses the analysis. ok=false signals a +// transport/non-2xx error (caller should try the next node); ok=true with a nil +// result means the node responded but had no analysis yet. +func (j *RepairAudioAnalysesJob) fetchAnalysis(ctx context.Context, endpoint string, legacy bool) (result *analysisResult, errorCount int, ok bool) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return nil, 0, false + } + resp, err := j.httpClient.Do(req) + if err != nil { + return nil, 0, false + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + io.Copy(io.Discard, io.LimitReader(resp.Body, 512)) + return nil, 0, false + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, 0, false + } + + if legacy { + var parsed legacyAnalysis + if err := json.Unmarshal(body, &parsed); err != nil { + return nil, 0, false + } + return parsed.Results, parsed.ErrorCount, true + } + var parsed modernAnalysis + if err := json.Unmarshal(body, &parsed); err != nil { + return nil, 0, false + } + return parsed.Results, parsed.ErrorCount, true +} + +// applyUpdate writes only the fields that changed, mirroring apps' selective +// setattr + commit. Each track commits independently. +func (j *RepairAudioAnalysesJob) applyUpdate(ctx context.Context, trackID int64, setKey bool, key string, setBpm bool, bpm float64, setErr bool, errorCount int) error { + sets := make([]string, 0, 3) + args := make([]any, 0, 4) + args = append(args, trackID) + if setKey { + args = append(args, key) + sets = append(sets, fmt.Sprintf("musical_key = $%d", len(args))) + } + if setBpm { + args = append(args, bpm) + sets = append(sets, fmt.Sprintf("bpm = $%d", len(args))) + } + if setErr { + args = append(args, errorCount) + sets = append(sets, fmt.Sprintf("audio_analysis_error_count = $%d", len(args))) + } + if len(sets) == 0 { + return nil + } + sql := fmt.Sprintf( + "UPDATE tracks SET %s WHERE track_id = $1 AND is_current = true", + strings.Join(sets, ", ")) + _, err := j.pool.Exec(ctx, sql, args...) + return err +} + +// isValidBpm mirrors apps' valid_bpm: a float strictly between 0 and 999. +func isValidBpm(bpm float64) bool { + return bpm > 0 && bpm < 999 +} diff --git a/jobs/repair_audio_analyses_test.go b/jobs/repair_audio_analyses_test.go new file mode 100644 index 00000000..10a386fd --- /dev/null +++ b/jobs/repair_audio_analyses_test.go @@ -0,0 +1,127 @@ +package jobs + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestIsValidBpm(t *testing.T) { + cases := []struct { + bpm float64 + want bool + }{ + {0, false}, + {-1, false}, + {0.5, true}, + {128, true}, + {998.9, true}, + {999, false}, + {1000, false}, + } + for _, c := range cases { + if got := isValidBpm(c.bpm); got != c.want { + t.Errorf("isValidBpm(%v) = %v, want %v", c.bpm, got, c.want) + } + } +} + +func TestIsValidMusicalKey(t *testing.T) { + valid := []string{"A major", "D flat minor", "Silence", "G flat major"} + for _, k := range valid { + if !isValidMusicalKey(k) { + t.Errorf("isValidMusicalKey(%q) = false, want true", k) + } + } + // Sharps are not part of the enum (flats only). + invalid := []string{"", "C# minor", "H sharp", "D# major", "a major"} + for _, k := range invalid { + if isValidMusicalKey(k) { + t.Errorf("isValidMusicalKey(%q) = true, want false", k) + } + } +} + +func TestAnalysisResultCaseFallback(t *testing.T) { + // Modern mediorum serves lowercase keys. + lower := analysisResult{BPM: 120, Key: "A major"} + if lower.bpm() != 120 || lower.key() != "A major" { + t.Errorf("lowercase: bpm=%v key=%q", lower.bpm(), lower.key()) + } + // Legacy python content node served capitalized keys. + upper := analysisResult{BPMCap: 90, KeyCap: "C minor"} + if upper.bpm() != 90 || upper.key() != "C minor" { + t.Errorf("uppercase: bpm=%v key=%q", upper.bpm(), upper.key()) + } +} + +func TestFetchAnalysisModern(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"audio_analysis_results":{"bpm":128.5,"key":"D flat minor"},"audio_analysis_error_count":1}`)) + })) + defer srv.Close() + + j := &RepairAudioAnalysesJob{httpClient: &http.Client{Timeout: 2 * time.Second}} + result, errorCount, ok := j.fetchAnalysis(context.Background(), srv.URL+"/uploads/up-1", false) + if !ok { + t.Fatal("expected ok=true") + } + if result == nil || result.bpm() != 128.5 || result.key() != "D flat minor" { + t.Fatalf("unexpected result: %+v", result) + } + if errorCount != 1 { + t.Errorf("errorCount = %d, want 1", errorCount) + } +} + +func TestFetchAnalysisLegacy(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Legacy payload uses "results" + "error_count" and capitalized fields. + w.Write([]byte(`{"results":{"BPM":95,"Key":"C minor"},"error_count":2}`)) + })) + defer srv.Close() + + j := &RepairAudioAnalysesJob{httpClient: &http.Client{Timeout: 2 * time.Second}} + result, errorCount, ok := j.fetchAnalysis(context.Background(), srv.URL+"/tracks/legacy/Qm123/analysis", true) + if !ok { + t.Fatal("expected ok=true") + } + if result == nil || result.bpm() != 95 || result.key() != "C minor" { + t.Fatalf("unexpected result: %+v", result) + } + if errorCount != 2 { + t.Errorf("errorCount = %d, want 2", errorCount) + } +} + +func TestFetchAnalysisNon2xxIsNotOk(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "not found", http.StatusNotFound) + })) + defer srv.Close() + + j := &RepairAudioAnalysesJob{httpClient: &http.Client{Timeout: 2 * time.Second}} + _, _, ok := j.fetchAnalysis(context.Background(), srv.URL+"/uploads/missing", false) + if ok { + t.Error("expected ok=false for non-2xx response") + } +} + +func TestFetchAnalysisEmptyResultsOkButNil(t *testing.T) { + // Node responded but has no analysis yet: ok=true, result=nil. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"audio_analysis_error_count":0}`)) + })) + defer srv.Close() + + j := &RepairAudioAnalysesJob{httpClient: &http.Client{Timeout: 2 * time.Second}} + result, _, ok := j.fetchAnalysis(context.Background(), srv.URL+"/uploads/up-2", false) + if !ok { + t.Fatal("expected ok=true") + } + if result != nil { + t.Errorf("expected nil result, got %+v", result) + } +}