From 17fa061869694c2fc62ce86a312ebb6f0562b5c7 Mon Sep 17 00:00:00 2001 From: Kevin Heneveld <1192102+kevinheneveld@users.noreply.github.com> Date: Sun, 31 May 2026 16:54:33 -0800 Subject: [PATCH 1/2] fix(downloads): purge old processing jobs via a hosted cleanup worker IDownloadProcessingJobService.CleanupOldJobsAsync existed but had no caller, so the DownloadProcessingJobs table grew unbounded on long-running instances. Every queue-snapshot reconciliation queries this table (pending/all job download IDs for completed DDL items), so the backlog also added steady per-snapshot cost. Add DownloadProcessingJobCleanupService, a hosted BackgroundService that purges terminal (Completed/Failed) jobs older than a 7-day retention window shortly after startup and then daily. Retention is a constant to keep this change migration-free; making it configurable is a follow-up. Tests: terminal jobs past retention are removed while recent and non-terminal jobs are retained; no-op when nothing is eligible; service is registered as a hosted service. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 1 + .../DownloadProcessingJobCleanupService.cs | 108 ++++++++++++++++++ .../HostedServiceRegistrationExtensions.cs | 4 + tests/Builders/ServiceCollectionBuilder.cs | 1 + .../HostedServicesRegistrationTests.cs | 1 + ...ownloadProcessingJobCleanupServiceTests.cs | 78 +++++++++++++ 6 files changed, 193 insertions(+) create mode 100644 listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs create mode 100644 tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 982b7c6df..cf0344534 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Authentication settings: admin provisioning failures no longer silently let the auth-required toggle proceed.** `ConfigurationService.SaveApplicationSettingsAsync` previously caught any exception from `CreateUserAsync` / `UpdatePasswordAsync`, logged it, and returned successfully — so when admin credentials were supplied but the user-service rejected them (password policy violation, repo I/O error, concurrent-write race), `SettingsView.saveSettings()` would still go on to persist `AuthenticationRequired=true` on its second request. The result was an instance that required login but had no working admin account — exactly the lockout shape the credential-visibility fix below was meant to prevent. The catch now re-throws the failure so the caller aborts before the auth-toggle write. The settings row itself is still saved before the admin block (non-admin changes like notification triggers and webhooks shouldn't disappear because admin provisioning failed), and the no-credentials path remains an unchanged silent skip. - **Authentication settings: corrected misleading description on the "Enable login screen" toggle.** Previously said *"Changes here are local and will not modify server files — edit config/config.json on the host to persist"*, which was demonstrably wrong: `SettingsView` actually writes `authenticationRequired` back to the server's startup config on save. The description now accurately states the toggle persists, and prompts the user to set admin credentials in the same save. - **Authentication settings: admin credential fields are always visible.** Previously the *Admin Account Management* row was gated by `v-if="authEnabledComputed"` in `AuthenticationSection.vue`, which meant the only way to surface the username/password inputs was to first toggle on the login screen. If a user enabled `AuthenticationRequired` via `config.json` on the host (e.g., for the very first time) and then opened settings, the toggle reflected the server state (on), but if they instead opened settings *with auth still off*, the fields were hidden — and once they ticked the toggle and saved, the login screen activated immediately on the next page load, locking them out before they could create a user. The fields now render unconditionally so credentials can be configured before or after enabling auth. Help text and the password placeholder were updated to reflect the create-or-update semantics (blank password = keep existing). +- **Download processing jobs grew unbounded:** `IDownloadProcessingJobService.CleanupOldJobsAsync` existed but was never invoked, so the `DownloadProcessingJobs` table accumulated completed/failed rows indefinitely on long-running instances — inflating every queue-snapshot reconciliation that queries it. A new `DownloadProcessingJobCleanupService` hosted worker now purges terminal jobs older than the retention window (7 days) shortly after startup and then daily. ## [0.2.71] - 2026-04-17 diff --git a/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs b/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs new file mode 100644 index 000000000..e709d5272 --- /dev/null +++ b/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs @@ -0,0 +1,108 @@ +/* + * Listenarr - Audiobook Management System + * Copyright (C) 2024-2026 Listenarr Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +using Listenarr.Application.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Listenarr.Application.Downloads +{ + /// + /// Background service that periodically purges old completed/failed + /// DownloadProcessingJobs rows. + /// already existed but had no caller, so the table grew unbounded on long-running + /// instances and inflated every queue-snapshot reconciliation that queries it. + /// Runs shortly after startup (to drain any accumulated backlog) and then daily. + /// + public class DownloadProcessingJobCleanupService : BackgroundService + { + private readonly IServiceScopeFactory _scopeFactory; + private readonly ILogger _logger; + + // Terminal jobs older than this are eligible for deletion. Mirrors the existing + // default on CleanupOldJobsAsync; kept as a constant to avoid a settings migration. + internal const int RetentionDays = 7; + + private readonly TimeSpan _initialDelay = TimeSpan.FromMinutes(5); + private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(24); + + public DownloadProcessingJobCleanupService( + IServiceScopeFactory scopeFactory, + ILogger logger) + { + _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation( + "DownloadProcessingJob cleanup service starting (retention {RetentionDays}d, interval {IntervalHours}h)", + RetentionDays, + _cleanupInterval.TotalHours); + + try + { + await Task.Delay(_initialDelay, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + return; + } + + while (!stoppingToken.IsCancellationRequested) + { + await RunCleanupAsync(stoppingToken); + + try + { + await Task.Delay(_cleanupInterval, stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + } + + _logger.LogInformation("DownloadProcessingJob cleanup service stopping"); + } + + /// + /// Runs a single cleanup pass. Internal so it can be exercised directly in tests + /// without driving the background loop. + /// + internal async Task RunCleanupAsync(CancellationToken cancellationToken) + { + try + { + using var scope = _scopeFactory.CreateScope(); + var jobService = scope.ServiceProvider.GetRequiredService(); + await jobService.CleanupOldJobsAsync(RetentionDays); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) when (ex is not OperationCanceledException && ex is not OutOfMemoryException && ex is not StackOverflowException) + { + _logger.LogError(ex, "Error during DownloadProcessingJob cleanup pass"); + } + } + } +} diff --git a/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs b/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs index 6ddf5c36d..ebaa25e72 100644 --- a/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs +++ b/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs @@ -79,6 +79,10 @@ public static IServiceCollection AddListenarrHostedServices(this IServiceCollect // Register background service for download processing queue services.AddHostedService(); + // Periodically purge old completed/failed processing jobs so the table + // doesn't grow unbounded (CleanupOldJobsAsync previously had no caller) + services.AddHostedService(); + // Background worker that processes unmatched-file scan jobs services.AddHostedService(); diff --git a/tests/Builders/ServiceCollectionBuilder.cs b/tests/Builders/ServiceCollectionBuilder.cs index 63bfd501e..28775f471 100644 --- a/tests/Builders/ServiceCollectionBuilder.cs +++ b/tests/Builders/ServiceCollectionBuilder.cs @@ -228,6 +228,7 @@ private ServiceCollection BuildServices() // Background services services.AddSingleton(); // FIXME: This should be a processor services.AddSingleton(); + services.AddSingleton(); return services; } diff --git a/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs b/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs index 39f035440..6a199488f 100644 --- a/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs +++ b/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs @@ -54,6 +54,7 @@ public void AddListenarrHostedServices_RegistersHostedServicesAndSingletons() Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(SeriesMonitoringBackgroundService)); Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(FfmpegInstallBackgroundService)); Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(MetadataRescanService)); + Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(DownloadProcessingJobCleanupService)); // Assert - singletons / supporting services registered Assert.Contains(services, d => d.ServiceType == typeof(IScanQueueService) && d.Lifetime == ServiceLifetime.Singleton); diff --git a/tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs b/tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs new file mode 100644 index 000000000..2bd081d6f --- /dev/null +++ b/tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs @@ -0,0 +1,78 @@ +/* + * Listenarr - Audiobook Management System + * Copyright (C) 2024-2026 Listenarr Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +using Listenarr.Application.Downloads; +using Listenarr.Tests.Builders; +using Listenarr.Tests.Common; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Listenarr.Tests.Features.Application.Downloads +{ + [Trait("Name", "DownloadProcessingJobCleanupServiceTests")] + [Trait("Category", "DownloadProcessingJob")] + public class DownloadProcessingJobCleanupServiceTests : BaseTests + { + [Fact] + [Trait("Scenario", "Cleanup removes terminal jobs past the retention window")] + public async Task RunCleanupAsync_RemovesOldTerminalJobs_KeepsRecentAndActive() + { + var beyondRetention = DateTime.UtcNow.AddDays(-(DownloadProcessingJobCleanupService.RetentionDays + 1)); + var withinRetention = DateTime.UtcNow.AddDays(-1); + + // Old completed job -> should be purged + await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder() + .WithId("job-old-completed") + .WithCompleted(at: beyondRetention) + .Build()); + + // Recent completed job -> should be retained + await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder() + .WithId("job-recent-completed") + .WithCompleted(at: withinRetention) + .Build()); + + // Pending job -> never purged regardless of age + await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder() + .WithId("job-pending") + .WithPending(at: beyondRetention) + .Build()); + + var service = _provider.GetRequiredService(); + await service.RunCleanupAsync(CancellationToken.None); + + Assert.Null(await _downloadProcessingJobRepository.GetByIdAsync("job-old-completed")); + Assert.NotNull(await _downloadProcessingJobRepository.GetByIdAsync("job-recent-completed")); + Assert.NotNull(await _downloadProcessingJobRepository.GetByIdAsync("job-pending")); + } + + [Fact] + [Trait("Scenario", "Cleanup is a no-op when nothing is eligible")] + public async Task RunCleanupAsync_NoEligibleJobs_DoesNothing() + { + await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder() + .WithId("job-recent-completed") + .WithCompleted(at: DateTime.UtcNow.AddHours(-1)) + .Build()); + + var service = _provider.GetRequiredService(); + await service.RunCleanupAsync(CancellationToken.None); + + Assert.NotNull(await _downloadProcessingJobRepository.GetByIdAsync("job-recent-completed")); + } + } +} From d8990ef19ecfeef9d98ae822a726dd0ae41d836d Mon Sep 17 00:00:00 2001 From: Kevin Heneveld <1192102+kevinheneveld@users.noreply.github.com> Date: Mon, 8 Jun 2026 09:17:33 -0800 Subject: [PATCH 2/2] refactor(downloads): fold job retention cleanup into the processor Addresses review feedback on #641: - Remove the standalone DownloadProcessingJobCleanupService and run the daily retention purge from DownloadProcessingJobProcessor, which already owns job-table maintenance (startup stuck-job reset). One worker, not two. - Move the retention policy (terminal statuses + age cutoff) out of the EF repository and into the application service; the repository method is now a thin DeleteCompletedBeforeAsync parameterized by that policy. Cleanup tests are rehomed to drive the processor end-to-end through the moved policy. Behavior is unchanged: terminal jobs older than 7 days are purged shortly after startup and then daily. Co-Authored-By: Claude Opus 4.8 --- CHANGELOG.md | 2 +- .../DownloadProcessingJobCleanupService.cs | 108 ------------------ .../DownloadProcessingJobProcessor.cs | 38 ++++++ .../Downloads/DownloadProcessingJobService.cs | 14 ++- .../IDownloadProcessingJobRepository.cs | 9 +- .../HostedServiceRegistrationExtensions.cs | 8 +- .../EfDownloadProcessingJobRepository.cs | 16 +-- tests/Builders/ServiceCollectionBuilder.cs | 1 - .../HostedServicesRegistrationTests.cs | 2 +- ...s => DownloadProcessingJobCleanupTests.cs} | 19 +-- 10 files changed, 84 insertions(+), 133 deletions(-) delete mode 100644 listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs rename tests/Features/Application/Downloads/{DownloadProcessingJobCleanupServiceTests.cs => DownloadProcessingJobCleanupTests.cs} (78%) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf0344534..d970cb981 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Authentication settings: admin provisioning failures no longer silently let the auth-required toggle proceed.** `ConfigurationService.SaveApplicationSettingsAsync` previously caught any exception from `CreateUserAsync` / `UpdatePasswordAsync`, logged it, and returned successfully — so when admin credentials were supplied but the user-service rejected them (password policy violation, repo I/O error, concurrent-write race), `SettingsView.saveSettings()` would still go on to persist `AuthenticationRequired=true` on its second request. The result was an instance that required login but had no working admin account — exactly the lockout shape the credential-visibility fix below was meant to prevent. The catch now re-throws the failure so the caller aborts before the auth-toggle write. The settings row itself is still saved before the admin block (non-admin changes like notification triggers and webhooks shouldn't disappear because admin provisioning failed), and the no-credentials path remains an unchanged silent skip. - **Authentication settings: corrected misleading description on the "Enable login screen" toggle.** Previously said *"Changes here are local and will not modify server files — edit config/config.json on the host to persist"*, which was demonstrably wrong: `SettingsView` actually writes `authenticationRequired` back to the server's startup config on save. The description now accurately states the toggle persists, and prompts the user to set admin credentials in the same save. - **Authentication settings: admin credential fields are always visible.** Previously the *Admin Account Management* row was gated by `v-if="authEnabledComputed"` in `AuthenticationSection.vue`, which meant the only way to surface the username/password inputs was to first toggle on the login screen. If a user enabled `AuthenticationRequired` via `config.json` on the host (e.g., for the very first time) and then opened settings, the toggle reflected the server state (on), but if they instead opened settings *with auth still off*, the fields were hidden — and once they ticked the toggle and saved, the login screen activated immediately on the next page load, locking them out before they could create a user. The fields now render unconditionally so credentials can be configured before or after enabling auth. Help text and the password placeholder were updated to reflect the create-or-update semantics (blank password = keep existing). -- **Download processing jobs grew unbounded:** `IDownloadProcessingJobService.CleanupOldJobsAsync` existed but was never invoked, so the `DownloadProcessingJobs` table accumulated completed/failed rows indefinitely on long-running instances — inflating every queue-snapshot reconciliation that queries it. A new `DownloadProcessingJobCleanupService` hosted worker now purges terminal jobs older than the retention window (7 days) shortly after startup and then daily. +- **Download processing jobs grew unbounded:** `IDownloadProcessingJobService.CleanupOldJobsAsync` existed but was never invoked, so the `DownloadProcessingJobs` table accumulated completed/failed rows indefinitely on long-running instances — inflating every queue-snapshot reconciliation that queries it. The existing `DownloadProcessingJobProcessor` worker (which already owns job-table maintenance) now purges terminal jobs older than the retention window (7 days) shortly after startup and then daily. The retention policy lives in the application layer; the repository was reduced to a thin delete. ## [0.2.71] - 2026-04-17 diff --git a/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs b/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs deleted file mode 100644 index e709d5272..000000000 --- a/listenarr.application/Downloads/DownloadProcessingJobCleanupService.cs +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Listenarr - Audiobook Management System - * Copyright (C) 2024-2026 Listenarr Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -using Listenarr.Application.Interfaces; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; - -namespace Listenarr.Application.Downloads -{ - /// - /// Background service that periodically purges old completed/failed - /// DownloadProcessingJobs rows. - /// already existed but had no caller, so the table grew unbounded on long-running - /// instances and inflated every queue-snapshot reconciliation that queries it. - /// Runs shortly after startup (to drain any accumulated backlog) and then daily. - /// - public class DownloadProcessingJobCleanupService : BackgroundService - { - private readonly IServiceScopeFactory _scopeFactory; - private readonly ILogger _logger; - - // Terminal jobs older than this are eligible for deletion. Mirrors the existing - // default on CleanupOldJobsAsync; kept as a constant to avoid a settings migration. - internal const int RetentionDays = 7; - - private readonly TimeSpan _initialDelay = TimeSpan.FromMinutes(5); - private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(24); - - public DownloadProcessingJobCleanupService( - IServiceScopeFactory scopeFactory, - ILogger logger) - { - _scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory)); - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - _logger.LogInformation( - "DownloadProcessingJob cleanup service starting (retention {RetentionDays}d, interval {IntervalHours}h)", - RetentionDays, - _cleanupInterval.TotalHours); - - try - { - await Task.Delay(_initialDelay, stoppingToken); - } - catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) - { - return; - } - - while (!stoppingToken.IsCancellationRequested) - { - await RunCleanupAsync(stoppingToken); - - try - { - await Task.Delay(_cleanupInterval, stoppingToken); - } - catch (OperationCanceledException) - { - break; - } - } - - _logger.LogInformation("DownloadProcessingJob cleanup service stopping"); - } - - /// - /// Runs a single cleanup pass. Internal so it can be exercised directly in tests - /// without driving the background loop. - /// - internal async Task RunCleanupAsync(CancellationToken cancellationToken) - { - try - { - using var scope = _scopeFactory.CreateScope(); - var jobService = scope.ServiceProvider.GetRequiredService(); - await jobService.CleanupOldJobsAsync(RetentionDays); - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - throw; - } - catch (Exception ex) when (ex is not OperationCanceledException && ex is not OutOfMemoryException && ex is not StackOverflowException) - { - _logger.LogError(ex, "Error during DownloadProcessingJob cleanup pass"); - } - } - } -} diff --git a/listenarr.application/Downloads/DownloadProcessingJobProcessor.cs b/listenarr.application/Downloads/DownloadProcessingJobProcessor.cs index 7f7edb800..5f4e25aa7 100644 --- a/listenarr.application/Downloads/DownloadProcessingJobProcessor.cs +++ b/listenarr.application/Downloads/DownloadProcessingJobProcessor.cs @@ -36,6 +36,15 @@ public class DownloadProcessingJobProcessor( { private readonly TimeSpan _processingInterval = TimeSpan.FromSeconds(10); // Check every 10 seconds + // Retention cleanup is folded into this worker (rather than a separate hosted service) + // since the processor already owns job-table maintenance — see the stuck-job reset below. + // Terminal jobs older than the retention window are purged shortly after startup and daily + // thereafter so DownloadProcessingJobs doesn't grow unbounded and inflate every + // queue-snapshot reconciliation that queries it. + private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(24); + internal const int JobRetentionDays = 7; + private DateTime _nextCleanupAtUtc = DateTime.MinValue; // due on the first loop iteration + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("Download Processing Background Service started"); @@ -79,6 +88,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) logger.LogError(ex, "Error processing download queue"); } + if (DateTime.UtcNow >= _nextCleanupAtUtc) + { + await RunCleanupAsync(stoppingToken); + _nextCleanupAtUtc = DateTime.UtcNow.Add(_cleanupInterval); + } + try { await Task.Delay(_processingInterval, stoppingToken); @@ -92,6 +107,29 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) logger.LogInformation("Download Processing Background Service stopped"); } + /// + /// Runs a single retention-cleanup pass, delegating the policy to the application layer. + /// Internal so it can be exercised directly in tests without driving the background loop. + /// Failures are swallowed (logged) so a cleanup error never tears down the processor loop. + /// + internal async Task RunCleanupAsync(CancellationToken cancellationToken) + { + try + { + using var scope = scopeFactory.CreateScope(); + var downloadProcessingJobService = scope.ServiceProvider.GetRequiredService(); + await downloadProcessingJobService.CleanupOldJobsAsync(JobRetentionDays); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) when (ex is not OperationCanceledException && ex is not OutOfMemoryException && ex is not StackOverflowException) + { + logger.LogError(ex, "Error during DownloadProcessingJob cleanup pass"); + } + } + internal async Task ProcessQueueAsync(CancellationToken cancellationToken) { using var scope = scopeFactory.CreateScope(); diff --git a/listenarr.application/Downloads/DownloadProcessingJobService.cs b/listenarr.application/Downloads/DownloadProcessingJobService.cs index 69baed6cd..303a911cb 100644 --- a/listenarr.application/Downloads/DownloadProcessingJobService.cs +++ b/listenarr.application/Downloads/DownloadProcessingJobService.cs @@ -101,8 +101,20 @@ public async Task> GetJobsForDownloadAsync(string do public async Task GetStatsAsync() => await jobRepository.GetStatsAsync(); + // Jobs in a terminal state are eligible for retention cleanup; in-flight states + // (Pending/Processing/Retry) are never purged regardless of age. + private static readonly ProcessingJobStatus[] TerminalStatuses = + [ProcessingJobStatus.Completed, ProcessingJobStatus.Failed]; + public async Task CleanupOldJobsAsync(int retentionDays = 7) - => await jobRepository.CleanupOldJobsAsync(retentionDays); + { + var cutoffUtc = DateTime.UtcNow.AddDays(-retentionDays); + var removed = await jobRepository.DeleteCompletedBeforeAsync(TerminalStatuses, cutoffUtc); + if (removed > 0) + { + logger.LogInformation("Cleaned up {Count} old processing jobs older than {Days} days", removed, retentionDays); + } + } public async Task> GetRecentActivityAsync(int count = 50) => await jobRepository.GetRecentAsync(count); diff --git a/listenarr.application/Interfaces/Repositories/IDownloadProcessingJobRepository.cs b/listenarr.application/Interfaces/Repositories/IDownloadProcessingJobRepository.cs index ddb53944f..b49fe9324 100644 --- a/listenarr.application/Interfaces/Repositories/IDownloadProcessingJobRepository.cs +++ b/listenarr.application/Interfaces/Repositories/IDownloadProcessingJobRepository.cs @@ -37,7 +37,14 @@ public interface IDownloadProcessingJobRepository Task GetByIdAsync(string jobId); Task> GetByDownloadIdAsync(string downloadId); Task GetStatsAsync(); - Task CleanupOldJobsAsync(int retentionDays); + + /// + /// Deletes jobs in the given whose CompletedAt is before + /// , returning the number of rows removed. Pure data access: the + /// retention policy (which statuses are terminal, how the cutoff is derived) is owned by the + /// application layer and passed in. + /// + Task DeleteCompletedBeforeAsync(IReadOnlyCollection statuses, DateTime cutoffUtc); Task> GetRecentAsync(int count); Task> GetStuckProcessingJobsAsync(CancellationToken cancellationToken = default); } diff --git a/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs b/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs index ebaa25e72..089dadfc1 100644 --- a/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs +++ b/listenarr.infrastructure/Extensions/HostedServiceRegistrationExtensions.cs @@ -76,13 +76,11 @@ public static IServiceCollection AddListenarrHostedServices(this IServiceCollect // Background service to rescan files missing metadata services.AddHostedService(); - // Register background service for download processing queue + // Register background service for download processing queue. This worker also + // periodically purges old completed/failed processing jobs so the table doesn't + // grow unbounded (CleanupOldJobsAsync previously had no caller). services.AddHostedService(); - // Periodically purge old completed/failed processing jobs so the table - // doesn't grow unbounded (CleanupOldJobsAsync previously had no caller) - services.AddHostedService(); - // Background worker that processes unmatched-file scan jobs services.AddHostedService(); diff --git a/listenarr.infrastructure/Persistence/Repositories/EfDownloadProcessingJobRepository.cs b/listenarr.infrastructure/Persistence/Repositories/EfDownloadProcessingJobRepository.cs index 9712c535a..effa9fccb 100644 --- a/listenarr.infrastructure/Persistence/Repositories/EfDownloadProcessingJobRepository.cs +++ b/listenarr.infrastructure/Persistence/Repositories/EfDownloadProcessingJobRepository.cs @@ -158,21 +158,21 @@ public async Task GetStatsAsync() return result; } - public async Task CleanupOldJobsAsync(int retentionDays) + public async Task DeleteCompletedBeforeAsync(IReadOnlyCollection statuses, DateTime cutoffUtc) { - var cutoffDate = DateTime.UtcNow.AddDays(-retentionDays); await using var ctx = await _dbFactory.CreateDbContextAsync(); var oldJobs = await ctx.DownloadProcessingJobs - .Where(j => (j.Status == ProcessingJobStatus.Completed || j.Status == ProcessingJobStatus.Failed) && - j.CompletedAt.HasValue && j.CompletedAt < cutoffDate) + .Where(j => statuses.Contains(j.Status) && j.CompletedAt.HasValue && j.CompletedAt < cutoffUtc) .ToListAsync(); - if (oldJobs.Any()) + if (oldJobs.Count == 0) { - ctx.DownloadProcessingJobs.RemoveRange(oldJobs); - await ctx.SaveChangesAsync(); - _logger.LogInformation("Cleaned up {Count} old processing jobs older than {Days} days", oldJobs.Count, retentionDays); + return 0; } + + ctx.DownloadProcessingJobs.RemoveRange(oldJobs); + await ctx.SaveChangesAsync(); + return oldJobs.Count; } public async Task> GetRecentAsync(int count) diff --git a/tests/Builders/ServiceCollectionBuilder.cs b/tests/Builders/ServiceCollectionBuilder.cs index 28775f471..63bfd501e 100644 --- a/tests/Builders/ServiceCollectionBuilder.cs +++ b/tests/Builders/ServiceCollectionBuilder.cs @@ -228,7 +228,6 @@ private ServiceCollection BuildServices() // Background services services.AddSingleton(); // FIXME: This should be a processor services.AddSingleton(); - services.AddSingleton(); return services; } diff --git a/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs b/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs index 6a199488f..26e121dd0 100644 --- a/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs +++ b/tests/Features/Api/Extensions/HostedServicesRegistrationTests.cs @@ -54,7 +54,7 @@ public void AddListenarrHostedServices_RegistersHostedServicesAndSingletons() Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(SeriesMonitoringBackgroundService)); Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(FfmpegInstallBackgroundService)); Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(MetadataRescanService)); - Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(DownloadProcessingJobCleanupService)); + Assert.Contains(services, d => d.ServiceType == typeof(IHostedService) && d.ImplementationType == typeof(DownloadProcessingJobProcessor)); // Assert - singletons / supporting services registered Assert.Contains(services, d => d.ServiceType == typeof(IScanQueueService) && d.Lifetime == ServiceLifetime.Singleton); diff --git a/tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs b/tests/Features/Application/Downloads/DownloadProcessingJobCleanupTests.cs similarity index 78% rename from tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs rename to tests/Features/Application/Downloads/DownloadProcessingJobCleanupTests.cs index 2bd081d6f..f4f323142 100644 --- a/tests/Features/Application/Downloads/DownloadProcessingJobCleanupServiceTests.cs +++ b/tests/Features/Application/Downloads/DownloadProcessingJobCleanupTests.cs @@ -23,15 +23,20 @@ namespace Listenarr.Tests.Features.Application.Downloads { - [Trait("Name", "DownloadProcessingJobCleanupServiceTests")] + /// + /// Retention cleanup is folded into the rather than + /// living in a separate hosted service. These tests drive its cleanup entry point end-to-end so + /// the retention policy (now owned by the application layer) is exercised through to the repository. + /// + [Trait("Name", "DownloadProcessingJobCleanupTests")] [Trait("Category", "DownloadProcessingJob")] - public class DownloadProcessingJobCleanupServiceTests : BaseTests + public class DownloadProcessingJobCleanupTests : BaseTests { [Fact] [Trait("Scenario", "Cleanup removes terminal jobs past the retention window")] public async Task RunCleanupAsync_RemovesOldTerminalJobs_KeepsRecentAndActive() { - var beyondRetention = DateTime.UtcNow.AddDays(-(DownloadProcessingJobCleanupService.RetentionDays + 1)); + var beyondRetention = DateTime.UtcNow.AddDays(-(DownloadProcessingJobProcessor.JobRetentionDays + 1)); var withinRetention = DateTime.UtcNow.AddDays(-1); // Old completed job -> should be purged @@ -52,8 +57,8 @@ await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder .WithPending(at: beyondRetention) .Build()); - var service = _provider.GetRequiredService(); - await service.RunCleanupAsync(CancellationToken.None); + var processor = _provider.GetRequiredService(); + await processor.RunCleanupAsync(CancellationToken.None); Assert.Null(await _downloadProcessingJobRepository.GetByIdAsync("job-old-completed")); Assert.NotNull(await _downloadProcessingJobRepository.GetByIdAsync("job-recent-completed")); @@ -69,8 +74,8 @@ await _downloadProcessingJobRepository.AddAsync(new DownloadProcessingJobBuilder .WithCompleted(at: DateTime.UtcNow.AddHours(-1)) .Build()); - var service = _provider.GetRequiredService(); - await service.RunCleanupAsync(CancellationToken.None); + var processor = _provider.GetRequiredService(); + await processor.RunCleanupAsync(CancellationToken.None); Assert.NotNull(await _downloadProcessingJobRepository.GetByIdAsync("job-recent-completed")); }