Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub struct Config {
pub hosts: HashMap<String, String>,
#[serde(default)]
pub enable_batching: bool,
#[serde(default = "default_batch_timeout_ms")]
pub batch_timeout_ms: u64,
#[serde(default = "default_batch_coalesce_window_ms")]
pub batch_coalesce_window_ms: u64,
#[serde(default = "default_max_batch_ops")]
pub max_batch_ops: usize,
/// Optional upstream SOCKS5 proxy for non-HTTP / raw-TCP traffic
/// (e.g. `"127.0.0.1:50529"` pointing at a local xray / v2ray instance).
/// When set, the SOCKS5 listener forwards raw-TCP flows through it
Expand Down Expand Up @@ -164,7 +170,9 @@ pub struct Config {
#[serde(default)]
pub passthrough_hosts: Vec<String>,
}

fn default_batch_timeout_ms() -> u64 { 10000 }
fn default_batch_coalesce_window_ms() -> u64 { 8 }
fn default_max_batch_ops() -> usize { 50 }
fn default_fetch_ips_from_api() -> bool { false }
fn default_max_ips_to_scan() -> usize { 100 }
fn default_scan_batch_size() -> usize {500}
Expand Down Expand Up @@ -220,6 +228,12 @@ impl Config {
}
}
}
if self.batch_timeout_ms == 0 {
return Err(ConfigError::Invalid("batch_timeout_ms must be greater than 0".into()));
}
if self.max_batch_ops == 0 {
return Err(ConfigError::Invalid("max_batch_ops must be greater than 0".into()));
}
if self.scan_batch_size == 0 {
return Err(ConfigError::Invalid(
"scan_batch_size must be greater than 0".into(),
Expand Down
24 changes: 16 additions & 8 deletions src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ const MAX_BATCH_OPS: usize = 50;
/// Timeout for a single batch HTTP round-trip. If the tunnel-node or Apps
/// Script takes longer than this, the batch fails and sessions get error
/// replies rather than hanging forever.
const BATCH_TIMEOUT: Duration = Duration::from_secs(30);
/// const BATCH_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout for a session waiting for its batch reply. If the batch task
/// is slow (e.g. one op in the batch has a dead target on the tunnel-node
/// side), the session gives up and retries on the next tick rather than
/// blocking indefinitely.
const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
/// const REPLY_TIMEOUT: Duration = Duration::from_secs(35);

/// How long we'll briefly hold the client socket after the local
/// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the
/// TLS ClientHello for HTTPS). Bundling those bytes with the tunnel-node
/// connect saves one Apps Script round-trip per new flow.
const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
/// const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);

/// How long the muxer holds open the batch buffer after the first op
/// arrives, waiting for more ops to coalesce. Issue #231 — the previous
Expand All @@ -63,7 +63,7 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
/// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but
/// long enough that concurrent HTTP/2 stream openings, parallel fetches,
/// or any other burst lands in the same batch.
const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);
/// const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);

/// Structured error code the tunnel-node returns when it doesn't know the
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
Some(msg) => msgs.push(msg),
None => break,
}
let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW;
let deadline = tokio::time::Instant::now() + Duration::from_millis(config.batch_coalesce_window_ms);
loop {
// Drain anything that's already queued without waiting.
while let Ok(msg) = rx.try_recv() {
Expand Down Expand Up @@ -570,9 +570,16 @@ async fn fire_batch(

// Bounded-wait: if the batch takes longer than BATCH_TIMEOUT,
// all sessions in this batch get an error and can retry.

// let result = tokio::time::timeout(
// BATCH_TIMEOUT,
// f.tunnel_batch_request_to(&script_id, &data_ops),
//)
//.await;

let result = tokio::time::timeout(
BATCH_TIMEOUT,
f.tunnel_batch_request_to(&script_id, &data_ops),
Duration::from_millis(config.batch_timeout_ms),
f.tunnel_batch_request_to(&script_id, &data_ops),
)
.await;
tracing::info!(
Expand Down Expand Up @@ -634,7 +641,8 @@ pub async fn tunnel_connection(
} else {
let mut buf = vec![0u8; 65536];
let t0 = Instant::now();
match tokio::time::timeout(CLIENT_FIRST_DATA_WAIT, sock.read(&mut buf)).await {
// match tokio::time::timeout(CLIENT_FIRST_DATA_WAIT, sock.read(&mut buf)).await {
match tokio::time::timeout(Duration::from_millis(config.client_first_data_wait_ms), sock.read(&mut buf)).await {
Ok(Ok(0)) => return Ok(()),
Ok(Ok(n)) => {
mux.record_preread_win(port, t0.elapsed());
Expand Down
Loading