Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/proxy_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,17 @@ impl ProxyServer {
// doesn't pay a fresh TLS handshake to Google edge. Best-effort;
// failures are logged and ignored. Skipped in `google_only` — there
// is no fronter to warm.
//
// Sized to roughly match a browser's parallel-connection burst at
// startup. The previous fixed `3` was fine for a single deployment
// but left requests 4-10 of the opening burst paying a cold TLS
// handshake each (~300ms). Scaling with deployment count gives
// multi-account configs a proportionally warmer pool, capped so
// single-deployment users don't hammer Google edge unnecessarily.
if let Some(warm_fronter) = self.fronter.clone() {
let n = warm_fronter.num_scripts().clamp(6, 16);
tokio::spawn(async move {
warm_fronter.warm(3).await;
warm_fronter.warm(n).await;
});
}

Expand Down Expand Up @@ -503,6 +511,20 @@ async fn handle_http_client(

if method.eq_ignore_ascii_case("CONNECT") {
let (host, port) = parse_host_port(&target);
// Mirror the SOCKS5 short-circuit: if the tunnel-node just failed
// this (host, port) with unreachable, return 502 immediately rather
// than acknowledging the CONNECT and blowing tunnel quota on a
// guaranteed retry. See `TunnelMux::is_unreachable` for context.
if let Some(ref mux) = tunnel_mux {
if mux.is_unreachable(&host, port) {
tracing::info!("CONNECT {}:{} (negative-cached, refusing)", host, port);
let _ = sock
.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
.await;
let _ = sock.flush().await;
return Ok(());
}
}
sock.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
.await?;
sock.flush().await?;
Expand Down Expand Up @@ -600,6 +622,21 @@ async fn handle_socks5_client(
return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await;
}

// Negative-cache short-circuit: if the tunnel-node just failed to reach
// this exact (host, port) with `Network is unreachable` / `No route to
// host`, reply 0x04 (Host unreachable) immediately. Saves a 1.5–2s tunnel
// round-trip on guaranteed-failing targets — the IPv6 probe retry loop
// is the main offender on devices without IPv6.
if let Some(ref mux) = tunnel_mux {
if mux.is_unreachable(&host, port) {
tracing::info!("SOCKS5 CONNECT -> {}:{} (negative-cached, refusing)", host, port);
sock.write_all(&[0x05, 0x04, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
.await?;
sock.flush().await?;
return Ok(());
}
}

tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port);

// Success reply with zeroed BND.
Expand Down
250 changes: 249 additions & 1 deletion src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::collections::HashMap;
// reason; reuse it here. `AtomicBool` works fine in std on every target.
use portable_atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use base64::engine::general_purpose::STANDARD as B64;
Expand Down Expand Up @@ -78,6 +78,19 @@ const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
/// floor, so network jitter on either side won't false-trigger.
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);

/// How long to remember a `Network is unreachable` / `No route to host`
/// failure for a given `(host, port)`. While cached, the proxy short-circuits
/// repeat CONNECTs with an immediate "host unreachable" reply instead of
/// burning a 1.5–2s tunnel batch round-trip on a target that just failed.
/// Real motivator: IPv6-only probe hostnames (e.g. `ds6.probe.*`) on devices
/// without IPv6 — the OS retries the probe every ~1.5s for 10s+, generating
/// 5–10 wasted tunnel sessions per probe.
const UNREACHABLE_CACHE_TTL: Duration = Duration::from_secs(30);

/// Hard cap on negative-cache size. Browsing pulls in dozens of distinct
/// hosts; we don't want a runaway map. Pruned opportunistically on insert.
const UNREACHABLE_CACHE_MAX: usize = 256;

/// Ports where the *server* speaks first (SMTP banner, SSH identification,
/// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes
/// gains nothing and just adds handshake latency — skip the pre-read.
Expand All @@ -87,6 +100,32 @@ fn is_server_speaks_first(port: u16) -> bool {
matches!(port, 21 | 22 | 25 | 80 | 110 | 143 | 587)
}

/// Recognize the tunnel-node's connect-error strings that mean
/// "this destination is fundamentally unreachable from the tunnel-node's
/// network right now" — distinct from refused/reset/timeout, which can be
/// transient. These come through as the inner `e` of a `TunnelResponse`
/// after the tunnel-node's std::io::Error is stringified, so we match on
/// substrings rather than `ErrorKind`. Linux: errno 101 (ENETUNREACH),
/// errno 113 (EHOSTUNREACH). Format varies a bit across libc/Tokio
/// versions, so cover both the human text and the os-error tag.
fn is_unreachable_error_str(s: &str) -> bool {
let lc = s.to_ascii_lowercase();
lc.contains("network is unreachable")
|| lc.contains("no route to host")
|| lc.contains("os error 101")
|| lc.contains("os error 113")
}

/// Canonicalize a host string for use as a negative-cache key. DNS names
/// are case-insensitive and may carry a trailing root-label dot, so
/// `Example.COM:443`, `example.com:443`, and `example.com.:443` are all the
/// same destination. IPv4 / IPv6 literals are unaffected — IPv4 has no
/// letters, and `Ipv6Addr::to_string()` already emits lowercase.
fn normalize_cache_host(host: &str) -> String {
let trimmed = host.strip_suffix('.').unwrap_or(host);
trimmed.to_ascii_lowercase()
}

// ---------------------------------------------------------------------------
// Multiplexer
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -159,6 +198,11 @@ pub struct TunnelMux {
/// Separate monotonic counter used only to trigger the summary log
/// (avoids a race where two threads both see `total % 100 == 0`).
preread_total_events: AtomicU64,
/// Short-lived negative cache for targets the tunnel-node reported as
/// unreachable (`Network is unreachable` / `No route to host`). Keyed by
/// `(host, port)`, value is the expiry instant. Plain Mutex<HashMap> is
/// fine: it's touched once per CONNECT (cheap) and once per failure.
unreachable_cache: Mutex<HashMap<(String, u16), Instant>>,
}

impl TunnelMux {
Expand All @@ -181,6 +225,7 @@ impl TunnelMux {
preread_skip_unsupported: AtomicU64::new(0),
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
})
}

Expand Down Expand Up @@ -254,6 +299,71 @@ impl TunnelMux {
}
}

/// Returns true if `(host, port)` has a non-expired unreachable entry.
/// The proxy front-end uses this to skip the tunnel and reply
/// "host unreachable" immediately on follow-up CONNECTs.
pub fn is_unreachable(&self, host: &str, port: u16) -> bool {
let now = Instant::now();
let mut cache = match self.unreachable_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
let key = (normalize_cache_host(host), port);
match cache.get(&key) {
Some(expiry) if *expiry > now => true,
Some(_) => {
cache.remove(&key);
false
}
None => false,
}
}

/// If `err` looks like a network-unreachable / no-route-to-host error
/// from the tunnel-node, remember the target for `UNREACHABLE_CACHE_TTL`.
/// No-op for any other error (timeouts, refused, EOF, etc.) — those can
/// be transient and we don't want to lock out a host on a flaky moment.
fn record_unreachable_if_match(&self, host: &str, port: u16, err: &str) {
if !is_unreachable_error_str(err) {
return;
}
let mut cache = match self.unreachable_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
// Cap enforcement is two-stage: first drop anything already expired,
// then if we're STILL at/above the cap (i.e. an unbounded burst of
// unique unreachable hosts within the TTL), evict the entry that
// would expire soonest. This bounds the map size at all times — a
// pure `retain` on expiry alone would let the map grow unbounded
// until the first entry's TTL elapses.
if cache.len() >= UNREACHABLE_CACHE_MAX {
let now = Instant::now();
cache.retain(|_, expiry| *expiry > now);
while cache.len() >= UNREACHABLE_CACHE_MAX {
let victim = cache
.iter()
.min_by_key(|(_, expiry)| **expiry)
.map(|(k, _)| k.clone());
match victim {
Some(k) => {
cache.remove(&k);
}
None => break,
}
}
}
let key = (normalize_cache_host(host), port);
cache.insert(key, Instant::now() + UNREACHABLE_CACHE_TTL);
tracing::debug!(
"negative-cached {}:{} for {:?} ({})",
host,
port,
UNREACHABLE_CACHE_TTL,
err
);
}

fn record_preread_win(&self, port: u16, elapsed: Duration) {
self.preread_win.fetch_add(1, Ordering::Relaxed);
self.preread_win_total_us
Expand Down Expand Up @@ -723,6 +833,11 @@ async fn connect_plain(host: &str, port: u16, mux: &Arc<TunnelMux>) -> std::io::
Ok(Ok(resp)) => {
if let Some(ref e) = resp.e {
tracing::error!("tunnel connect error for {}:{}: {}", host, port, e);
// Only cache here: `resp.e` is the tunnel-node's own connect()
// result against the target. The outer `Ok(Err(_))` arm below
// is a transport-level failure (relay → Apps Script → tunnel-
// node never reached) and tells us nothing about the target.
mux.record_unreachable_if_match(host, port, e);
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
e.clone(),
Expand Down Expand Up @@ -769,6 +884,9 @@ async fn connect_with_initial_data(
return Ok(ConnectDataOutcome::Unsupported);
}
tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e);
// Outer transport failure (relay/Apps Script never reached the
// tunnel-node). Don't poison the destination cache from here —
// see `connect_plain` for the same reasoning.
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
e,
Expand All @@ -794,6 +912,8 @@ async fn connect_with_initial_data(

if let Some(ref e) = resp.e {
tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e);
// `resp.e` is the tunnel-node's own connect result — cache it.
mux.record_unreachable_if_match(host, port, e);
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
e.clone(),
Expand Down Expand Up @@ -1095,6 +1215,133 @@ mod tests {
)));
}

#[test]
fn unreachable_error_str_matches_expected_variants() {
assert!(is_unreachable_error_str(
"connect failed: Network is unreachable (os error 101)"
));
assert!(is_unreachable_error_str("No route to host"));
assert!(is_unreachable_error_str("os error 113"));
// Case-insensitive.
assert!(is_unreachable_error_str(
"CONNECT FAILED: NETWORK IS UNREACHABLE"
));
}

#[test]
fn unreachable_error_str_rejects_unrelated() {
assert!(!is_unreachable_error_str("connection refused"));
assert!(!is_unreachable_error_str("connect timed out"));
assert!(!is_unreachable_error_str("connection reset by peer"));
assert!(!is_unreachable_error_str(""));
}

#[test]
fn negative_cache_records_and_short_circuits() {
let (mux, _rx) = mux_for_test();
// Initially nothing is cached.
assert!(!mux.is_unreachable("ds6.probe.example", 443));
// Record a matching error.
mux.record_unreachable_if_match(
"ds6.probe.example",
443,
"connect failed: Network is unreachable (os error 101)",
);
assert!(mux.is_unreachable("ds6.probe.example", 443));
// A different port for the same host is its own entry.
assert!(!mux.is_unreachable("ds6.probe.example", 80));
}

#[test]
fn negative_cache_ignores_non_unreachable_errors() {
let (mux, _rx) = mux_for_test();
mux.record_unreachable_if_match(
"example.com",
443,
"connect failed: connection refused",
);
assert!(!mux.is_unreachable("example.com", 443));
}

#[test]
fn negative_cache_normalizes_host_keys() {
let (mux, _rx) = mux_for_test();
// Cache under one casing/format...
mux.record_unreachable_if_match(
"Example.COM.",
443,
"Network is unreachable (os error 101)",
);
// ...and look up under several equivalent forms.
assert!(mux.is_unreachable("example.com", 443));
assert!(mux.is_unreachable("EXAMPLE.com", 443));
assert!(mux.is_unreachable("example.com.", 443));
// Different host should still miss.
assert!(!mux.is_unreachable("other.com", 443));
}

/// Outer `Ok(Err(_))` from the mux channel means "the relay never
/// reached the tunnel-node" (HTTP/TLS to Apps Script failed, batch
/// timed out, etc.) — the destination wasn't even attempted. Even if
/// that error string contains "Network is unreachable" (e.g. the
/// client device's WAN was momentarily down), it must NOT poison the
/// destination cache, or every host the user touched during a
/// connectivity blip stays refused for 30s.
#[tokio::test]
async fn negative_cache_skips_outer_relay_errors() {
let (mux, mut rx) = mux_for_test();
let mux_for_task = mux.clone();
let task = tokio::spawn(async move {
connect_plain("real.target.example", 443, &mux_for_task).await
});

// Receive the Connect msg and reply with an outer Err whose string
// would otherwise match `is_unreachable_error_str`.
let msg = rx.recv().await.expect("connect msg");
let reply = match msg {
MuxMsg::Connect { reply, .. } => reply,
other => panic!("expected Connect, got {:?}", std::mem::discriminant(&other)),
};
let _ = reply.send(Err(
"relay failed: Network is unreachable (os error 101)".into(),
));

let res = task.await.expect("task");
assert!(res.is_err(), "connect_plain should surface the error");
assert!(
!mux.is_unreachable("real.target.example", 443),
"outer relay error must not negative-cache the destination"
);
}

#[test]
fn negative_cache_enforces_hard_cap_under_unique_burst() {
let (mux, _rx) = mux_for_test();
// Insert enough unique still-live entries to exceed the cap. The
// map size must never exceed UNREACHABLE_CACHE_MAX, even though
// every entry is fresh and `retain(expired)` prunes nothing.
let burst = UNREACHABLE_CACHE_MAX + 50;
for i in 0..burst {
let host = format!("h{}.example", i);
mux.record_unreachable_if_match(
&host,
443,
"connect failed: Network is unreachable (os error 101)",
);
}
let len = mux
.unreachable_cache
.lock()
.map(|g| g.len())
.unwrap_or(0);
assert!(
len <= UNREACHABLE_CACHE_MAX,
"cache size {} exceeded cap {}",
len,
UNREACHABLE_CACHE_MAX
);
}

#[test]
fn server_speaks_first_covers_common_protocols() {
for p in [21u16, 22, 25, 80, 110, 143, 587] {
Expand Down Expand Up @@ -1128,6 +1375,7 @@ mod tests {
preread_skip_unsupported: AtomicU64::new(0),
preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
});
(mux, rx)
}
Expand Down
Loading