From 0ad72de921703e83f3fa5ea8fc76f8bffe9728cd Mon Sep 17 00:00:00 2001 From: dazzling-no-more <278675588+dazzling-no-more@users.noreply.github.com> Date: Sun, 26 Apr 2026 23:20:04 +0400 Subject: [PATCH] perf: negative-cache unreachable destinations and grow startup pre-warm --- src/proxy_server.rs | 39 ++++++- src/tunnel_client.rs | 250 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 287 insertions(+), 2 deletions(-) diff --git a/src/proxy_server.rs b/src/proxy_server.rs index fd5eefb..585b83a 100644 --- a/src/proxy_server.rs +++ b/src/proxy_server.rs @@ -308,9 +308,17 @@ impl ProxyServer { // doesn't pay a fresh TLS handshake to Google edge. Best-effort; // failures are logged and ignored. Skipped in `google_only` — there // is no fronter to warm. + // + // Sized to roughly match a browser's parallel-connection burst at + // startup. The previous fixed `3` was fine for a single deployment + // but left requests 4-10 of the opening burst paying a cold TLS + // handshake each (~300ms). Scaling with deployment count gives + // multi-account configs a proportionally warmer pool, capped so + // single-deployment users don't hammer Google edge unnecessarily. if let Some(warm_fronter) = self.fronter.clone() { + let n = warm_fronter.num_scripts().clamp(6, 16); tokio::spawn(async move { - warm_fronter.warm(3).await; + warm_fronter.warm(n).await; }); } @@ -503,6 +511,20 @@ async fn handle_http_client( if method.eq_ignore_ascii_case("CONNECT") { let (host, port) = parse_host_port(&target); + // Mirror the SOCKS5 short-circuit: if the tunnel-node just failed + // this (host, port) with unreachable, return 502 immediately rather + // than acknowledging the CONNECT and blowing tunnel quota on a + // guaranteed retry. See `TunnelMux::is_unreachable` for context. + if let Some(ref mux) = tunnel_mux { + if mux.is_unreachable(&host, port) { + tracing::info!("CONNECT {}:{} (negative-cached, refusing)", host, port); + let _ = sock + .write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n") + .await; + let _ = sock.flush().await; + return Ok(()); + } + } sock.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n") .await?; sock.flush().await?; @@ -600,6 +622,21 @@ async fn handle_socks5_client( return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await; } + // Negative-cache short-circuit: if the tunnel-node just failed to reach + // this exact (host, port) with `Network is unreachable` / `No route to + // host`, reply 0x04 (Host unreachable) immediately. Saves a 1.5–2s tunnel + // round-trip on guaranteed-failing targets — the IPv6 probe retry loop + // is the main offender on devices without IPv6. + if let Some(ref mux) = tunnel_mux { + if mux.is_unreachable(&host, port) { + tracing::info!("SOCKS5 CONNECT -> {}:{} (negative-cached, refusing)", host, port); + sock.write_all(&[0x05, 0x04, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) + .await?; + sock.flush().await?; + return Ok(()); + } + } + tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port); // Success reply with zeroed BND. diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index a1efe93..2044c7f 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; // reason; reuse it here. `AtomicBool` works fine in std on every target. use portable_atomic::AtomicU64; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use base64::engine::general_purpose::STANDARD as B64; @@ -78,6 +78,19 @@ const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP"; /// floor, so network jitter on either side won't false-trigger. const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500); +/// How long to remember a `Network is unreachable` / `No route to host` +/// failure for a given `(host, port)`. While cached, the proxy short-circuits +/// repeat CONNECTs with an immediate "host unreachable" reply instead of +/// burning a 1.5–2s tunnel batch round-trip on a target that just failed. +/// Real motivator: IPv6-only probe hostnames (e.g. `ds6.probe.*`) on devices +/// without IPv6 — the OS retries the probe every ~1.5s for 10s+, generating +/// 5–10 wasted tunnel sessions per probe. +const UNREACHABLE_CACHE_TTL: Duration = Duration::from_secs(30); + +/// Hard cap on negative-cache size. Browsing pulls in dozens of distinct +/// hosts; we don't want a runaway map. Pruned opportunistically on insert. +const UNREACHABLE_CACHE_MAX: usize = 256; + /// Ports where the *server* speaks first (SMTP banner, SSH identification, /// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes /// gains nothing and just adds handshake latency — skip the pre-read. @@ -87,6 +100,32 @@ fn is_server_speaks_first(port: u16) -> bool { matches!(port, 21 | 22 | 25 | 80 | 110 | 143 | 587) } +/// Recognize the tunnel-node's connect-error strings that mean +/// "this destination is fundamentally unreachable from the tunnel-node's +/// network right now" — distinct from refused/reset/timeout, which can be +/// transient. These come through as the inner `e` of a `TunnelResponse` +/// after the tunnel-node's std::io::Error is stringified, so we match on +/// substrings rather than `ErrorKind`. Linux: errno 101 (ENETUNREACH), +/// errno 113 (EHOSTUNREACH). Format varies a bit across libc/Tokio +/// versions, so cover both the human text and the os-error tag. +fn is_unreachable_error_str(s: &str) -> bool { + let lc = s.to_ascii_lowercase(); + lc.contains("network is unreachable") + || lc.contains("no route to host") + || lc.contains("os error 101") + || lc.contains("os error 113") +} + +/// Canonicalize a host string for use as a negative-cache key. DNS names +/// are case-insensitive and may carry a trailing root-label dot, so +/// `Example.COM:443`, `example.com:443`, and `example.com.:443` are all the +/// same destination. IPv4 / IPv6 literals are unaffected — IPv4 has no +/// letters, and `Ipv6Addr::to_string()` already emits lowercase. +fn normalize_cache_host(host: &str) -> String { + let trimmed = host.strip_suffix('.').unwrap_or(host); + trimmed.to_ascii_lowercase() +} + // --------------------------------------------------------------------------- // Multiplexer // --------------------------------------------------------------------------- @@ -159,6 +198,11 @@ pub struct TunnelMux { /// Separate monotonic counter used only to trigger the summary log /// (avoids a race where two threads both see `total % 100 == 0`). preread_total_events: AtomicU64, + /// Short-lived negative cache for targets the tunnel-node reported as + /// unreachable (`Network is unreachable` / `No route to host`). Keyed by + /// `(host, port)`, value is the expiry instant. Plain Mutex is + /// fine: it's touched once per CONNECT (cheap) and once per failure. + unreachable_cache: Mutex>, } impl TunnelMux { @@ -181,6 +225,7 @@ impl TunnelMux { preread_skip_unsupported: AtomicU64::new(0), preread_win_total_us: AtomicU64::new(0), preread_total_events: AtomicU64::new(0), + unreachable_cache: Mutex::new(HashMap::new()), }) } @@ -254,6 +299,71 @@ impl TunnelMux { } } + /// Returns true if `(host, port)` has a non-expired unreachable entry. + /// The proxy front-end uses this to skip the tunnel and reply + /// "host unreachable" immediately on follow-up CONNECTs. + pub fn is_unreachable(&self, host: &str, port: u16) -> bool { + let now = Instant::now(); + let mut cache = match self.unreachable_cache.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + let key = (normalize_cache_host(host), port); + match cache.get(&key) { + Some(expiry) if *expiry > now => true, + Some(_) => { + cache.remove(&key); + false + } + None => false, + } + } + + /// If `err` looks like a network-unreachable / no-route-to-host error + /// from the tunnel-node, remember the target for `UNREACHABLE_CACHE_TTL`. + /// No-op for any other error (timeouts, refused, EOF, etc.) — those can + /// be transient and we don't want to lock out a host on a flaky moment. + fn record_unreachable_if_match(&self, host: &str, port: u16, err: &str) { + if !is_unreachable_error_str(err) { + return; + } + let mut cache = match self.unreachable_cache.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + // Cap enforcement is two-stage: first drop anything already expired, + // then if we're STILL at/above the cap (i.e. an unbounded burst of + // unique unreachable hosts within the TTL), evict the entry that + // would expire soonest. This bounds the map size at all times — a + // pure `retain` on expiry alone would let the map grow unbounded + // until the first entry's TTL elapses. + if cache.len() >= UNREACHABLE_CACHE_MAX { + let now = Instant::now(); + cache.retain(|_, expiry| *expiry > now); + while cache.len() >= UNREACHABLE_CACHE_MAX { + let victim = cache + .iter() + .min_by_key(|(_, expiry)| **expiry) + .map(|(k, _)| k.clone()); + match victim { + Some(k) => { + cache.remove(&k); + } + None => break, + } + } + } + let key = (normalize_cache_host(host), port); + cache.insert(key, Instant::now() + UNREACHABLE_CACHE_TTL); + tracing::debug!( + "negative-cached {}:{} for {:?} ({})", + host, + port, + UNREACHABLE_CACHE_TTL, + err + ); + } + fn record_preread_win(&self, port: u16, elapsed: Duration) { self.preread_win.fetch_add(1, Ordering::Relaxed); self.preread_win_total_us @@ -723,6 +833,11 @@ async fn connect_plain(host: &str, port: u16, mux: &Arc) -> std::io:: Ok(Ok(resp)) => { if let Some(ref e) = resp.e { tracing::error!("tunnel connect error for {}:{}: {}", host, port, e); + // Only cache here: `resp.e` is the tunnel-node's own connect() + // result against the target. The outer `Ok(Err(_))` arm below + // is a transport-level failure (relay → Apps Script → tunnel- + // node never reached) and tells us nothing about the target. + mux.record_unreachable_if_match(host, port, e); return Err(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, e.clone(), @@ -769,6 +884,9 @@ async fn connect_with_initial_data( return Ok(ConnectDataOutcome::Unsupported); } tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e); + // Outer transport failure (relay/Apps Script never reached the + // tunnel-node). Don't poison the destination cache from here — + // see `connect_plain` for the same reasoning. return Err(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, e, @@ -794,6 +912,8 @@ async fn connect_with_initial_data( if let Some(ref e) = resp.e { tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e); + // `resp.e` is the tunnel-node's own connect result — cache it. + mux.record_unreachable_if_match(host, port, e); return Err(std::io::Error::new( std::io::ErrorKind::ConnectionRefused, e.clone(), @@ -1095,6 +1215,133 @@ mod tests { ))); } + #[test] + fn unreachable_error_str_matches_expected_variants() { + assert!(is_unreachable_error_str( + "connect failed: Network is unreachable (os error 101)" + )); + assert!(is_unreachable_error_str("No route to host")); + assert!(is_unreachable_error_str("os error 113")); + // Case-insensitive. + assert!(is_unreachable_error_str( + "CONNECT FAILED: NETWORK IS UNREACHABLE" + )); + } + + #[test] + fn unreachable_error_str_rejects_unrelated() { + assert!(!is_unreachable_error_str("connection refused")); + assert!(!is_unreachable_error_str("connect timed out")); + assert!(!is_unreachable_error_str("connection reset by peer")); + assert!(!is_unreachable_error_str("")); + } + + #[test] + fn negative_cache_records_and_short_circuits() { + let (mux, _rx) = mux_for_test(); + // Initially nothing is cached. + assert!(!mux.is_unreachable("ds6.probe.example", 443)); + // Record a matching error. + mux.record_unreachable_if_match( + "ds6.probe.example", + 443, + "connect failed: Network is unreachable (os error 101)", + ); + assert!(mux.is_unreachable("ds6.probe.example", 443)); + // A different port for the same host is its own entry. + assert!(!mux.is_unreachable("ds6.probe.example", 80)); + } + + #[test] + fn negative_cache_ignores_non_unreachable_errors() { + let (mux, _rx) = mux_for_test(); + mux.record_unreachable_if_match( + "example.com", + 443, + "connect failed: connection refused", + ); + assert!(!mux.is_unreachable("example.com", 443)); + } + + #[test] + fn negative_cache_normalizes_host_keys() { + let (mux, _rx) = mux_for_test(); + // Cache under one casing/format... + mux.record_unreachable_if_match( + "Example.COM.", + 443, + "Network is unreachable (os error 101)", + ); + // ...and look up under several equivalent forms. + assert!(mux.is_unreachable("example.com", 443)); + assert!(mux.is_unreachable("EXAMPLE.com", 443)); + assert!(mux.is_unreachable("example.com.", 443)); + // Different host should still miss. + assert!(!mux.is_unreachable("other.com", 443)); + } + + /// Outer `Ok(Err(_))` from the mux channel means "the relay never + /// reached the tunnel-node" (HTTP/TLS to Apps Script failed, batch + /// timed out, etc.) — the destination wasn't even attempted. Even if + /// that error string contains "Network is unreachable" (e.g. the + /// client device's WAN was momentarily down), it must NOT poison the + /// destination cache, or every host the user touched during a + /// connectivity blip stays refused for 30s. + #[tokio::test] + async fn negative_cache_skips_outer_relay_errors() { + let (mux, mut rx) = mux_for_test(); + let mux_for_task = mux.clone(); + let task = tokio::spawn(async move { + connect_plain("real.target.example", 443, &mux_for_task).await + }); + + // Receive the Connect msg and reply with an outer Err whose string + // would otherwise match `is_unreachable_error_str`. + let msg = rx.recv().await.expect("connect msg"); + let reply = match msg { + MuxMsg::Connect { reply, .. } => reply, + other => panic!("expected Connect, got {:?}", std::mem::discriminant(&other)), + }; + let _ = reply.send(Err( + "relay failed: Network is unreachable (os error 101)".into(), + )); + + let res = task.await.expect("task"); + assert!(res.is_err(), "connect_plain should surface the error"); + assert!( + !mux.is_unreachable("real.target.example", 443), + "outer relay error must not negative-cache the destination" + ); + } + + #[test] + fn negative_cache_enforces_hard_cap_under_unique_burst() { + let (mux, _rx) = mux_for_test(); + // Insert enough unique still-live entries to exceed the cap. The + // map size must never exceed UNREACHABLE_CACHE_MAX, even though + // every entry is fresh and `retain(expired)` prunes nothing. + let burst = UNREACHABLE_CACHE_MAX + 50; + for i in 0..burst { + let host = format!("h{}.example", i); + mux.record_unreachable_if_match( + &host, + 443, + "connect failed: Network is unreachable (os error 101)", + ); + } + let len = mux + .unreachable_cache + .lock() + .map(|g| g.len()) + .unwrap_or(0); + assert!( + len <= UNREACHABLE_CACHE_MAX, + "cache size {} exceeded cap {}", + len, + UNREACHABLE_CACHE_MAX + ); + } + #[test] fn server_speaks_first_covers_common_protocols() { for p in [21u16, 22, 25, 80, 110, 143, 587] { @@ -1128,6 +1375,7 @@ mod tests { preread_skip_unsupported: AtomicU64::new(0), preread_win_total_us: AtomicU64::new(0), preread_total_events: AtomicU64::new(0), + unreachable_cache: Mutex::new(HashMap::new()), }); (mux, rx) }