diff --git a/crates/rustmail-server/src/main.rs b/crates/rustmail-server/src/main.rs index e6fdde3..879da92 100644 --- a/crates/rustmail-server/src/main.rs +++ b/crates/rustmail-server/src/main.rs @@ -551,17 +551,19 @@ async fn run_serve(args: ServeArgs) -> Result<()> { tokio::select! { result = smtp_server.run() => { - if let Err(e) = result { - tracing::error!(error = %e, "SMTP server error"); + match result { + Err(e) => anyhow::bail!("SMTP server failed: {e}"), + Ok(()) => anyhow::bail!("SMTP server exited unexpectedly"), } } result = axum::serve(listener, app) => { - if let Err(e) = result { - tracing::error!(error = %e, "HTTP server error"); + match result { + Err(e) => anyhow::bail!("HTTP server failed: {e}"), + Ok(()) => anyhow::bail!("HTTP server exited unexpectedly"), } } _ = message_processor => { - tracing::error!("Message processor stopped unexpectedly"); + anyhow::bail!("Message processor stopped unexpectedly"); } _ = retention_task => {} } diff --git a/crates/rustmail-server/tests/integration.rs b/crates/rustmail-server/tests/integration.rs index 425dbff..45f80f7 100644 --- a/crates/rustmail-server/tests/integration.rs +++ b/crates/rustmail-server/tests/integration.rs @@ -699,20 +699,20 @@ async fn smtp_session_limit_rejects_excess() { tokio::time::sleep(Duration::from_millis(200)).await; - // The 101st connection should be silently dropped by the semaphore. - // The server accepts the TCP connection but immediately continues without - // spawning a session, so we expect: EOF (0 bytes), timeout, or IO error. + // The 101st connection should receive a 421 response and then be closed. let mut probe = TcpStream::connect(addr).await.unwrap(); - let result = tokio::time::timeout(Duration::from_secs(2), async { - let mut buf = [0u8; 512]; - probe.read(&mut buf).await - }) - .await; + let mut buf = [0u8; 512]; + let result = + tokio::time::timeout(Duration::from_secs(2), async { probe.read(&mut buf).await }).await; match result { Ok(Ok(0)) | Err(_) | Ok(Err(_)) => {} Ok(Ok(n)) => { - panic!("Expected connection to be rejected, but got {n} bytes"); + let response = std::str::from_utf8(&buf[..n]).unwrap_or(""); + assert!( + response.starts_with("421"), + "Expected 421 rejection, got: {response}" + ); } } diff --git a/crates/rustmail-smtp/src/server.rs b/crates/rustmail-smtp/src/server.rs index f236af0..e93bd95 100644 --- a/crates/rustmail-smtp/src/server.rs +++ b/crates/rustmail-smtp/src/server.rs @@ -5,6 +5,8 @@ use tokio::net::TcpListener; use tokio::sync::{Semaphore, mpsc}; use tracing::{error, info, warn}; +use tokio::io::AsyncWriteExt; + use crate::message::ReceivedMessage; use crate::session::Session; @@ -60,11 +62,14 @@ impl SmtpServer { loop { match listener.accept().await { - Ok((stream, peer)) => { + Ok((mut stream, peer)) => { let permit = match semaphore.clone().try_acquire_owned() { Ok(permit) => permit, Err(_) => { warn!(peer = %peer, "SMTP connection rejected: max concurrent sessions reached"); + let _ = stream + .write_all(b"421 Service not available, too many connections\r\n") + .await; continue; } }; diff --git a/crates/rustmail-smtp/src/session.rs b/crates/rustmail-smtp/src/session.rs index ec38392..9a501a9 100644 --- a/crates/rustmail-smtp/src/session.rs +++ b/crates/rustmail-smtp/src/session.rs @@ -202,8 +202,14 @@ impl Session { raw: data, }; - let _ = self.sender.send(message).await; - self.write(OK).await?; + if self.sender.send(message).await.is_err() { + warn!(peer = %self.peer, "Channel closed, message not stored"); + self + .write("451 Requested action aborted: local error in processing\r\n") + .await?; + } else { + self.write(OK).await?; + } self.mail_from = None; self.rcpt_to.clear(); Ok(())