diff --git a/src/config.rs b/src/config.rs index 74d0815..39ca2f2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -78,6 +78,12 @@ pub struct Config { pub hosts: HashMap, #[serde(default)] pub enable_batching: bool, + #[serde(default = "default_batch_timeout_ms")] + pub batch_timeout_ms: u64, + #[serde(default = "default_batch_coalesce_window_ms")] + pub batch_coalesce_window_ms: u64, + #[serde(default = "default_max_batch_ops")] + pub max_batch_ops: usize, /// Optional upstream SOCKS5 proxy for non-HTTP / raw-TCP traffic /// (e.g. `"127.0.0.1:50529"` pointing at a local xray / v2ray instance). /// When set, the SOCKS5 listener forwards raw-TCP flows through it @@ -164,7 +170,9 @@ pub struct Config { #[serde(default)] pub passthrough_hosts: Vec, } - +fn default_batch_timeout_ms() -> u64 { 10000 } +fn default_batch_coalesce_window_ms() -> u64 { 8 } +fn default_max_batch_ops() -> usize { 50 } fn default_fetch_ips_from_api() -> bool { false } fn default_max_ips_to_scan() -> usize { 100 } fn default_scan_batch_size() -> usize {500} @@ -220,6 +228,12 @@ impl Config { } } } + if self.batch_timeout_ms == 0 { + return Err(ConfigError::Invalid("batch_timeout_ms must be greater than 0".into())); + } + if self.max_batch_ops == 0 { + return Err(ConfigError::Invalid("max_batch_ops must be greater than 0".into())); + } if self.scan_batch_size == 0 { return Err(ConfigError::Invalid( "scan_batch_size must be greater than 0".into(), diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index a1efe93..e6108b0 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -41,19 +41,19 @@ const MAX_BATCH_OPS: usize = 50; /// Timeout for a single batch HTTP round-trip. If the tunnel-node or Apps /// Script takes longer than this, the batch fails and sessions get error /// replies rather than hanging forever. -const BATCH_TIMEOUT: Duration = Duration::from_secs(30); +/// const BATCH_TIMEOUT: Duration = Duration::from_secs(30); /// Timeout for a session waiting for its batch reply. If the batch task /// is slow (e.g. one op in the batch has a dead target on the tunnel-node /// side), the session gives up and retries on the next tick rather than /// blocking indefinitely. -const REPLY_TIMEOUT: Duration = Duration::from_secs(35); +/// const REPLY_TIMEOUT: Duration = Duration::from_secs(35); /// How long we'll briefly hold the client socket after the local /// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the /// TLS ClientHello for HTTPS). Bundling those bytes with the tunnel-node /// connect saves one Apps Script round-trip per new flow. -const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50); +/// const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50); /// How long the muxer holds open the batch buffer after the first op /// arrives, waiting for more ops to coalesce. Issue #231 — the previous @@ -63,7 +63,7 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50); /// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but /// long enough that concurrent HTTP/2 stream openings, parallel fetches, /// or any other burst lands in the same batch. -const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8); +/// const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8); /// Structured error code the tunnel-node returns when it doesn't know the /// op (version mismatch). Must match `tunnel-node/src/main.rs`. @@ -337,7 +337,7 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc) { Some(msg) => msgs.push(msg), None => break, } - let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW; + let deadline = tokio::time::Instant::now() + Duration::from_millis(config.batch_coalesce_window_ms); loop { // Drain anything that's already queued without waiting. while let Ok(msg) = rx.try_recv() { @@ -570,9 +570,16 @@ async fn fire_batch( // Bounded-wait: if the batch takes longer than BATCH_TIMEOUT, // all sessions in this batch get an error and can retry. + + // let result = tokio::time::timeout( + // BATCH_TIMEOUT, + // f.tunnel_batch_request_to(&script_id, &data_ops), + //) + //.await; + let result = tokio::time::timeout( - BATCH_TIMEOUT, - f.tunnel_batch_request_to(&script_id, &data_ops), + Duration::from_millis(config.batch_timeout_ms), + f.tunnel_batch_request_to(&script_id, &data_ops), ) .await; tracing::info!( @@ -634,7 +641,8 @@ pub async fn tunnel_connection( } else { let mut buf = vec![0u8; 65536]; let t0 = Instant::now(); - match tokio::time::timeout(CLIENT_FIRST_DATA_WAIT, sock.read(&mut buf)).await { + // match tokio::time::timeout(CLIENT_FIRST_DATA_WAIT, sock.read(&mut buf)).await { + match tokio::time::timeout(Duration::from_millis(config.client_first_data_wait_ms), sock.read(&mut buf)).await { Ok(Ok(0)) => return Ok(()), Ok(Ok(n)) => { mux.record_preread_win(port, t0.elapsed());