Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 190 additions & 30 deletions crates/rustmail-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,53 @@ fn parse_release_host(s: &str) -> (String, Option<u16>) {
(s.to_string(), None)
}

/// Runs a single retention sweep: purges messages older than `retention_hours`
/// and trims to `max_messages` when configured, broadcasting `MessageDelete`
/// events for each removed id.
///
/// `now` is injected so callers can drive deterministic cutoffs in tests.
/// A value of `0` disables the corresponding policy; when both are `0` the
/// sweep is a no-op.
async fn run_retention_tick(
    repo: &MessageRepository,
    state: &AppState,
    retention_hours: u64,
    max_messages: i64,
    now: OffsetDateTime,
) {
    // Emits one MessageDelete event per removed row so connected WS clients
    // stay in sync with the store. Shared by both the purge and trim paths.
    fn broadcast_deleted(state: &AppState, ids: Vec<String>) {
        for id in ids {
            state.broadcast(WsEvent::MessageDelete { id });
        }
    }

    // Both policies disabled — nothing to do.
    if retention_hours == 0 && max_messages == 0 {
        return;
    }

    if retention_hours > 0 {
        // Saturate instead of `as`-casting: a pathological u64 config value
        // must not wrap into a negative (i.e. future) cutoff.
        let hours = i64::try_from(retention_hours).unwrap_or(i64::MAX);
        let cutoff = now - time::Duration::hours(hours);
        let cutoff_str = format_iso8601(cutoff);
        match repo.delete_older_than(&cutoff_str).await {
            Ok(ids) if !ids.is_empty() => {
                tracing::info!(deleted = ids.len(), "Retention: purged old messages");
                broadcast_deleted(state, ids);
            }
            Err(e) => {
                tracing::error!(error = %e, "Retention: failed to purge");
            }
            // Ok with an empty id list: nothing expired, nothing to announce.
            _ => {}
        }
    }

    if max_messages > 0 {
        match repo.trim_to_max(max_messages).await {
            Ok(ids) if !ids.is_empty() => {
                tracing::info!(deleted = ids.len(), "Retention: trimmed to max");
                broadcast_deleted(state, ids);
            }
            Err(e) => {
                tracing::error!(error = %e, "Retention: failed to trim");
            }
            // Ok with an empty id list: already under the cap.
            _ => {}
        }
    }
}

async fn run_serve(args: ServeArgs) -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
Expand Down Expand Up @@ -577,36 +624,14 @@ async fn run_serve(args: ServeArgs) -> Result<()> {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
loop {
interval.tick().await;
if retention_hours > 0 {
let cutoff = OffsetDateTime::now_utc() - time::Duration::hours(retention_hours as i64);
let cutoff_str = format_iso8601(cutoff);
match repo.delete_older_than(&cutoff_str).await {
Ok(ids) if !ids.is_empty() => {
tracing::info!(deleted = ids.len(), "Retention: purged old messages");
for id in ids {
state.broadcast(WsEvent::MessageDelete { id });
}
}
Err(e) => {
tracing::error!(error = %e, "Retention: failed to purge");
}
_ => {}
}
}
if max_messages > 0 {
match repo.trim_to_max(max_messages).await {
Ok(ids) if !ids.is_empty() => {
tracing::info!(deleted = ids.len(), "Retention: trimmed to max");
for id in ids {
state.broadcast(WsEvent::MessageDelete { id });
}
}
Err(e) => {
tracing::error!(error = %e, "Retention: failed to trim");
}
_ => {}
}
}
run_retention_tick(
&repo,
&state,
retention_hours,
max_messages,
OffsetDateTime::now_utc(),
)
.await;
}
})
};
Expand Down Expand Up @@ -654,3 +679,138 @@ async fn run_serve(args: ServeArgs) -> Result<()> {

Ok(())
}

#[cfg(test)]
mod retention_tests {
    use super::*;
    use rustmail_storage::{MessageRepository, initialize_database};
    use sqlx::sqlite::SqlitePoolOptions;
    use tokio::sync::broadcast;

