Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/rustmail-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use axum::Router;
use axum::http::HeaderValue;
use axum::routing::{delete, get, patch, post};
use tower_http::compression::CompressionLayer;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::cors::CorsLayer;
use tower_http::set_header::SetResponseHeaderLayer;
use tower_http::trace::TraceLayer;

Expand Down Expand Up @@ -73,7 +73,7 @@ pub fn router(state: AppState) -> Router {
.route("/ws", get(ws::ws_handler));

let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_origin(tower_http::cors::Any)
.allow_methods(tower_http::cors::Any)
.allow_headers(tower_http::cors::Any);

Expand Down
7 changes: 6 additions & 1 deletion crates/rustmail-api/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, warn};

use crate::state::AppState;
Expand Down Expand Up @@ -49,7 +50,11 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
break;
}
}
Err(_) => break,
Err(RecvError::Lagged(n)) => {
warn!(missed = n, "WebSocket client lagged, skipping missed events");
continue;
}
Err(RecvError::Closed) => break,
}
}
msg = socket.recv() => {
Expand Down
7 changes: 7 additions & 0 deletions crates/rustmail-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -442,6 +443,7 @@ async fn run_serve(args: ServeArgs) -> Result<()> {

let webhook_client = args.webhook_url.as_ref().map(|_| reqwest::Client::new());
let webhook_url = args.webhook_url.clone();
let webhook_semaphore = Arc::new(tokio::sync::Semaphore::new(10));

let message_processor = {
let repo = repo.clone();
Expand All @@ -459,7 +461,12 @@ async fn run_serve(args: ServeArgs) -> Result<()> {
let client = client.clone();
let url = url.clone();
let payload = summary;
let sem = webhook_semaphore.clone();
tokio::spawn(async move {
let _permit = match sem.acquire().await {
Ok(p) => p,
Err(_) => return,
};
if let Err(e) = client
.post(&url)
.json(&payload)
Expand Down
2 changes: 1 addition & 1 deletion crates/rustmail-storage/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl MessageRepository {
fn sanitize_fts_query(query: &str) -> Option<String> {
let sanitized: String = query
.chars()
.filter(|c| c.is_alphanumeric() || matches!(c, ' ' | '@' | '.' | '-'))
.filter(|c| c.is_alphanumeric() || matches!(c, ' ' | '@' | '.' | '-' | '+' | '_'))
.collect();
if sanitized.trim().is_empty() {
return None;
Expand Down
32 changes: 26 additions & 6 deletions crates/rustmail-tui/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,32 @@ impl ApiClient {
req = req.query(&[("q", q)]);
}

let resp = req.send().await?.json().await?;
let resp = req.send().await?.error_for_status()?.json().await?;
Ok(resp)
}

pub async fn get_message(&self, id: &str) -> Result<Message> {
let url = format!("{}/api/v1/messages/{}", self.base_url, id);
let resp = self.client.get(&url).send().await?.json().await?;
let resp = self
.client
.get(&url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(resp)
}

pub async fn delete_message(&self, id: &str) -> Result<()> {
let url = format!("{}/api/v1/messages/{}", self.base_url, id);
self.client.delete(&url).send().await?;
self.client.delete(&url).send().await?.error_for_status()?;
Ok(())
}

pub async fn delete_all_messages(&self) -> Result<()> {
let url = format!("{}/api/v1/messages", self.base_url);
self.client.delete(&url).send().await?;
self.client.delete(&url).send().await?.error_for_status()?;
Ok(())
}

Expand All @@ -127,13 +134,26 @@ impl ApiClient {
if let Some(v) = is_starred {
body.insert("is_starred".into(), serde_json::Value::Bool(v));
}
self.client.patch(&url).json(&body).send().await?;
self
.client
.patch(&url)
.json(&body)
.send()
.await?
.error_for_status()?;
Ok(())
}

pub async fn get_raw_message(&self, id: &str) -> Result<String> {
let url = format!("{}/api/v1/messages/{}/raw", self.base_url, id);
let resp = self.client.get(&url).send().await?.text().await?;
let resp = self
.client
.get(&url)
.send()
.await?
.error_for_status()?
.text()
.await?;
Ok(resp)
}
}