From e6b35742bfe46eb6bea4de4b309303dd1a5e99b6 Mon Sep 17 00:00:00 2001 From: Smyile <84925446+davidetacchini@users.noreply.github.com> Date: Sat, 18 Apr 2026 22:31:09 +0200 Subject: [PATCH] test(server): extract retention tick and cover its behavior Splits the retention background loop into run_retention_tick so tests can drive the actual scheduler logic deterministically by injecting the "now" timestamp. Adds unit tests covering: - no-op when both retention_hours and max_messages are disabled - time-based purge removes rows older than the cutoff - max_messages trim removes the expected oldest rows - MessageDelete WsEvents match the deleted ids, including combined purge + trim passes Previously these paths were only exercised via repository-level tests, leaving the scheduler's event emission and branching untested. --- crates/rustmail-server/src/main.rs | 220 +++++++++++++++++++++++++---- 1 file changed, 190 insertions(+), 30 deletions(-) diff --git a/crates/rustmail-server/src/main.rs b/crates/rustmail-server/src/main.rs index b5f598e..35cfc67 100644 --- a/crates/rustmail-server/src/main.rs +++ b/crates/rustmail-server/src/main.rs @@ -458,6 +458,53 @@ fn parse_release_host(s: &str) -> (String, Option) { (s.to_string(), None) } +/// Runs a single retention sweep: purges messages older than `retention_hours` +/// and trims to `max_messages` when configured, broadcasting `MessageDelete` +/// events for each removed id. +/// +/// `now` is injected so callers can drive deterministic cutoffs in tests. 
+async fn run_retention_tick( + repo: &MessageRepository, + state: &AppState, + retention_hours: u64, + max_messages: i64, + now: OffsetDateTime, +) { + if retention_hours == 0 && max_messages == 0 { + return; + } + if retention_hours > 0 { + let cutoff = now - time::Duration::hours(retention_hours as i64); + let cutoff_str = format_iso8601(cutoff); + match repo.delete_older_than(&cutoff_str).await { + Ok(ids) if !ids.is_empty() => { + tracing::info!(deleted = ids.len(), "Retention: purged old messages"); + for id in ids { + state.broadcast(WsEvent::MessageDelete { id }); + } + } + Err(e) => { + tracing::error!(error = %e, "Retention: failed to purge"); + } + _ => {} + } + } + if max_messages > 0 { + match repo.trim_to_max(max_messages).await { + Ok(ids) if !ids.is_empty() => { + tracing::info!(deleted = ids.len(), "Retention: trimmed to max"); + for id in ids { + state.broadcast(WsEvent::MessageDelete { id }); + } + } + Err(e) => { + tracing::error!(error = %e, "Retention: failed to trim"); + } + _ => {} + } + } +} + async fn run_serve(args: ServeArgs) -> Result<()> { tracing_subscriber::fmt() .with_env_filter( @@ -577,36 +624,14 @@ async fn run_serve(args: ServeArgs) -> Result<()> { let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); loop { interval.tick().await; - if retention_hours > 0 { - let cutoff = OffsetDateTime::now_utc() - time::Duration::hours(retention_hours as i64); - let cutoff_str = format_iso8601(cutoff); - match repo.delete_older_than(&cutoff_str).await { - Ok(ids) if !ids.is_empty() => { - tracing::info!(deleted = ids.len(), "Retention: purged old messages"); - for id in ids { - state.broadcast(WsEvent::MessageDelete { id }); - } - } - Err(e) => { - tracing::error!(error = %e, "Retention: failed to purge"); - } - _ => {} - } - } - if max_messages > 0 { - match repo.trim_to_max(max_messages).await { - Ok(ids) if !ids.is_empty() => { - tracing::info!(deleted = ids.len(), "Retention: trimmed to max"); - for id in ids { - 
state.broadcast(WsEvent::MessageDelete { id }); - } - } - Err(e) => { - tracing::error!(error = %e, "Retention: failed to trim"); - } - _ => {} - } - } + run_retention_tick( + &repo, + &state, + retention_hours, + max_messages, + OffsetDateTime::now_utc(), + ) + .await; } }) }; @@ -654,3 +679,138 @@ async fn run_serve(args: ServeArgs) -> Result<()> { Ok(()) } + +#[cfg(test)] +mod retention_tests { + use super::*; + use rustmail_storage::{MessageRepository, initialize_database}; + use sqlx::sqlite::SqlitePoolOptions; + use tokio::sync::broadcast; + + async fn build_state() -> (MessageRepository, AppState, broadcast::Receiver<WsEvent>) { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .unwrap(); + initialize_database(&pool).await.unwrap(); + let repo = MessageRepository::new(pool); + let (ws_tx, ws_rx) = broadcast::channel::<WsEvent>(64); + let state = AppState::new(repo.clone(), ws_tx, None, None); + (repo, state, ws_rx) + } + + fn raw_email(subject: &str) -> Vec<u8> { + format!( + "From: a@test.com\r\nTo: b@test.com\r\nSubject: {subject}\r\nContent-Type: text/plain\r\n\r\nbody" + ) + .into_bytes() + } + + async fn insert_sample(repo: &MessageRepository, subject: &str) -> String { + repo + .insert("a@test.com", &["b@test.com".into()], &raw_email(subject)) + .await + .unwrap() + .id + } + + fn drain_delete_events(rx: &mut broadcast::Receiver<WsEvent>) -> Vec<String> { + let mut ids = Vec::new(); + while let Ok(event) = rx.try_recv() { + if let WsEvent::MessageDelete { id } = event { + ids.push(id); + } + } + ids + } + + #[tokio::test] + async fn tick_is_noop_when_retention_and_max_disabled() { + let (repo, state, mut rx) = build_state().await; + let _id = insert_sample(&repo, "keep-me").await; + + run_retention_tick(&repo, &state, 0, 0, OffsetDateTime::now_utc()).await; + + assert_eq!(repo.count().await.unwrap(), 1); + assert!(drain_delete_events(&mut rx).is_empty()); + } + + #[tokio::test] + async fn tick_purges_messages_older_than_cutoff() { + let 
(repo, state, mut rx) = build_state().await; + let old_id = insert_sample(&repo, "old").await; + + // Advance "now" 24h into the future with a 1h retention window so the + // stored row is older than the cutoff and gets deleted. + let future_now = OffsetDateTime::now_utc() + time::Duration::hours(24); + run_retention_tick(&repo, &state, 1, 0, future_now).await; + + assert_eq!(repo.count().await.unwrap(), 0); + let events = drain_delete_events(&mut rx); + assert_eq!(events, vec![old_id]); + } + + #[tokio::test] + async fn tick_preserves_rows_newer_than_cutoff() { + let (repo, state, mut rx) = build_state().await; + let fresh_id = insert_sample(&repo, "fresh").await; + + // Retention window far larger than any elapsed time → nothing to purge. + run_retention_tick(&repo, &state, 24, 0, OffsetDateTime::now_utc()).await; + + assert_eq!(repo.count().await.unwrap(), 1); + assert_eq!(repo.get(&fresh_id).await.unwrap().id, fresh_id); + assert!(drain_delete_events(&mut rx).is_empty()); + } + + #[tokio::test] + async fn tick_trims_to_max_messages() { + let (repo, state, mut rx) = build_state().await; + + let mut ids = Vec::new(); + for i in 0..5 { + ids.push(insert_sample(&repo, &format!("msg-{i}")).await); + // Keep ULIDs strictly monotonic so the two newest-by-id are deterministic. + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + } + + run_retention_tick(&repo, &state, 0, 2, OffsetDateTime::now_utc()).await; + + assert_eq!(repo.count().await.unwrap(), 2); + let events = drain_delete_events(&mut rx); + let expected_deleted: Vec<String> = ids.iter().take(3).cloned().collect(); + let mut got = events.clone(); + got.sort(); + let mut want = expected_deleted.clone(); + want.sort(); + assert_eq!(got, want, "emitted delete events must match deleted ids"); + + // The two newest ids survive. 
+ for surviving in ids.iter().skip(3) { + assert_eq!(repo.get(surviving).await.unwrap().id, *surviving); + } + } + + #[tokio::test] + async fn tick_emits_events_for_combined_purge_and_trim() { + let (repo, state, mut rx) = build_state().await; + + let mut ids = Vec::new(); + for i in 0..4 { + ids.push(insert_sample(&repo, &format!("m-{i}")).await); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + } + + // Cutoff keeps all four rows; trim drops the oldest two, so exactly two + // delete events should fire and match the two oldest ids. + run_retention_tick(&repo, &state, 24, 2, OffsetDateTime::now_utc()).await; + + assert_eq!(repo.count().await.unwrap(), 2); + let mut got = drain_delete_events(&mut rx); + got.sort(); + let mut want: Vec<String> = ids.iter().take(2).cloned().collect(); + want.sort(); + assert_eq!(got, want); + } +}