    /// Builds an in-memory SQLite-backed repository and an `AppState` wired to
    /// a fresh broadcast channel, returning the receiver so tests can observe
    /// the `WsEvent`s a retention tick emits.
    async fn build_state() -> (MessageRepository, AppState, broadcast::Receiver<WsEvent>) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        initialize_database(&pool).await.unwrap();
        let repo = MessageRepository::new(pool);
        let (ws_tx, ws_rx) = broadcast::channel::<WsEvent>(64);
        let state = AppState::new(repo.clone(), ws_tx, None, None);
        (repo, state, ws_rx)
    }

    /// Minimal RFC 5322-shaped message body with the given subject.
    fn raw_email(subject: &str) -> Vec<u8> {
        format!(
            "From: a@test.com\r\nTo: b@test.com\r\nSubject: {subject}\r\nContent-Type: text/plain\r\n\r\nbody"
        )
        .into_bytes()
    }

    /// Inserts one sample message and returns its generated id.
    async fn insert_sample(repo: &MessageRepository, subject: &str) -> String {
        repo
            .insert("a@test.com", &["b@test.com".into()], &raw_email(subject))
            .await
            .unwrap()
            .id
    }

    /// Drains the receiver without blocking, collecting the ids of every
    /// `MessageDelete` event seen (other event kinds are ignored).
    fn drain_delete_events(rx: &mut broadcast::Receiver<WsEvent>) -> Vec<String> {
        let mut ids = Vec::new();
        while let Ok(event) = rx.try_recv() {
            if let WsEvent::MessageDelete { id } = event {
                ids.push(id);
            }
        }
        ids
    }

    /// Sorts an id list for order-insensitive comparison: deletion order is an
    /// implementation detail, only the set of emitted ids is asserted.
    fn sorted(mut ids: Vec<String>) -> Vec<String> {
        ids.sort();
        ids
    }

    #[tokio::test]
    async fn tick_is_noop_when_retention_and_max_disabled() {
        let (repo, state, mut rx) = build_state().await;
        let _id = insert_sample(&repo, "keep-me").await;

        run_retention_tick(&repo, &state, 0, 0, OffsetDateTime::now_utc()).await;

        assert_eq!(repo.count().await.unwrap(), 1);
        assert!(drain_delete_events(&mut rx).is_empty());
    }

    #[tokio::test]
    async fn tick_purges_messages_older_than_cutoff() {
        let (repo, state, mut rx) = build_state().await;
        let old_id = insert_sample(&repo, "old").await;

        // Advance "now" 24h into the future with a 1h retention window so the
        // stored row is older than the cutoff and gets deleted.
        let future_now = OffsetDateTime::now_utc() + time::Duration::hours(24);
        run_retention_tick(&repo, &state, 1, 0, future_now).await;

        assert_eq!(repo.count().await.unwrap(), 0);
        let events = drain_delete_events(&mut rx);
        assert_eq!(events, vec![old_id]);
    }

    #[tokio::test]
    async fn tick_preserves_rows_newer_than_cutoff() {
        let (repo, state, mut rx) = build_state().await;
        let fresh_id = insert_sample(&repo, "fresh").await;

        // Retention window far larger than any elapsed time → nothing to purge.
        run_retention_tick(&repo, &state, 24, 0, OffsetDateTime::now_utc()).await;

        assert_eq!(repo.count().await.unwrap(), 1);
        assert_eq!(repo.get(&fresh_id).await.unwrap().id, fresh_id);
        assert!(drain_delete_events(&mut rx).is_empty());
    }

    #[tokio::test]
    async fn tick_trims_to_max_messages() {
        let (repo, state, mut rx) = build_state().await;

        let mut ids = Vec::new();
        for i in 0..5 {
            ids.push(insert_sample(&repo, &format!("msg-{i}")).await);
            // Keep ULIDs strictly monotonic so the two newest-by-id are deterministic.
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }

        run_retention_tick(&repo, &state, 0, 2, OffsetDateTime::now_utc()).await;

        assert_eq!(repo.count().await.unwrap(), 2);
        // Trimming to 2 of 5 deletes the three oldest ids.
        let got = sorted(drain_delete_events(&mut rx));
        let want = sorted(ids.iter().take(3).cloned().collect());
        assert_eq!(got, want, "emitted delete events must match deleted ids");

        // The two newest ids survive.
        for surviving in ids.iter().skip(3) {
            assert_eq!(repo.get(surviving).await.unwrap().id, *surviving);
        }
    }

    #[tokio::test]
    async fn tick_emits_events_for_combined_purge_and_trim() {
        let (repo, state, mut rx) = build_state().await;

        let mut ids = Vec::new();
        for i in 0..4 {
            ids.push(insert_sample(&repo, &format!("m-{i}")).await);
            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
        }

        // Cutoff keeps all four rows; trim drops the oldest two, so exactly two
        // delete events should fire and match the two oldest ids.
        run_retention_tick(&repo, &state, 24, 2, OffsetDateTime::now_utc()).await;

        assert_eq!(repo.count().await.unwrap(), 2);
        let got = sorted(drain_delete_events(&mut rx));
        let want = sorted(ids.iter().take(2).cloned().collect());
        assert_eq!(got, want);
    }
}