Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 68 additions & 10 deletions src/scanner/library_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::db::entities::{books, series};
use crate::db::repositories::{
BookRepository, LibraryRepository, SeriesRepository, TaskRepository,
};
use crate::events::EventBroadcaster;
use crate::events::{EventBroadcaster, TaskProgressEvent};
use crate::models::SeriesStrategy;
use crate::tasks::types::TaskType;

Expand Down Expand Up @@ -154,16 +154,29 @@ struct SharedScanState {
result: Arc<Mutex<ScanResult>>,
/// Progress channel sender
progress_tx: Option<mpsc::Sender<ScanProgress>>,
/// Optional task id + broadcaster for emitting TaskProgressEvent
task_id: Option<Uuid>,
library_name: String,
event_broadcaster: Option<Arc<EventBroadcaster>>,
}

impl SharedScanState {
fn new(library_id: Uuid, progress_tx: Option<mpsc::Sender<ScanProgress>>) -> Self {
fn new(
library_id: Uuid,
library_name: String,
progress_tx: Option<mpsc::Sender<ScanProgress>>,
task_id: Option<Uuid>,
event_broadcaster: Option<Arc<EventBroadcaster>>,
) -> Self {
let mut progress = ScanProgress::new(library_id);
progress.start();
Self {
progress: Arc::new(Mutex::new(progress)),
result: Arc::new(Mutex::new(ScanResult::new())),
progress_tx,
task_id,
library_name,
event_broadcaster,
}
}

Expand Down Expand Up @@ -199,13 +212,39 @@ impl SharedScanState {
}
}

/// Send current progress through the channel
/// Send current progress through the channel and emit a TaskProgressEvent
/// if a task id and broadcaster are available.
async fn send_progress(&self) {
if let Some(ref tx) = self.progress_tx {
let progress = self.progress.lock().await.clone();
if let Err(e) = tx.send(progress).await {
warn!("Failed to send progress update: {}", e);
}
let progress = self.progress.lock().await.clone();

if let (Some(task_id), Some(broadcaster)) = (self.task_id, self.event_broadcaster.as_ref())
{
let total = progress.files_total;
let current = progress.files_processed.min(total.max(1));
let message = if total == 0 {
format!("Scanning {} (discovering files…)", self.library_name)
} else {
format!(
"Scanning {} ({}/{} files, {} series, {} books)",
self.library_name, current, total, progress.series_found, progress.books_found,
)
};
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task_id,
"scan_library",
current,
total.max(current),
Some(message),
Some(progress.library_id),
None,
None,
));
}

if let Some(ref tx) = self.progress_tx
&& let Err(e) = tx.send(progress).await
{
warn!("Failed to send progress update: {}", e);
}
}

