diff --git a/crates/rustmail-api/src/lib.rs b/crates/rustmail-api/src/lib.rs index de247bf..ccb4bf7 100644 --- a/crates/rustmail-api/src/lib.rs +++ b/crates/rustmail-api/src/lib.rs @@ -40,7 +40,7 @@ use axum::Router; use axum::http::HeaderValue; use axum::routing::{delete, get, patch, post}; use tower_http::compression::CompressionLayer; -use tower_http::cors::{AllowOrigin, CorsLayer}; +use tower_http::cors::CorsLayer; use tower_http::set_header::SetResponseHeaderLayer; use tower_http::trace::TraceLayer; @@ -73,7 +73,7 @@ pub fn router(state: AppState) -> Router { .route("/ws", get(ws::ws_handler)); let cors = CorsLayer::new() - .allow_origin(AllowOrigin::mirror_request()) + .allow_origin(tower_http::cors::Any) .allow_methods(tower_http::cors::Any) .allow_headers(tower_http::cors::Any); diff --git a/crates/rustmail-api/src/ws.rs b/crates/rustmail-api/src/ws.rs index e74f63d..7c5cc33 100644 --- a/crates/rustmail-api/src/ws.rs +++ b/crates/rustmail-api/src/ws.rs @@ -3,6 +3,7 @@ use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade}; use axum::http::StatusCode; use axum::response::IntoResponse; use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; use tracing::{debug, warn}; use crate::state::AppState; @@ -49,7 +50,11 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { break; } } - Err(_) => break, + Err(RecvError::Lagged(n)) => { + warn!(missed = n, "WebSocket client lagged, skipping missed events"); + continue; + } + Err(RecvError::Closed) => break, } } msg = socket.recv() => { diff --git a/crates/rustmail-server/src/main.rs b/crates/rustmail-server/src/main.rs index e6fdde3..1b089eb 100644 --- a/crates/rustmail-server/src/main.rs +++ b/crates/rustmail-server/src/main.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::sync::Arc; use anyhow::Result; use clap::{Parser, Subcommand}; @@ -442,6 +443,7 @@ async fn run_serve(args: ServeArgs) -> Result<()> { let webhook_client = args.webhook_url.as_ref().map(|_| reqwest::Client::new()); let webhook_url = args.webhook_url.clone(); + let webhook_semaphore = Arc::new(tokio::sync::Semaphore::new(10)); let message_processor = { let repo = repo.clone(); @@ -459,7 +461,12 @@ async fn run_serve(args: ServeArgs) -> Result<()> { let client = client.clone(); let url = url.clone(); let payload = summary; + let sem = webhook_semaphore.clone(); tokio::spawn(async move { + let _permit = match sem.acquire().await { + Ok(p) => p, + Err(_) => return, + }; if let Err(e) = client .post(&url) .json(&payload) diff --git a/crates/rustmail-storage/src/repo.rs b/crates/rustmail-storage/src/repo.rs index f292344..aa96da6 100644 --- a/crates/rustmail-storage/src/repo.rs +++ b/crates/rustmail-storage/src/repo.rs @@ -222,7 +222,7 @@ impl MessageRepository { fn sanitize_fts_query(query: &str) -> Option { let sanitized: String = query .chars() - .filter(|c| c.is_alphanumeric() || matches!(c, ' ' | '@' | '.' | '-')) + .filter(|c| c.is_alphanumeric() || matches!(c, ' ' | '@' | '.' | '-' | '+' | '_')) .collect(); if sanitized.trim().is_empty() { return None; diff --git a/crates/rustmail-tui/src/api.rs b/crates/rustmail-tui/src/api.rs index 1a99c8c..db4c49d 100644 --- a/crates/rustmail-tui/src/api.rs +++ b/crates/rustmail-tui/src/api.rs @@ -91,25 +91,32 @@ impl ApiClient { req = req.query(&[("q", q)]); } - let resp = req.send().await?.json().await?; + let resp = req.send().await?.error_for_status()?.json().await?; Ok(resp) } pub async fn get_message(&self, id: &str) -> Result { let url = format!("{}/api/v1/messages/{}", self.base_url, id); - let resp = self.client.get(&url).send().await?.json().await?; + let resp = self + .client + .get(&url) + .send() + .await? + .error_for_status()? + .json() + .await?; Ok(resp) } pub async fn delete_message(&self, id: &str) -> Result<()> { let url = format!("{}/api/v1/messages/{}", self.base_url, id); - self.client.delete(&url).send().await?; + self.client.delete(&url).send().await?.error_for_status()?; Ok(()) } pub async fn delete_all_messages(&self) -> Result<()> { let url = format!("{}/api/v1/messages", self.base_url); - self.client.delete(&url).send().await?; + self.client.delete(&url).send().await?.error_for_status()?; Ok(()) } @@ -127,13 +134,26 @@ impl ApiClient { if let Some(v) = is_starred { body.insert("is_starred".into(), serde_json::Value::Bool(v)); } - self.client.patch(&url).json(&body).send().await?; + self + .client + .patch(&url) + .json(&body) + .send() + .await? + .error_for_status()?; Ok(()) } pub async fn get_raw_message(&self, id: &str) -> Result { let url = format!("{}/api/v1/messages/{}/raw", self.base_url, id); - let resp = self.client.get(&url).send().await?.text().await?; + let resp = self + .client + .get(&url) + .send() + .await? + .error_for_status()? + .text() + .await?; Ok(resp) } }