Expand Down Expand Up @@ -244,6 +283,9 @@ impl Clone for SharedScanState {
progress: Arc::clone(&self.progress),
result: Arc::clone(&self.result),
progress_tx: self.progress_tx.clone(),
task_id: self.task_id,
library_name: self.library_name.clone(),
event_broadcaster: self.event_broadcaster.clone(),
}
}
}
Expand Down Expand Up @@ -376,6 +418,7 @@ pub async fn scan_library(
mode: ScanMode,
progress_tx: Option<mpsc::Sender<ScanProgress>>,
event_broadcaster: Option<&Arc<EventBroadcaster>>,
task_id: Option<Uuid>,
) -> Result<ScanResult> {
let scan_start = Instant::now();
info!("Starting {} scan for library {}", mode, library_id);
Expand All @@ -401,7 +444,15 @@ pub async fn scan_library(

// Execute optimized batched scan (handles both Normal and Deep modes)
// The batched scan manages its own progress tracking internally
let result = scan_batched(db, &library, mode, progress_tx.clone(), event_broadcaster).await;
let result = scan_batched(
db,
&library,
mode,
progress_tx.clone(),
event_broadcaster,
task_id,
)
.await;

// Send final progress update
let mut final_progress = ScanProgress::new(library_id);
Expand Down Expand Up @@ -482,6 +533,7 @@ async fn scan_batched(
mode: ScanMode,
progress_tx: Option<mpsc::Sender<ScanProgress>>,
event_broadcaster: Option<&Arc<EventBroadcaster>>,
task_id: Option<Uuid>,
) -> Result<ScanResult> {
// Load scanner configuration from database settings
let config = ScannerConfig::load(db).await;
Expand All @@ -491,7 +543,13 @@ async fn scan_batched(
);

// Create shared state for thread-safe progress tracking
let shared_state = SharedScanState::new(library.id, progress_tx);
let shared_state = SharedScanState::new(
library.id,
library.name.clone(),
progress_tx,
task_id,
event_broadcaster.cloned(),
);

// Always load existing books from database
// - Normal mode: used for change detection (skip unchanged files)
Expand Down
38 changes: 35 additions & 3 deletions src/tasks/handlers/analyze_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::{error, info};

use crate::db::entities::tasks;
use crate::db::repositories::{BookRepository, TaskRepository};
use crate::events::EventBroadcaster;
use crate::events::{EventBroadcaster, TaskProgressEvent};
use crate::tasks::handlers::TaskHandler;
use crate::tasks::types::{TaskResult, TaskType};

Expand All @@ -29,7 +29,7 @@ impl TaskHandler for AnalyzeSeriesHandler {
&'a self,
task: &'a tasks::Model,
db: &'a DatabaseConnection,
_event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'a>> {
Box::pin(async move {
let series_id = task
Expand All @@ -53,11 +53,24 @@ impl TaskHandler for AnalyzeSeriesHandler {
));
}

if let Some(broadcaster) = event_broadcaster {
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task.id,
"analyze_series",
0,
total_books,
Some(format!("Enqueueing analysis for {} book(s)", total_books)),
task.library_id,
Some(series_id),
None,
));
}

// Enqueue individual AnalyzeBook tasks with force=true
let mut enqueued = 0;
let mut errors = Vec::new();

for book in books {
for (idx, book) in books.iter().enumerate() {
match TaskRepository::enqueue(
db,
TaskType::AnalyzeBook {
Expand All @@ -75,6 +88,25 @@ impl TaskHandler for AnalyzeSeriesHandler {
errors.push(err_msg);
}
}

if let Some(broadcaster) = event_broadcaster {
let current = idx + 1;
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task.id,
"analyze_series",
current,
total_books,
Some(format!(
"Enqueueing analysis ({}/{}, {} failed)",
current,
total_books,
errors.len()
)),
task.library_id,
Some(series_id),
Some(book.id),
));
}
}

info!(
Expand Down
43 changes: 41 additions & 2 deletions src/tasks/handlers/cleanup_series_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{info, warn};

use crate::db::entities::tasks;
use crate::db::repositories::SeriesExportRepository;
use crate::events::EventBroadcaster;
use crate::events::{EventBroadcaster, TaskProgressEvent};
use crate::services::SettingsService;
use crate::services::export_storage::ExportStorage;
use crate::tasks::handlers::TaskHandler;
Expand Down Expand Up @@ -41,19 +41,40 @@ impl TaskHandler for CleanupSeriesExportsHandler {
&'a self,
task: &'a tasks::Model,
db: &'a DatabaseConnection,
_event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'a>> {
Box::pin(async move {
let task_id = task.id;
info!("Task {task_id}: Starting series exports cleanup");

// Three sequential phases: expired → stale tmp → cap evict.
// We expose them as 3 high-level steps so the user always sees progress
// even when individual phases are short.
const PHASE_TOTAL: usize = 3;

let emit = |current: usize, message: String| {
if let Some(broadcaster) = event_broadcaster {
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task.id,
"cleanup_series_exports",
current,
PHASE_TOTAL,
Some(message),
None,
None,
None,
));
}
};

let now = Utc::now();
let mut expired_count = 0u64;
let mut stale_tmp_count = 0u64;
let mut cap_evicted_count = 0u64;

// 1. Delete expired exports (completed + expires_at < now)
let expired = SeriesExportRepository::list_expired(db, now).await?;
emit(0, format!("Deleting {} expired export(s)", expired.len()));
for export in &expired {
// Delete file
let _ = self
Expand All @@ -74,6 +95,10 @@ impl TaskHandler for CleanupSeriesExportsHandler {
if expired_count > 0 {
info!("Task {task_id}: Deleted {expired_count} expired exports");
}
emit(
1,
format!("Sweeping stale .tmp files (deleted {expired_count} expired)"),
);

// 2. Sweep stale .tmp files (older than 1 hour)
let stale_duration = std::time::Duration::from_secs(3600);
Expand Down Expand Up @@ -102,6 +127,13 @@ impl TaskHandler for CleanupSeriesExportsHandler {
}
}

emit(
2,
format!(
"Enforcing storage cap ({expired_count} expired, {stale_tmp_count} stale removed)"
),
);

// 3. Enforce global storage cap
let cap_bytes = self
.settings_service
Expand Down Expand Up @@ -157,6 +189,13 @@ impl TaskHandler for CleanupSeriesExportsHandler {
}
}

emit(
PHASE_TOTAL,
format!(
"Cleanup complete ({expired_count} expired, {stale_tmp_count} stale, {cap_evicted_count} evicted)"
),
);

let total_cleaned = expired_count + stale_tmp_count + cap_evicted_count;
let message = if total_cleaned == 0 {
"No exports needed cleanup".to_string()
Expand Down
41 changes: 38 additions & 3 deletions src/tasks/handlers/generate_series_thumbnails.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{debug, info, warn};

use crate::db::entities::tasks;
use crate::db::repositories::{SeriesRepository, TaskRepository};
use crate::events::EventBroadcaster;
use crate::events::{EventBroadcaster, TaskProgressEvent};
use crate::services::ThumbnailService;
use crate::tasks::handlers::TaskHandler;
use crate::tasks::types::{TaskResult, TaskType};
Expand All @@ -31,7 +31,7 @@ impl TaskHandler for GenerateSeriesThumbnailsHandler {
&'a self,
task: &'a tasks::Model,
db: &'a DatabaseConnection,
_event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
event_broadcaster: Option<&'a Arc<EventBroadcaster>>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'a>> {
Box::pin(async move {
info!(
Expand Down Expand Up @@ -114,11 +114,27 @@ impl TaskHandler for GenerateSeriesThumbnailsHandler {
));
}

if let Some(broadcaster) = event_broadcaster {
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task.id,
"generate_series_thumbnails",
0,
to_process,
Some(format!(
"Enqueueing series thumbnail tasks for {} series ({} skipped)",
to_process, skipped
)),
library_id,
None,
None,
));
}

// Enqueue individual GenerateSeriesThumbnail tasks for each series
let mut enqueued = 0;
let mut errors = Vec::new();

for series in series_to_process {
for (idx, series) in series_to_process.iter().enumerate() {
let task_type = TaskType::GenerateSeriesThumbnail {
series_id: series.id,
force,
Expand All @@ -141,6 +157,25 @@ impl TaskHandler for GenerateSeriesThumbnailsHandler {
errors.push(error_msg);
}
}

if let Some(broadcaster) = event_broadcaster {
let current = idx + 1;
let _ = broadcaster.emit_task(TaskProgressEvent::progress(
task.id,
"generate_series_thumbnails",
current,
to_process,
Some(format!(
"Enqueueing series thumbnail tasks ({}/{}, {} failed)",
current,
to_process,
errors.len()
)),
library_id,
Some(series.id),
None,
));
}
}

info!(
Expand Down
Loading
Loading