diff --git a/cmd/meek-server/deploy.sh b/cmd/meek-server/deploy.sh new file mode 100755 index 0000000..1667e33 --- /dev/null +++ b/cmd/meek-server/deploy.sh @@ -0,0 +1,145 @@ +#!/usr/bin/env bash +# Deploy meek-server to its fronted origin host. +# +# Builds linux/amd64 from THIS checkout, ships it, verifies the transfer by +# sha256, swaps it in atomically (keeping a timestamped backup), restarts the +# service, and verifies /healthz — rolling back automatically if it doesn't come +# back. Finally runs the end-to-end SOCKS5 smoke test (best-effort). +# +# MEEK_HOST is REQUIRED (no default) so an env-less run can't silently deploy to a +# live origin. The host's service layout isn't pinned in-repo, so the rest is +# overridable via env (defaults below). Confirm these match the host before the +# first real run; use --dry-run to preview. +# +# MEEK_HOST=139.162.181.47 (required) MEEK_SSH_USER=root MEEK_SSH_KEY=~/.ssh/id +# MEEK_REMOTE_BIN=/usr/local/bin/meek-server MEEK_SERVICE=meek-server +# MEEK_RESTART_CMD="systemctl restart $MEEK_SERVICE" +# MEEK_STATUS_CMD="systemctl is-active $MEEK_SERVICE" +# MEEK_HEALTHZ_URL=https://meek.getiantem.org/healthz +# MEEK_SSH_STRICT=accept-new (set to "yes" for strict host-key checking) +# +# Usage: cmd/meek-server/deploy.sh [-n|--dry-run] [-h|--help] +set -euo pipefail + +HOST="${MEEK_HOST:-}" +SSH_USER="${MEEK_SSH_USER:-root}" +SSH_KEY="${MEEK_SSH_KEY:-}" +REMOTE_BIN="${MEEK_REMOTE_BIN:-/usr/local/bin/meek-server}" +SERVICE="${MEEK_SERVICE:-meek-server}" +RESTART_CMD="${MEEK_RESTART_CMD:-systemctl restart $SERVICE}" +STATUS_CMD="${MEEK_STATUS_CMD:-systemctl is-active $SERVICE}" +# No default: a fixed prod URL here would verify (and gate rollback on) the wrong +# host for staging/canary deploys. Unset → the HTTP health check is skipped and we +# rely on the service-status check after restart. +HEALTHZ_URL="${MEEK_HEALTHZ_URL:-}" + +DRY_RUN=0 +case "${1:-}" in + -n|--dry-run) DRY_RUN=1 ;; + -h|--help) sed -n '2,21p' "$0"; exit 0 ;; + "") ;; + *) echo "unknown arg: $1 (try --help)" >&2; exit 2 ;; +esac + +# Required (checked after --help/-h so those don't need it): never default to a live host. +: "${HOST:?required — set MEEK_HOST to the target origin (e.g. 139.162.181.47); refusing to default to a live host}" + +cd "$(dirname "$0")/../.." # repo root + +# accept-new (TOFU) by default so a first deploy to operator-owned infra doesn't +# require pre-seeding known_hosts; set MEEK_SSH_STRICT=yes for strict checking. +SSH_STRICT="${MEEK_SSH_STRICT:-accept-new}" +SSH_OPTS=(-o "StrictHostKeyChecking=$SSH_STRICT" -o ConnectTimeout=15) +[ -n "$SSH_KEY" ] && SSH_OPTS+=(-i "$SSH_KEY") +ssh_h() { ssh "${SSH_OPTS[@]}" "${SSH_USER}@${HOST}" "$@"; } +say() { printf '\n=== %s ===\n' "$*"; } + +# Hash locally with whichever tool exists: sha256sum (most Linux) or shasum -a 256 +# (macOS, some others). The remote always has sha256sum. +sha256_local() { + if command -v sha256sum >/dev/null 2>&1; then + sha256sum "$1" | awk '{print $1}' + else + shasum -a 256 "$1" | awk '{print $1}' + fi +} + +say "build meek-server (linux/amd64) from $(git rev-parse --short HEAD 2>/dev/null || echo '?')" +TMP=$(mktemp -d); trap 'rm -rf "$TMP"' EXIT +GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o "$TMP/meek-server" ./cmd/meek-server +LSHA=$(sha256_local "$TMP/meek-server") +echo "built $(wc -c <"$TMP/meek-server") bytes sha256=$LSHA" + +if [ "$DRY_RUN" = 1 ]; then + cat < /tmp/meek-server.new (then verify sha256 == $LSHA) + cp $REMOTE_BIN -> ${REMOTE_BIN}.bak. (backup, if present) + install /tmp/meek-server.new -> $REMOTE_BIN + run $RESTART_CMD ; check $STATUS_CMD + verify $HEALTHZ_URL returns "ok" (else roll back) + smoke test: cmd/meek-server/smoketest/socks5.sh +PLAN + exit 0 +fi + +say "ship to ${SSH_USER}@${HOST}" +scp "${SSH_OPTS[@]}" "$TMP/meek-server" "${SSH_USER}@${HOST}:/tmp/meek-server.new" + +say "verify transfer (sha256)" +RSHA=$(ssh_h "sha256sum /tmp/meek-server.new | awk '{print \$1}'") +if [ "$RSHA" != "$LSHA" ]; then + echo "sha256 mismatch: local=$LSHA remote=$RSHA — aborting, nothing swapped" >&2 + ssh_h "rm -f /tmp/meek-server.new" || true + exit 1 +fi +echo "verified on host" + +say "backup + atomic swap + restart" +BAK="${REMOTE_BIN}.bak.$(date -u +%Y%m%dT%H%M%SZ)" +ssh_h bash -s < $BAK"; fi +install -m 0755 /tmp/meek-server.new "$REMOTE_BIN" +rm -f /tmp/meek-server.new +$RESTART_CMD +sleep 2 +echo -n "service: "; $STATUS_CMD || true +REMOTE + +if [ -z "$HEALTHZ_URL" ]; then + say "verify /healthz (skipped — set MEEK_HEALTHZ_URL for an HTTP health gate)" + echo "relying on the post-restart service-status check above" +else + say "verify /healthz" + ok=0 + for _ in $(seq 1 10); do + if curl -fsS --max-time 10 "$HEALTHZ_URL" 2>/dev/null | grep -q "ok"; then ok=1; break; fi + sleep 2 + done + if [ "$ok" != 1 ]; then + echo "healthz did not return ok — ROLLING BACK to $BAK" >&2 + ssh_h bash -s < httpbin.org ` → expect `05 00 00 01 ...` +3. **HTTP**: POST `GET /ip HTTP/1.0\r\n...` → drain the HTTP response + +Each phase is one or more `POST /` calls with the same `X-Session-Id` +header so meek-server routes them to the same upstream TCP connection. +Follow-up empty POSTs are used to drain bytes that the upstream wrote +while the script was building the next request. + +### What a successful run looks like + +```console +✅ End-to-end SUCCESS: "origin": "139.162.181.47" +The request traversed: curl → Akamai → Caddy → meek-server → microsocks → httpbin.org +``` diff --git a/cmd/meek-server/smoketest/socks5.sh b/cmd/meek-server/smoketest/socks5.sh new file mode 100755 index 0000000..45a8482 --- /dev/null +++ b/cmd/meek-server/smoketest/socks5.sh @@ -0,0 +1,153 @@ +#!/bin/bash +# Real end-to-end smoke test: sequential SOCKS5 + HTTP through the meek tunnel. +# +# microsocks requires strict SOCKS5 request-response (no pipelining), so we do: +# POST 1: send method-select (3 bytes) → wait for 2-byte reply +# POST 2: send CONNECT httpbin.org:80 (18 bytes) → wait for 10-byte reply +# POST 3+: send HTTP GET → drain HTTP response +# +# Success criterion: HTTP body contains "origin": "" + +set -euo pipefail + +FRONT_HOST="${FRONT_HOST:-a248.e.akamai.net}" +INNER_HOST="${INNER_HOST:-meek.dsa.akamai.getiantem.org}" +# Set to the server's -auth-token to smoke-test a hardened (authenticated) +# deployment; unset for the open-relay/local-test default. +AUTH_TOKEN="${MEEK_AUTH_TOKEN:-}" +TARGET_HOST="httpbin.org" +TARGET_PORT=80 + +EDGE_IP=$(dig +short "$FRONT_HOST" | grep -E '^[0-9]+\.' | head -1) +[ -z "$EDGE_IP" ] && { echo "couldn't resolve $FRONT_HOST" >&2; exit 1; } +echo "Akamai edge IP: $EDGE_IP" + +SID=$(openssl rand -hex 16) +echo "session id: $SID" +POST_URL="https://${FRONT_HOST}/" + +# Per-run private temp dir — avoids cross-run collisions and symlink +# races that fixed /tmp paths invite on shared hosts. +WORKDIR=$(mktemp -d "${TMPDIR:-/tmp}/meek-smoke.XXXXXX") +trap 'rm -rf "$WORKDIR"' EXIT + +# --- meek POST helper --- +# usage: meek_post [payload-file] +meek_post() { + local out=$1 + local data=${2:-} + local args=( + -sS + --resolve "${FRONT_HOST}:443:${EDGE_IP}" + --http1.1 + -X POST + -H "Host: ${INNER_HOST}" + -H "X-Session-Id: ${SID}" + -H "Content-Type: application/octet-stream" + -o "$out" + -w "%{size_download}" + ) + [ -n "$AUTH_TOKEN" ] && args+=(-H "X-Meek-Auth: ${AUTH_TOKEN}") + if [ -n "$data" ]; then + args+=(--data-binary "@$data") + else + args+=(--data-binary "") + fi + curl "${args[@]}" "$POST_URL" +} + +# Send then poll empty POSTs, concatenating each response chunk into +# , until appears (when given) or at least +# are received — whichever first. The meek server returns whatever upstream bytes +# are ready per poll, so an HTTP response can span several polls; a marker stop +# avoids exiting before the full response arrives (a fixed byte count can). +# usage: send_and_drain [success-marker] +send_and_drain() { + local payload=$1 accum=$2 minb=$3 maxp=$4 marker=${5:-} + : > "$accum" + local tmp=$(mktemp) + local sz + sz=$(meek_post "$tmp" "$payload") + cat "$tmp" >> "$accum" + local n=$(wc -c < "$accum") + for i in $(seq 1 "$maxp"); do + if [ -n "$marker" ]; then + grep -q "$marker" "$accum" && break + elif [ "$n" -ge "$minb" ]; then + break + fi + sleep 0.3 + sz=$(meek_post "$tmp") + cat "$tmp" >> "$accum" + n=$(wc -c < "$accum") + done + rm -f "$tmp" + echo "$n" +} + +# --- Build the three payloads --- +python3 <&2 + exit 1 +fi + +echo "" +echo "--- Phase 2: SOCKS5 CONNECT $TARGET_HOST:$TARGET_PORT ---" +n=$(send_and_drain ${WORKDIR}/p2-connect.bin ${WORKDIR}/r2.bin 10 8) +echo "received ${n} bytes: $(xxd -p ${WORKDIR}/r2.bin | head -1)" +# SOCKS5 CONNECT reply: 05 00 00 01 ... (success) +if ! [[ "$(xxd -p ${WORKDIR}/r2.bin | head -c 8)" =~ ^050000 ]]; then + echo "ERROR: SOCKS5 CONNECT reply doesn't start with 05 00 00 (REP=success)" >&2 + exit 1 +fi +echo "CONNECT succeeded" + +echo "" +echo "--- Phase 3: HTTP GET /ip ---" +# Poll until the success marker arrives (the response can span multiple polls), +# not a fixed byte count which could exit before "origin" is present. +n=$(send_and_drain ${WORKDIR}/p3-http.bin ${WORKDIR}/r3.bin 1 30 '"origin"') +echo "received ${n} bytes of HTTP response" + +echo "" +echo "--- HTTP response body ---" +cat ${WORKDIR}/r3.bin +echo "" +echo "--- success check ---" +if grep -q '"origin"' ${WORKDIR}/r3.bin; then + origin=$(grep -o '"origin"[^}]*' ${WORKDIR}/r3.bin) + echo "✅ End-to-end SUCCESS: ${origin}" + echo "The request traversed: curl → Akamai → Caddy → meek-server → microsocks → httpbin.org" + echo "and httpbin reported it saw the Linode's public IP, proving the proxy actually exited the box." +else + echo "❌ HTTP response didn't contain origin field — partial chain failure" + exit 1 +fi + +echo "" +echo "--- meek-server healthz ---" +curl -sS https://meek.getiantem.org/healthz +echo "" diff --git a/constant/proxy.go b/constant/proxy.go index 4912b96..5974144 100644 --- a/constant/proxy.go +++ b/constant/proxy.go @@ -3,6 +3,7 @@ package constant const ( TypeAmnezia = "amnezia" TypeALGeneva = "algeneva" + TypeMeek = "meek" TypeOutline = "outline" TypeReflex = "reflex" TypeSamizdat = "samizdat" diff --git a/option/meek.go b/option/meek.go new file mode 100644 index 0000000..da99d18 --- /dev/null +++ b/option/meek.go @@ -0,0 +1,37 @@ +package option + +import "github.com/sagernet/sing-box/option" + +// MeekOutboundOptions configures a domain-fronted meek outbound that +// tunnels arbitrary TCP through HTTPS POSTs to a meek server endpoint. +// +// Fronts is the candidate front pool — pairs of (CDN edge IP, outer SNI) +// known to route the inner Host (URL.Host) to the meek server. Radiance +// populates this from the fronted/scanner package's discoveries; without +// at least one front the outbound has nothing to dial. +type MeekOutboundOptions struct { + option.DialerOptions + + URL string `json:"url"` // meek server URL (e.g. https://meek.dsa.akamai.getiantem.org/) + Fronts []FrontSpec `json:"fronts"` // candidate fronts + Header MeekHeaders `json:"header,omitempty"` // extra HTTP headers per request + + PollIntervalMs int `json:"poll_interval_ms,omitempty"` // default 100 + MaxBodyBytes int `json:"max_body_bytes,omitempty"` // default 256 KiB (caps request + response bodies per poll) + SessionIDLen int `json:"session_id_len,omitempty"` // default 16 + ConnectTimeout string `json:"connect_timeout,omitempty"` // default "15s" + ReadTimeout string `json:"read_timeout,omitempty"` // default "30s" +} + +// FrontSpec is one (CDN edge IP, outer SNI) pair to dial. Empty SNI +// means send no ServerName extension (Akamai-style); non-empty SNI is +// sent in the ClientHello (CloudFront-style). VerifyHostname is the +// host expected on the cert chain. +type FrontSpec struct { + IPAddress string `json:"ip_address"` + SNI string `json:"sni,omitempty"` + VerifyHostname string `json:"verify_hostname,omitempty"` +} + +// MeekHeaders carries fixed-value HTTP headers added to every POST. +type MeekHeaders map[string]string diff --git a/protocol/meek/client.go b/protocol/meek/client.go new file mode 100644 index 0000000..923eb69 --- /dev/null +++ b/protocol/meek/client.go @@ -0,0 +1,523 @@ +// Package meek implements a domain-fronted meek client: chunked +// TCP-over-HTTPS, session-keyed by a per-Conn random ID sent in +// X-Session-Id. The wire format is the meek-v1 polling scheme as used +// by Psiphon and Lantern. +// +// Each Conn maintains a single polling goroutine that POSTs to the meek +// server every PollIntervalMs, batching outbound bytes from Write into +// the request body and feeding the response body to readers via Read. +// The server is expected to be a meek endpoint behind any front the +// client dials — typically a Lantern-operated /meek/ endpoint reachable +// through Akamai or CloudFront via the inner Host. +package meek + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "sync" + "time" +) + +const ( + defaultPollIntervalMs = 100 + // defaultMaxBodyBytes is the per-poll body cap. Throughput is bytes-per-poll + // ÷ round-trip-time; each poll is a full HTTPS request through the front, so + // the RTT (not the poll interval) paces them. 256 KiB keeps a healthy stream + // moving without making a single lost/retried poll expensive to replay. + defaultMaxBodyBytes = 256 * 1024 + // legacyMaxBodyBytes is the response cap the server applies to clients that + // don't advertise X-Meek-Max-Body — i.e. pre-negotiation clients that read + // at most 64 KiB. Keeps old clients from truncating a larger response. + legacyMaxBodyBytes = 64 * 1024 + defaultSessionIDLen = 16 + defaultReadTimeout = 30 * time.Second + defaultMaxWriteBufBytes = 1 << 20 // 1 MiB + // defaultMaxPollRetries is how many times a failed poll is retried before the + // session is torn down. Retries are safe because each poll carries a + // monotonic X-Meek-Seq the server dedupes (it replays the buffered response + // for a repeated seq), so a lost request or lost response can't dup or drop + // bytes — see roundtrip / the server's seq handling. + defaultMaxPollRetries = 4 + retryBaseBackoff = 250 * time.Millisecond + headerSeq = "X-Meek-Seq" + headerMaxBody = "X-Meek-Max-Body" +) + +// Config is the runtime configuration for a meek Conn. +type Config struct { + URL string + InnerHost string + ExtraHeaders map[string]string + HTTPClient *http.Client + PollInterval time.Duration + MaxBodyBytes int + SessionIDLen int + ReadTimeout time.Duration + // MaxWriteBufBytes caps the unsent Write backlog. Write blocks once + // the buffer reaches this size and resumes as the poll loop drains + // it, so a sender outpacing a slow front applies backpressure + // instead of growing memory without bound. + MaxWriteBufBytes int + // MaxPollRetries is how many times a failed poll is retried (with backoff) + // before the session is torn down. <=0 uses defaultMaxPollRetries. Retries + // are made safe by the per-poll X-Meek-Seq the server dedupes. + MaxPollRetries int +} + +func (c *Config) applyDefaults() { + if c.PollInterval <= 0 { + c.PollInterval = time.Duration(defaultPollIntervalMs) * time.Millisecond + } + if c.MaxBodyBytes <= 0 { + c.MaxBodyBytes = defaultMaxBodyBytes + } + if c.SessionIDLen <= 0 { + c.SessionIDLen = defaultSessionIDLen + } + if c.ReadTimeout <= 0 { + c.ReadTimeout = defaultReadTimeout + } + if c.MaxWriteBufBytes <= 0 { + c.MaxWriteBufBytes = defaultMaxWriteBufBytes + } + if c.MaxPollRetries <= 0 { + c.MaxPollRetries = defaultMaxPollRetries + } +} + +// Conn is a net.Conn that tunnels through a meek server. +type Conn struct { + cfg Config + sessionID string + + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + writeBuf bytes.Buffer + writeReady chan struct{} + // writeCond wakes a Write blocked on a full writeBuf when the poll + // loop drains it (or on close / write-deadline). + writeCond *sync.Cond + + readBuf bytes.Buffer + readCond *sync.Cond + + // seq is the monotonic sequence number of the next poll; the server dedupes + // on it so a retried poll replays rather than re-applies. inflight is the + // chunk taken for the current seq, held until the poll succeeds so a retry + // resends the same bytes; inflightTaken distinguishes "not yet taken" from + // "taken, empty" (an empty poll still has a seq). + seq uint64 + inflight []byte + inflightTaken bool + + closed bool + closeErr error + + readDeadline time.Time + readDeadlineTimer *time.Timer + writeDeadline time.Time + writeDeadlineTimer *time.Timer + + pollDone chan struct{} +} + +// Dial opens a meek session. The supplied HTTP client must be configured +// so its TLS DialContext targets a working front — typically the radiance +// fronted/scanner package's output composed with the standard fronted +// dialer. +func Dial(ctx context.Context, cfg Config) (*Conn, error) { + cfg.applyDefaults() + + if cfg.URL == "" { + return nil, errors.New("meek: empty URL") + } + u, err := url.Parse(cfg.URL) + if err != nil { + return nil, fmt.Errorf("meek: parse URL: %w", err) + } + if cfg.InnerHost == "" { + cfg.InnerHost = u.Host + } + if cfg.HTTPClient == nil { + return nil, errors.New("meek: HTTPClient required") + } + + id := make([]byte, cfg.SessionIDLen) + if _, err := rand.Read(id); err != nil { + return nil, fmt.Errorf("meek: session id: %w", err) + } + + c := &Conn{ + cfg: cfg, + sessionID: hex.EncodeToString(id), + writeReady: make(chan struct{}, 1), + pollDone: make(chan struct{}), + } + c.readCond = sync.NewCond(&c.mu) + c.writeCond = sync.NewCond(&c.mu) + c.ctx, c.cancel = context.WithCancel(ctx) + + go c.pollLoop() + return c, nil +} + +func (c *Conn) Read(p []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + + for c.readBuf.Len() == 0 && !c.closed { + if !c.readDeadline.IsZero() && time.Now().After(c.readDeadline) { + return 0, errReadDeadline + } + c.readCond.Wait() + } + if c.readBuf.Len() == 0 && c.closed { + if c.closeErr != nil { + return 0, c.closeErr + } + return 0, io.EOF + } + return c.readBuf.Read(p) +} + +func (c *Conn) Write(p []byte) (int, error) { + total := len(p) + // Append in chunks bounded by the remaining backlog capacity so a + // single large Write can't grow writeBuf past MaxWriteBufBytes: each + // pass blocks until the poll loop has drained room, applying real + // backpressure rather than only checking the cap before a wholesale + // append. + for len(p) > 0 { + c.mu.Lock() + for c.writeBuf.Len() >= c.cfg.MaxWriteBufBytes && !c.closed { + if !c.writeDeadline.IsZero() && !time.Now().Before(c.writeDeadline) { + c.mu.Unlock() + return total - len(p), errWriteDeadline + } + c.writeCond.Wait() + } + if c.closed { + c.mu.Unlock() + return total - len(p), errors.New("meek: closed") + } + if !c.writeDeadline.IsZero() && !time.Now().Before(c.writeDeadline) { + c.mu.Unlock() + return total - len(p), errWriteDeadline + } + room := c.cfg.MaxWriteBufBytes - c.writeBuf.Len() + n := len(p) + if n > room { + n = room + } + c.writeBuf.Write(p[:n]) + p = p[n:] + c.mu.Unlock() + + select { + case c.writeReady <- struct{}{}: + default: + } + } + return total, nil +} + +func (c *Conn) Close() error { + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return nil + } + c.closed = true + if c.readDeadlineTimer != nil { + c.readDeadlineTimer.Stop() + c.readDeadlineTimer = nil + } + if c.writeDeadlineTimer != nil { + c.writeDeadlineTimer.Stop() + c.writeDeadlineTimer = nil + } + c.readCond.Broadcast() + c.writeCond.Broadcast() + c.mu.Unlock() + + c.cancel() + <-c.pollDone + return nil +} + +func (c *Conn) LocalAddr() net.Addr { return meekAddr("meek-client") } +func (c *Conn) RemoteAddr() net.Addr { return meekAddr(c.cfg.URL) } + +func (c *Conn) SetDeadline(t time.Time) error { + if err := c.SetReadDeadline(t); err != nil { + return err + } + return c.SetWriteDeadline(t) +} + +// SetReadDeadline arranges for a parked Read to wake when t elapses. +// readCond.Wait has no native timeout, so without an active signal a +// Read would park past the deadline until data, close, or a new +// deadline arrived. A zero t clears the deadline. +func (c *Conn) SetReadDeadline(t time.Time) error { + c.mu.Lock() + defer c.mu.Unlock() + c.readDeadline = t + if c.readDeadlineTimer != nil { + c.readDeadlineTimer.Stop() + c.readDeadlineTimer = nil + } + if t.IsZero() { + return nil + } + d := time.Until(t) + if d <= 0 { + c.readCond.Broadcast() + return nil + } + c.readDeadlineTimer = time.AfterFunc(d, func() { + c.mu.Lock() + c.readCond.Broadcast() + c.mu.Unlock() + }) + return nil +} + +// SetWriteDeadline arranges for a Write blocked on a full backlog to +// wake when t elapses (mirrors SetReadDeadline): without an active +// signal the writeCond.Wait would park past the deadline if the front +// is hung and the poll loop never drains. A zero t clears the deadline. +func (c *Conn) SetWriteDeadline(t time.Time) error { + c.mu.Lock() + defer c.mu.Unlock() + c.writeDeadline = t + if c.writeDeadlineTimer != nil { + c.writeDeadlineTimer.Stop() + c.writeDeadlineTimer = nil + } + if t.IsZero() { + return nil + } + d := time.Until(t) + if d <= 0 { + c.writeCond.Broadcast() + return nil + } + c.writeDeadlineTimer = time.AfterFunc(d, func() { + c.mu.Lock() + c.writeCond.Broadcast() + c.mu.Unlock() + }) + return nil +} + +func (c *Conn) pollLoop() { + defer close(c.pollDone) + ticker := time.NewTicker(c.cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + c.markClosed(c.ctx.Err()) + return + case <-ticker.C: + case <-c.writeReady: + } + + if err := c.roundtripWithRetry(); err != nil { + if errors.Is(err, io.EOF) { + c.markClosed(nil) // clean end-of-stream → Read returns io.EOF + } else { + c.markClosed(err) + } + return + } + } +} + +// roundtripWithRetry retries a failed poll up to MaxPollRetries with linear +// backoff. Safe because the poll keeps the same seq + in-flight chunk across +// attempts: the server replays the buffered response for a repeated seq (no dup +// upstream, no dropped downstream). Aborts promptly if the conn is cancelled. +func (c *Conn) roundtripWithRetry() error { + var lastErr error + for attempt := 0; ; attempt++ { + if attempt > 0 { + timer := time.NewTimer(time.Duration(attempt) * retryBaseBackoff) + select { + case <-c.ctx.Done(): + timer.Stop() + return c.ctx.Err() + case <-timer.C: + } + } + if err := c.roundtrip(); err != nil { + lastErr = err + var perm *permanentError + if errors.As(err, &perm) { + return err // session is gone — retrying would resurrect it + } + if attempt >= c.cfg.MaxPollRetries { + return fmt.Errorf("poll failed after %d retries: %w", c.cfg.MaxPollRetries, lastErr) + } + continue + } + return nil + } +} + +func (c *Conn) roundtrip() error { + c.mu.Lock() + if !c.inflightTaken { + c.inflight = c.takeWriteChunkLocked() // may be nil for an empty (poll-only) request + c.inflightTaken = true + } + bodyBytes := c.inflight + seq := c.seq + c.mu.Unlock() + + // Bound each poll by ReadTimeout so one hung request can't block the poll loop + // forever when the caller's HTTPClient has no timeout of its own. Cancel runs + // after the response body is fully read below (deferred LIFO, after Body.Close). + reqCtx, cancel := context.WithTimeout(c.ctx, c.cfg.ReadTimeout) + defer cancel() + req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, c.cfg.URL, bytes.NewReader(bodyBytes)) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Host = c.cfg.InnerHost + // Apply caller headers first, then set the protocol-critical ones so + // config can't override the session keying or framing (e.g. pinning a + // fixed X-Session-Id across conns, or changing Content-Type). Host is + // set via req.Host above and likewise not overridable here. + for k, v := range c.cfg.ExtraHeaders { + if isReservedHeader(k) { + continue + } + req.Header.Set(k, v) + } + req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("X-Session-Id", c.sessionID) + req.Header.Set(headerSeq, strconv.FormatUint(seq, 10)) + // Advertise how large a response we can read, so the server can send bigger + // chunks without truncating older clients (which omit this header). + req.Header.Set(headerMaxBody, strconv.Itoa(c.cfg.MaxBodyBytes)) + + resp, err := c.cfg.HTTPClient.Do(req) + if err != nil { + return fmt.Errorf("post: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // Non-200 means the server dropped the session (it does so on every error + // path), so a retry would resurrect a fresh session instead of ending the + // stream — mark it permanent. A clean upstream-closed 410 maps to io.EOF so + // Read surfaces a proper end-of-stream. + if resp.StatusCode == http.StatusGone { + return &permanentError{io.EOF} + } + return &permanentError{fmt.Errorf("meek: status %d", resp.StatusCode)} + } + + // Read one byte past the negotiated cap so an over-cap response is detected and + // rejected, not silently truncated (which would corrupt the tunneled byte stream). + // Mirrors the server's request-size check. + limited := io.LimitReader(resp.Body, int64(c.cfg.MaxBodyBytes)+1) + buf, err := io.ReadAll(limited) + if err != nil { + return fmt.Errorf("read response: %w", err) + } + if len(buf) > c.cfg.MaxBodyBytes { + return fmt.Errorf("meek: response body exceeds negotiated max %d bytes", c.cfg.MaxBodyBytes) + } + // The poll succeeded: commit the response, release the in-flight chunk, and + // advance the seq so the next poll is a fresh one. + c.mu.Lock() + if len(buf) > 0 { + c.readBuf.Write(buf) + c.readCond.Broadcast() + } + c.inflight = nil + c.inflightTaken = false + c.seq++ + c.mu.Unlock() + return nil +} + +func (c *Conn) takeWriteChunkLocked() []byte { + if c.writeBuf.Len() == 0 { + return nil + } + chunk := c.writeBuf.Bytes() + if len(chunk) > c.cfg.MaxBodyBytes { + chunk = chunk[:c.cfg.MaxBodyBytes] + } + out := make([]byte, len(chunk)) + copy(out, chunk) + c.writeBuf.Next(len(chunk)) + // Wake any Write parked on the backlog cap. + c.writeCond.Broadcast() + return out +} + +func (c *Conn) markClosed(err error) { + c.mu.Lock() + if !c.closed { + c.closed = true + c.closeErr = err + } + c.readCond.Broadcast() + c.writeCond.Broadcast() + c.mu.Unlock() +} + +// isReservedHeader reports whether name is a header the meek protocol +// owns and config must not override. +func isReservedHeader(name string) bool { + switch http.CanonicalHeaderKey(name) { + case "Host", "Content-Type", "X-Session-Id", headerSeq, headerMaxBody: + return true + default: + return false + } +} + +type meekAddr string + +func (a meekAddr) Network() string { return "meek" } +func (a meekAddr) String() string { return string(a) } + +// permanentError marks a poll failure that retrying won't fix — specifically a +// non-200 response, since the server drops the session on every error path, so a +// retry just resurrects a fresh session instead of surfacing end-of-stream. +type permanentError struct{ err error } + +func (e *permanentError) Error() string { return e.err.Error() } +func (e *permanentError) Unwrap() error { return e.err } + +// timeoutError implements net.Error with Timeout()==true so the meek Conn behaves +// like a real net.Conn for callers that switch on net.Error/Timeout (the stdlib +// http client, io helpers, etc.) to distinguish a deadline from a hard failure. +type timeoutError struct{ msg string } + +func (e *timeoutError) Error() string { return e.msg } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return true } + +var ( + errReadDeadline net.Error = &timeoutError{"meek: read deadline exceeded"} + errWriteDeadline net.Error = &timeoutError{"meek: write deadline exceeded"} +) diff --git a/protocol/meek/client_test.go b/protocol/meek/client_test.go new file mode 100644 index 0000000..dc6ea16 --- /dev/null +++ b/protocol/meek/client_test.go @@ -0,0 +1,268 @@ +package meek + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" +) + +func TestConn_RoundTrip(t *testing.T) { + srv := newMeekTestServer() + t.Cleanup(srv.Close) + + cfg := Config{ + URL: srv.server.URL, + HTTPClient: srv.server.Client(), + PollInterval: 20 * time.Millisecond, + } + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { c.Close() }) + + if _, err := c.Write([]byte("hello")); err != nil { + t.Fatalf("Write: %v", err) + } + + buf := make([]byte, 32) + c.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := c.Read(buf) + if err != nil { + t.Fatalf("Read: %v", err) + } + if got := string(buf[:n]); got != "HELLO" { + t.Errorf("Read = %q; want %q", got, "HELLO") + } +} + +func TestConn_SessionPersistence(t *testing.T) { + srv := newMeekTestServer() + t.Cleanup(srv.Close) + + cfg := Config{URL: srv.server.URL, HTTPClient: srv.server.Client(), PollInterval: 20 * time.Millisecond} + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { c.Close() }) + + c.Write([]byte("a")) + c.Write([]byte("bc")) + time.Sleep(100 * time.Millisecond) + c.Write([]byte("d")) + time.Sleep(100 * time.Millisecond) + + srv.mu.Lock() + defer srv.mu.Unlock() + if len(srv.sessions) != 1 { + t.Errorf("expected 1 session, got %d", len(srv.sessions)) + } +} + +func TestConn_RequiresHTTPClient(t *testing.T) { + _, err := Dial(context.Background(), Config{URL: "https://example.com/meek/"}) + if err == nil { + t.Errorf("expected error when HTTPClient is nil") + } +} + +func TestConn_RequiresURL(t *testing.T) { + _, err := Dial(context.Background(), Config{HTTPClient: http.DefaultClient}) + if err == nil { + t.Errorf("expected error when URL is empty") + } +} + +// SetReadDeadline must unblock a parked Read when the deadline elapses +// in real time, not only when set in the past. +func TestConn_SetReadDeadlineUnblocksParkedRead(t *testing.T) { + srv := newMeekTestServer() + t.Cleanup(srv.Close) + + cfg := Config{URL: srv.server.URL, HTTPClient: srv.server.Client(), PollInterval: 50 * time.Millisecond} + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { c.Close() }) + + // No Write — server has no upstream bytes, so Read parks immediately. + c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + start := time.Now() + buf := make([]byte, 4) + _, err = c.Read(buf) + elapsed := time.Since(start) + + if !errors.Is(err, errReadDeadline) { + t.Errorf("Read err = %v; want errReadDeadline", err) + } + // Allow generous slack for CI scheduling jitter, but fail hard if + // the Read either returned immediately (deadline not enforced at + // all) or hung past 1s (timer didn't fire). + if elapsed < 50*time.Millisecond { + t.Errorf("Read returned too fast: %v", elapsed) + } + if elapsed > time.Second { + t.Errorf("Read returned too slow: %v", elapsed) + } +} + +// A single Write larger than MaxWriteBufBytes must not buffer the whole +// payload at once: when the poll loop can't drain (front stalled), Write +// fills to the cap, blocks, and surfaces the write deadline having +// accepted far fewer than len(p) bytes. Guards the chunked-append cap. +func TestConn_LargeWriteRespectsBacklogCap(t *testing.T) { + // A transport that parks every POST until the conn's context is + // cancelled stalls the poll loop after at most one drained chunk, so + // writeBuf can't be emptied. Using the request context (which is the + // meek conn's ctx) means Close cancels it deterministically with no + // leaked server goroutine. + cfg := Config{ + URL: "https://meek.example/", + HTTPClient: &http.Client{Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { + <-req.Context().Done() + return nil, req.Context().Err() + })}, + PollInterval: 2 * time.Second, // long, so only the Write signal drives the loop + MaxBodyBytes: 512, + MaxWriteBufBytes: 4096, + } + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + const total = 1 << 20 // 1 MiB, far above the 4 KiB cap + c.SetWriteDeadline(time.Now().Add(300 * time.Millisecond)) + n, err := c.Write(make([]byte, total)) + if !errors.Is(err, errWriteDeadline) { + t.Fatalf("Write err = %v; want errWriteDeadline", err) + } + // At most the cap plus the single drained chunk should have been + // accepted — nowhere near the full payload. + if n >= total { + t.Errorf("Write accepted %d bytes; cap should have blocked well below %d", n, total) + } + if n > cfg.MaxWriteBufBytes+cfg.MaxBodyBytes { + t.Errorf("Write accepted %d bytes; want <= cap+chunk (%d)", n, cfg.MaxWriteBufBytes+cfg.MaxBodyBytes) + } +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } + +// meekTestServer is a minimal meek server that uppercases every byte the +// client sends and queues it as response data on the next poll. +type meekTestServer struct { + server *httptest.Server + mu sync.Mutex + sessions map[string]*bytes.Buffer +} + +// ExtraHeaders must not override protocol-critical headers: a config +// trying to pin X-Session-Id (which would collapse every conn onto one +// server-side session) is ignored, and the server sees the real +// per-conn random ID. +func TestConn_ReservedHeadersNotOverridable(t *testing.T) { + srv := newMeekTestServer() + t.Cleanup(srv.Close) + + cfg := Config{ + URL: srv.server.URL, + HTTPClient: srv.server.Client(), + PollInterval: 20 * time.Millisecond, + ExtraHeaders: map[string]string{ + "X-Session-Id": "hijacked", + "Content-Type": "text/plain", + "X-Custom": "ok", // non-reserved: should pass through + }, + } + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { c.Close() }) + + if _, err := c.Write([]byte("hi")); err != nil { + t.Fatalf("Write: %v", err) + } + time.Sleep(80 * time.Millisecond) + + srv.mu.Lock() + defer srv.mu.Unlock() + if _, hijacked := srv.sessions["hijacked"]; hijacked { + t.Error("ExtraHeaders overrode X-Session-Id; reserved header not protected") + } + if _, ok := srv.sessions[c.sessionID]; !ok { + t.Errorf("server never saw the real session id %q", c.sessionID) + } +} + +func newMeekTestServer() *meekTestServer { + s := &meekTestServer{sessions: map[string]*bytes.Buffer{}} + s.server = httptest.NewServer(http.HandlerFunc(s.handle)) + return s +} + +func (s *meekTestServer) Close() { + s.server.Close() +} + +func (s *meekTestServer) handle(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + sid := r.Header.Get("X-Session-Id") + if sid == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + s.mu.Lock() + queue, ok := s.sessions[sid] + if !ok { + queue = &bytes.Buffer{} + s.sessions[sid] = queue + } + for _, b := range body { + if b >= 'a' && b <= 'z' { + queue.WriteByte(b - 32) + } else { + queue.WriteByte(b) + } + } + resp := queue.Bytes() + queue.Reset() + s.mu.Unlock() + + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(resp) +} + +// Deadline errors must implement net.Error with Timeout()==true so the meek Conn +// behaves like a real net.Conn for callers that switch on net.Error/Timeout. +func TestConn_DeadlineErrorsAreNetTimeouts(t *testing.T) { + if !errReadDeadline.Timeout() { + t.Error("read deadline error: Timeout() = false; want true (net.Conn contract)") + } + if !errWriteDeadline.Timeout() { + t.Error("write deadline error: Timeout() = false; want true") + } +} diff --git a/protocol/meek/outbound.go b/protocol/meek/outbound.go new file mode 100644 index 0000000..8fb2c8a --- /dev/null +++ b/protocol/meek/outbound.go @@ -0,0 +1,327 @@ +package meek + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "math/rand/v2" + "net" + "net/http" + "net/url" + "time" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/adapter/outbound" + "github.com/sagernet/sing-box/common/dialer" + "github.com/sagernet/sing-box/log" + "github.com/sagernet/sing/common/logger" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/sagernet/sing/protocol/socks/socks5" + + "github.com/getlantern/lantern-box/constant" + "github.com/getlantern/lantern-box/option" +) + +// RegisterOutbound registers the meek outbound adapter. +func RegisterOutbound(registry *outbound.Registry) { + outbound.Register[option.MeekOutboundOptions](registry, constant.TypeMeek, NewOutbound) +} + +// Outbound is the sing-box outbound adapter wrapping the meek client. +type Outbound struct { + outbound.Adapter + logger logger.ContextLogger + cfg Config + fronts []option.FrontSpec + connectTimeout time.Duration +} + +// NewOutbound constructs a meek outbound. Returns an error if no fronts +// or no URL are configured — without one, the outbound has nothing to +// dial. +func NewOutbound( + ctx context.Context, + router adapter.Router, + logger log.ContextLogger, + tag string, + options option.MeekOutboundOptions, +) (adapter.Outbound, error) { + if options.URL == "" { + return nil, errors.New("meek: url is required") + } + if len(options.Fronts) == 0 { + return nil, errors.New("meek: fronts is required (at least one front)") + } + u, err := url.Parse(options.URL) + if err != nil { + return nil, fmt.Errorf("meek: parse url: %w", err) + } + // Must be https: the transport routes every dial through + // DialTLSContext (the fronted TLS dialer). An http:// URL would make + // http.Transport use DialContext instead, bypassing fronting and the + // cert pinning below — silently leaking traffic. + if u.Scheme != "https" { + return nil, fmt.Errorf("meek: url scheme must be https, got %q", u.Scheme) + } + // Every front must pin a cert identity. verifyChain runs with + // VerifyHostname (falling back to SNI); if both are empty the cert + // check degrades to "any publicly-trusted cert", which is no check at + // all for a fronted endpoint. Reject at config time rather than + // silently accepting MITM. + for i, f := range options.Fronts { + if f.VerifyHostname == "" && f.SNI == "" { + return nil, fmt.Errorf("meek: fronts[%d] must set verify_hostname or sni so the cert identity can be checked", i) + } + } + + outboundDialer, err := dialer.New(ctx, options.DialerOptions, false) + if err != nil { + return nil, fmt.Errorf("meek: building dialer: %w", err) + } + + connectTimeout, err := parseDurationOr(options.ConnectTimeout, 15*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: connect_timeout: %w", err) + } + readTimeout, err := parseDurationOr(options.ReadTimeout, defaultReadTimeout) + if err != nil { + return nil, fmt.Errorf("meek: read_timeout: %w", err) + } + + pollInterval := time.Duration(options.PollIntervalMs) * time.Millisecond + if options.PollIntervalMs <= 0 { + pollInterval = time.Duration(defaultPollIntervalMs) * time.Millisecond + } + + o := &Outbound{ + Adapter: outbound.NewAdapterWithDialerOptions( + constant.TypeMeek, + tag, + []string{N.NetworkTCP}, + options.DialerOptions, + ), + logger: logger, + fronts: options.Fronts, + connectTimeout: connectTimeout, + } + + o.cfg = Config{ + URL: options.URL, + InnerHost: u.Host, + ExtraHeaders: options.Header, + HTTPClient: buildHTTPClient(outboundDialer, o.fronts, connectTimeout, readTimeout), + PollInterval: pollInterval, + MaxBodyBytes: options.MaxBodyBytes, + SessionIDLen: options.SessionIDLen, + ReadTimeout: readTimeout, + } + return o, nil +} + +// DialContext opens a meek-tunneled TCP connection to destination. +// +// sing-box treats this as a terminal outbound and writes the application +// stream straight into the returned conn, so the destination must be +// conveyed to the meek server's upstream before we hand the conn back. +// That upstream is a SOCKS5 proxy (microsocks in the standard +// deployment), so we run a SOCKS5 CONNECT to destination over the tunnel +// first; without it the upstream would read the application's opening +// bytes as a malformed SOCKS handshake and the connection would fail. +func (o *Outbound) DialContext(ctx context.Context, network string, destination M.Socksaddr) (net.Conn, error) { + if N.NetworkName(network) != N.NetworkTCP { + return nil, fmt.Errorf("meek: unsupported network %q", network) + } + ctx, metadata := adapter.ExtendContext(ctx) + metadata.Outbound = o.Tag() + metadata.Destination = destination + o.logger.InfoContext(ctx, "meek outbound to ", destination) + + conn, err := Dial(ctx, o.cfg) + if err != nil { + return nil, err + } + // Bound the handshake: meek reads have no native ctx deadline, so a + // silent upstream would otherwise park here until the poll loop's own + // read timeout. + _ = conn.SetDeadline(time.Now().Add(o.connectTimeout)) + if err := socks5ConnectSequenced(conn, destination); err != nil { + conn.Close() + return nil, fmt.Errorf("meek: socks5 connect to %s: %w", destination, err) + } + _ = conn.SetDeadline(time.Time{}) + return conn, nil +} + +// socks5ConnectSequenced performs the SOCKS5 no-auth CONNECT handshake over the +// meek Conn with reads done via io.ReadFull. sing's socks.ClientHandshake5 reads +// the replies byte-at-a-time through varbin's stub ReadByte (which issues a +// 1-byte Read and ignores n); over the meek polling Conn that desyncs the +// handshake — DialContext returns instantly while microsocks actually replies +// 0x05 0xFF ("no acceptable methods"), and that rejection leaks into the +// application stream so every transfer stalls. io.ReadFull tolerates short/zero +// reads and the explicit method-select→reply→CONNECT→reply ordering matches what +// microsocks requires (strict, no pipelining). +func socks5ConnectSequenced(conn net.Conn, dst M.Socksaddr) error { + // 1. Offer only NO_AUTH, then read the 2-byte method-select reply. + if err := socks5.WriteAuthRequest(conn, socks5.AuthRequest{Methods: []byte{socks5.AuthTypeNotRequired}}); err != nil { + return fmt.Errorf("write method-select: %w", err) + } + authReply := make([]byte, 2) + if _, err := io.ReadFull(conn, authReply); err != nil { + return fmt.Errorf("read method-select reply: %w", err) + } + if authReply[0] != socks5.Version { + return fmt.Errorf("unexpected socks version %#x", authReply[0]) + } + if authReply[1] != socks5.AuthTypeNotRequired { + return fmt.Errorf("server rejected no-auth (method %#x)", authReply[1]) + } + + // 2. CONNECT, then read the reply: 4-byte header + bound addr + 2-byte port. + if err := socks5.WriteRequest(conn, socks5.Request{Command: socks5.CommandConnect, Destination: dst}); err != nil { + return fmt.Errorf("write connect: %w", err) + } + head := make([]byte, 4) // VER, REP, RSV, ATYP + if _, err := io.ReadFull(conn, head); err != nil { + return fmt.Errorf("read connect reply: %w", err) + } + if head[0] != socks5.Version { + return fmt.Errorf("unexpected socks version %#x in connect reply", head[0]) + } + if head[1] != 0 { // 0x00 = succeeded (RFC 1928) + return fmt.Errorf("connect failed (reply code %#x)", head[1]) + } + if head[2] != 0 { // RSV must be 0x00 (RFC 1928) + return fmt.Errorf("unexpected reserved byte %#x in connect reply", head[2]) + } + var addrLen int + switch head[3] { // ATYP (RFC 1928): 1=IPv4, 3=domain, 4=IPv6 + case 0x01: + addrLen = net.IPv4len + case 0x04: + addrLen = net.IPv6len + case 0x03: + lb := make([]byte, 1) + if _, err := io.ReadFull(conn, lb); err != nil { + return fmt.Errorf("read bound-addr length: %w", err) + } + addrLen = int(lb[0]) + default: + return fmt.Errorf("unexpected atyp %#x in connect reply", head[3]) + } + if _, err := io.ReadFull(conn, make([]byte, addrLen+2)); err != nil { // bound addr + port + return fmt.Errorf("read bound addr/port: %w", err) + } + return nil +} + +// ListenPacket is unimplemented — meek is a TCP-stream-shaped transport. +func (o *Outbound) ListenPacket(ctx context.Context, destination M.Socksaddr) (net.PacketConn, error) { + return nil, errors.New("meek: udp not supported") +} + +func (o *Outbound) Network() []string { return []string{N.NetworkTCP} } + +func (o *Outbound) Close() error { return nil } + +// buildHTTPClient returns an *http.Client whose TCP+TLS dialer picks a +// random front from fronts on every dial and connects to its IP with +// the spec's outer SNI. cert validation uses VerifyHostname when +// present so the spec drives both who-we-look-like and who-we-trust. +func buildHTTPClient(d N.Dialer, fronts []option.FrontSpec, connectTimeout, readTimeout time.Duration) *http.Client { + tr := &http.Transport{ + DialTLSContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + front := pickFront(fronts) + addr := front.IPAddress + if _, _, err := net.SplitHostPort(addr); err != nil { + addr = net.JoinHostPort(addr, "443") + } + verifyHost := front.VerifyHostname + if verifyHost == "" { + verifyHost = front.SNI + } + // NewOutbound rejects fronts with neither set, but guard the + // dial too: an empty DNSName makes verifyChain skip the + // hostname check, which would silently accept any trusted cert. + if verifyHost == "" { + return nil, errors.New("meek: front has no verify_hostname or sni; refusing to dial without cert identity") + } + dialCtx, cancel := context.WithTimeout(ctx, connectTimeout) + defer cancel() + raw, err := d.DialContext(dialCtx, N.NetworkTCP, M.ParseSocksaddr(addr)) + if err != nil { + return nil, fmt.Errorf("meek: tcp dial %s: %w", addr, err) + } + // InsecureSkipVerify disables the default CA+hostname check so + // we can run our own via VerifyPeerCertificate against the + // front's pinned identity (verifyHost) rather than the outer + // SNI — the SNI is cover, not the cert we trust. + tlsConfig := &tls.Config{InsecureSkipVerify: true} //nolint:gosec // custom verification below pins verifyHost + if front.SNI != "" { + tlsConfig.ServerName = front.SNI + } + tlsConfig.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error { + return verifyChain(rawCerts, verifyHost) + } + conn := tls.Client(raw, tlsConfig) + if err := conn.HandshakeContext(dialCtx); err != nil { + raw.Close() + return nil, fmt.Errorf("meek: tls: %w", err) + } + return conn, nil + }, + DisableKeepAlives: false, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: connectTimeout, + } + return &http.Client{Transport: tr, Timeout: readTimeout} +} + +func pickFront(fronts []option.FrontSpec) option.FrontSpec { + if len(fronts) == 1 { + return fronts[0] + } + return fronts[rand.IntN(len(fronts))] +} + +func verifyChain(rawCerts [][]byte, dnsName string) error { + if len(rawCerts) == 0 { + return errors.New("no certs presented") + } + cert, err := x509.ParseCertificate(rawCerts[0]) + if err != nil { + return fmt.Errorf("parse leaf: %w", err) + } + roots, err := x509.SystemCertPool() + if err != nil { + return fmt.Errorf("system roots: %w", err) + } + opts := x509.VerifyOptions{ + Roots: roots, + DNSName: dnsName, + CurrentTime: time.Now(), + Intermediates: x509.NewCertPool(), + } + for i := 1; i < len(rawCerts); i++ { + c, err := x509.ParseCertificate(rawCerts[i]) + if err != nil { + return fmt.Errorf("intermediate %d: %w", i, err) + } + opts.Intermediates.AddCert(c) + } + if _, err := cert.Verify(opts); err != nil { + return fmt.Errorf("verify: %w", err) + } + return nil +} + +func parseDurationOr(s string, def time.Duration) (time.Duration, error) { + if s == "" { + return def, nil + } + return time.ParseDuration(s) +} diff --git a/protocol/meek/outbound_test.go b/protocol/meek/outbound_test.go new file mode 100644 index 0000000..b8e0dff --- /dev/null +++ b/protocol/meek/outbound_test.go @@ -0,0 +1,128 @@ +package meek + +import ( + "bytes" + "context" + "io" + "net" + "strings" + "testing" + "time" + + M "github.com/sagernet/sing/common/metadata" + + "github.com/getlantern/lantern-box/option" +) + +// NewOutbound must reject configs that would silently weaken the +// transport: a non-https URL (which would bypass the fronted TLS dialer) +// and a front with no cert identity (which would skip hostname +// verification). Both checks run before the dialer is built, so a nil +// router/logger is fine for these error paths. +func TestNewOutbound_RejectsUnsafeConfig(t *testing.T) { + base := func() option.MeekOutboundOptions { + return option.MeekOutboundOptions{ + URL: "https://meek.example/", + Fronts: []option.FrontSpec{{IPAddress: "1.2.3.4", VerifyHostname: "a248.e.akamai.net"}}, + } + } + + t.Run("http scheme rejected", func(t *testing.T) { + opts := base() + opts.URL = "http://meek.example/" + _, err := NewOutbound(context.Background(), nil, nil, "meek", opts) + if err == nil || !strings.Contains(err.Error(), "https") { + t.Errorf("err = %v; want a scheme-must-be-https error", err) + } + }) + + t.Run("front without verify_hostname or sni rejected", func(t *testing.T) { + opts := base() + opts.Fronts = []option.FrontSpec{{IPAddress: "1.2.3.4"}} // both SNI and VerifyHostname empty + _, err := NewOutbound(context.Background(), nil, nil, "meek", opts) + if err == nil || !strings.Contains(err.Error(), "verify_hostname or sni") { + t.Errorf("err = %v; want a cert-identity-required error", err) + } + }) + + t.Run("front with sni only is accepted past validation", func(t *testing.T) { + opts := base() + opts.Fronts = []option.FrontSpec{{IPAddress: "1.2.3.4", SNI: "cover.example"}} + // Validation passes; any error must come from later dialer setup, + // not the front/scheme guards. + _, err := NewOutbound(context.Background(), nil, nil, "meek", opts) + if err != nil && (strings.Contains(err.Error(), "https") || strings.Contains(err.Error(), "verify_hostname or sni")) { + t.Errorf("sni-only front should pass the identity guard, got %v", err) + } + }) +} + +// bytewiseConn is a net.Conn that serves canned reply bytes one at a time and +// captures everything written. It reproduces the meek polling Conn's short/ +// byte-wise reads — the exact pattern that desynced sing's ReadByte-based SOCKS5 +// handshake (the bug this PR fixes); io.ReadFull in socks5ConnectSequenced must +// tolerate it. +type bytewiseConn struct { + reply []byte // server->client bytes, served one per Read + written bytes.Buffer +} + +func (c *bytewiseConn) Read(p []byte) (int, error) { + if len(c.reply) == 0 { + return 0, io.EOF + } + if len(p) == 0 { + return 0, nil + } + p[0] = c.reply[0] + c.reply = c.reply[1:] + return 1, nil // one byte per Read — the desync trigger +} +func (c *bytewiseConn) Write(p []byte) (int, error) { return c.written.Write(p) } +func (c *bytewiseConn) Close() error { return nil } +func (c *bytewiseConn) LocalAddr() net.Addr { return nil } +func (c *bytewiseConn) RemoteAddr() net.Addr { return nil } +func (c *bytewiseConn) SetDeadline(time.Time) error { return nil } +func (c *bytewiseConn) SetReadDeadline(time.Time) error { return nil } +func (c *bytewiseConn) SetWriteDeadline(time.Time) error { return nil } + +// Regression guard for FD #174614: the SOCKS5 CONNECT handshake must survive a +// Conn that returns one byte per Read (the polling-Conn behavior that made sing's +// byte-at-a-time ClientHandshake5 desync). socks5ConnectSequenced uses io.ReadFull, +// so it must complete cleanly and emit a correct no-auth method-select + CONNECT. +func TestSocks5ConnectSequenced_ToleratesBytewiseReads(t *testing.T) { + reply := []byte{ + 0x05, 0x00, // method-select: VER=5, METHOD=0x00 (no-auth) + 0x05, 0x00, 0x00, 0x01, // CONNECT reply: VER=5, REP=0x00 (ok), RSV=0x00, ATYP=IPv4 + 0x00, 0x00, 0x00, 0x00, // bound addr + 0x00, 0x00, // bound port + } + conn := &bytewiseConn{reply: append([]byte(nil), reply...)} + + if err := socks5ConnectSequenced(conn, M.Socksaddr{Fqdn: "example.com", Port: 443}); err != nil { + t.Fatalf("socks5ConnectSequenced over byte-wise reads: %v", err) + } + + w := conn.written.Bytes() + // Method-select must offer exactly no-auth: VER=5, NMETHODS=1, METHOD=0x00. + if len(w) < 3 || w[0] != 0x05 || w[1] != 0x01 || w[2] != 0x00 { + t.Fatalf("method-select request = %#x; want 05 01 00 prefix", w) + } + // A CONNECT request (VER=5, CMD=1) must follow it. + if len(w) < 5 || w[3] != 0x05 || w[4] != 0x01 { + t.Fatalf("connect request = %#x; want 05 01 (CONNECT) after method-select", w) + } +} + +// A non-zero RSV byte in the CONNECT reply must be rejected (RFC 1928). +func TestSocks5ConnectSequenced_RejectsNonZeroRSV(t *testing.T) { + conn := &bytewiseConn{reply: []byte{ + 0x05, 0x00, // method-select ok + 0x05, 0x00, 0x07, 0x01, // CONNECT reply with RSV=0x07 (invalid) + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }} + err := socks5ConnectSequenced(conn, M.Socksaddr{Fqdn: "example.com", Port: 443}) + if err == nil || !strings.Contains(err.Error(), "reserved byte") { + t.Fatalf("err = %v; want a non-zero-RSV rejection", err) + } +} diff --git a/protocol/meek/retry_integrity_test.go b/protocol/meek/retry_integrity_test.go new file mode 100644 index 0000000..988d904 --- /dev/null +++ b/protocol/meek/retry_integrity_test.go @@ -0,0 +1,183 @@ +package meek + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +// faultTransport simulates a LOST RESPONSE: it lets the request reach the meek +// server (so the server processes that seq — drains downstream / writes upstream) +// but then discards the response and reports an error to the client. That forces +// the client to retry the same seq, which is exactly the case a naive retry gets +// wrong: the server already advanced, so a correct implementation must replay the +// buffered response (no gap downstream, no duplicate upstream). +type faultTransport struct { + inner http.RoundTripper + n int64 + dropEvery int64 // drop the response on every Nth request (0 = never) + dropped int64 +} + +func (f *faultTransport) RoundTrip(req *http.Request) (*http.Response, error) { + i := atomic.AddInt64(&f.n, 1) + resp, err := f.inner.RoundTrip(req) + if err != nil { + return resp, err + } + if f.dropEvery > 0 && i%f.dropEvery == 0 { + // The server has already handled this seq; drop its response so the + // client must retry and rely on the server replaying it. + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + atomic.AddInt64(&f.dropped, 1) + return nil, fmt.Errorf("injected response loss for request %d", i) + } + return resp, nil +} + +func newMeekTestStack(t *testing.T, upstream string, dropEvery int64) (*Conn, *faultTransport) { + t.Helper() + srv, err := NewServer(ServerConfig{Upstream: upstream}) + if err != nil { + t.Fatalf("NewServer: %v", err) + } + t.Cleanup(func() { srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + ft := &faultTransport{inner: http.DefaultTransport, dropEvery: dropEvery} + hc := &http.Client{Transport: ft, Timeout: 10 * time.Second} + conn, err := Dial(context.Background(), Config{ + URL: hs.URL, InnerHost: "test", HTTPClient: hc, + PollInterval: 5 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { conn.Close() }) + return conn, ft +} + +// TestMeekRetryDownloadIntegrity: upstream streams a 2 MiB random payload; every +// 4th poll response is dropped. The client must reassemble the payload byte-for- +// byte — a gap (lost-after-drain) or dup would fail the compare. +func TestMeekRetryDownloadIntegrity(t *testing.T) { + const size = 2 << 20 + payload := make([]byte, size) + if _, err := rand.Read(payload); err != nil { + t.Fatal(err) + } + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { ln.Close() }) + go func() { + for { + c, err := ln.Accept() + if err != nil { + return + } + go func(c net.Conn) { + defer c.Close() + go io.Copy(io.Discard, c) // drain the client's trigger byte(s) + _, _ = c.Write(payload) + }(c) + } + }() + + conn, ft := newMeekTestStack(t, ln.Addr().String(), 4) + if _, err := conn.Write([]byte("go")); err != nil { // trigger upstream + t.Fatal(err) + } + _ = conn.SetReadDeadline(time.Now().Add(20 * time.Second)) + got := make([]byte, 0, size) + buf := make([]byte, 96*1024) + for len(got) < size { + n, err := conn.Read(buf) + got = append(got, buf[:n]...) + if err != nil { + t.Fatalf("read err at %d/%d bytes: %v", len(got), size, err) + } + } + if !bytes.Equal(got[:size], payload) { + t.Fatalf("download corrupted under response loss (got %d bytes)", len(got)) + } + if d := atomic.LoadInt64(&ft.dropped); d == 0 { + t.Fatal("test ineffective: no responses were dropped") + } else { + t.Logf("download intact across %d dropped+retried responses", d) + } +} + +// TestMeekRetryUploadNoDuplication: the client writes a 2 MiB random payload; the +// upstream concatenates everything it receives. With response loss forcing +// retries, a non-idempotent server would re-write retried chunks → upstream sees +// >2 MiB / corrupted bytes. Correct dedupe yields exactly the payload. +func TestMeekRetryUploadNoDuplication(t *testing.T) { + const size = 2 << 20 + payload := make([]byte, size) + if _, err := rand.Read(payload); err != nil { + t.Fatal(err) + } + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { ln.Close() }) + recv := make([]byte, size) + var readErr error + var extra bool + done := make(chan struct{}) + go func() { + c, err := ln.Accept() + if err != nil { + return + } + defer c.Close() + // Read exactly `size` bytes (no client Close needed — closing would + // cancel the poll loop and drop unsent data), then check that no further + // bytes arrive: a duplicated retry chunk would show up as trailing data. + if _, readErr = io.ReadFull(c, recv); readErr != nil { + close(done) + return + } + _ = c.SetReadDeadline(time.Now().Add(750 * time.Millisecond)) + if n, _ := c.Read(make([]byte, 1)); n > 0 { + extra = true + } + close(done) + }() + + conn, ft := newMeekTestStack(t, ln.Addr().String(), 4) + if _, err := conn.Write(payload); err != nil { + t.Fatalf("write: %v", err) + } + select { + case <-done: + case <-time.After(20 * time.Second): + t.Fatal("upstream did not receive full payload in time") + } + if readErr != nil { + t.Fatalf("upstream read: %v (gap under retry?)", readErr) + } + if extra { + t.Fatal("upstream received MORE than payload — a retried chunk was duplicated") + } + if !bytes.Equal(recv, payload) { + t.Fatal("upstream bytes don't match payload (corruption under retry)") + } + if d := atomic.LoadInt64(&ft.dropped); d == 0 { + t.Fatal("test ineffective: no responses were dropped") + } + t.Logf("upload intact (exactly %d bytes) across %d dropped+retried responses", size, atomic.LoadInt64(&ft.dropped)) +} diff --git a/protocol/meek/server.go b/protocol/meek/server.go new file mode 100644 index 0000000..ceeba1b --- /dev/null +++ b/protocol/meek/server.go @@ -0,0 +1,514 @@ +package meek + +import ( + "crypto/subtle" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "strconv" + "sync" + "time" +) + +// ServerConfig configures a Server. +type ServerConfig struct { + // Upstream is dialed for each new session and gets the bytes the + // client posts; bytes from the upstream flow back as response + // bodies. The server pipes bytes verbatim and is agnostic to the + // upstream protocol, but the bundled meek outbound opens each session + // with a SOCKS5 CONNECT, so a SOCKS5 proxy on the same host + // (e.g. microsocks at "127.0.0.1:1080") is the expected deployment. + Upstream string + + // MaxBodyBytes caps both the request-body the server accepts (larger + // POSTs get 413) and the response-body it returns per POST. Default + // 256 KiB (matches the client's default). + MaxBodyBytes int + + // ResponseHoldoff is how long the server waits for upstream bytes + // before responding with whatever it has (possibly empty). + // Too small: many empty responses, idle CPU. Too large: high + // client-perceived latency on the upstream-quiet path. Default + // 50 ms. + ResponseHoldoff time.Duration + + // SessionIdleTimeout is how long a session may go without a POST + // before the reaper drops it. Should be at least 2-3x the client's + // expected PollInterval to handle network blips. Default 5 min. + SessionIdleTimeout time.Duration + + // Dialer optionally overrides net.Dial for upstream connections. + Dialer func(network, address string) (net.Conn, error) + + // AuthToken, when non-empty, is a shared secret every request must + // present in the X-Meek-Auth header. Without it the server is an + // open relay into Upstream — anyone who reaches the endpoint can + // create sessions and tunnel arbitrary traffic — so production + // deployments on a public/fronted hostname MUST set it. Empty + // disables the check (intended only for local tests). + AuthToken string + + Logger *slog.Logger +} + +func (c *ServerConfig) defaults() { + if c.MaxBodyBytes <= 0 { + c.MaxBodyBytes = defaultMaxBodyBytes + } + if c.ResponseHoldoff <= 0 { + c.ResponseHoldoff = 50 * time.Millisecond + } + if c.SessionIdleTimeout <= 0 { + c.SessionIdleTimeout = 5 * time.Minute + } + if c.Dialer == nil { + c.Dialer = func(network, address string) (net.Conn, error) { + return net.DialTimeout(network, address, 10*time.Second) + } + } + if c.Logger == nil { + c.Logger = slog.Default() + } +} + +// Server is an http.Handler implementing the meek-v1 protocol. +type Server struct { + cfg ServerConfig + + mu sync.Mutex + sessions map[string]*session + + closeOnce sync.Once + stop chan struct{} + reaperOK chan struct{} +} + +// NewServer constructs a Server and starts the session reaper goroutine. +// Call Close to stop the reaper and tear down all sessions. +func NewServer(cfg ServerConfig) (*Server, error) { + if cfg.Upstream == "" { + return nil, errors.New("meek server: Upstream required") + } + cfg.defaults() + s := &Server{ + cfg: cfg, + sessions: make(map[string]*session), + stop: make(chan struct{}), + reaperOK: make(chan struct{}), + } + go s.reapLoop() + return s, nil +} + +// Close stops the reaper and closes every active upstream connection. +// Idempotent. +func (s *Server) Close() error { + s.closeOnce.Do(func() { + close(s.stop) + <-s.reaperOK + s.mu.Lock() + for _, sess := range s.sessions { + sess.close() + } + s.sessions = nil + s.mu.Unlock() + }) + return nil +} + +// ServeHTTP handles a POST from a meek client. Non-POST requests get +// 405; missing X-Session-Id gets 400. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + // Constant-time compare so a probe can't time-discover the token. + if s.cfg.AuthToken != "" && + subtle.ConstantTimeCompare([]byte(r.Header.Get("X-Meek-Auth")), []byte(s.cfg.AuthToken)) != 1 { + http.Error(w, "forbidden", http.StatusForbidden) + return + } + sid := r.Header.Get("X-Session-Id") + if sid == "" { + http.Error(w, "missing X-Session-Id", http.StatusBadRequest) + return + } + + // Read + size-check the body BEFORE creating/dialing a session, so malformed or + // oversized POSTs (e.g. spammed unique X-Session-Ids) can't open an upstream + // connection and leave a session alive until idle reap. Read one byte past the + // cap so an oversized POST is rejected rather than silently truncated (a + // truncated prefix forwarded upstream would corrupt the tunneled TCP stream). + body, err := io.ReadAll(io.LimitReader(r.Body, int64(s.cfg.MaxBodyBytes)+1)) + if err != nil { + http.Error(w, "read body", http.StatusBadRequest) + return + } + if len(body) > s.cfg.MaxBodyBytes { + http.Error(w, "request body exceeds max_body_bytes", http.StatusRequestEntityTooLarge) + return + } + + sess, isNew, err := s.getOrCreateSession(sid) + if err != nil { + s.cfg.Logger.Warn("meek server: upstream dial failed", slog.String("sid", sid), slog.Any("error", err)) + http.Error(w, "upstream unreachable", http.StatusBadGateway) + return + } + if isNew { + s.cfg.Logger.Debug("meek server: new session", slog.String("sid", sid)) + } + + // Response cap: honor the client's advertised read size (X-Meek-Max-Body), + // bounded by our own limit. Clients that don't advertise are pre-negotiation + // and read at most legacyMaxBodyBytes — never send them more, or they truncate. + respCap := s.cfg.MaxBodyBytes + if adv := parseMaxBody(r.Header.Get(headerMaxBody)); adv > 0 { + if adv < respCap { + respCap = adv + } + } else if respCap > legacyMaxBodyBytes { + respCap = legacyMaxBodyBytes + } + + seq, hasSeq := parseSeq(r.Header.Get(headerSeq)) + downstream, err := sess.serveRequest(hasSeq, seq, body, respCap, s.cfg.ResponseHoldoff) + if err != nil { + s.dropSession(sid) + if errors.Is(err, errUpstreamClosed) { + // Clean end-of-stream: upstream closed with nothing left. 410 tells the + // client the session is gone so its Conn surfaces EOF instead of polling forever. + s.cfg.Logger.Debug("meek server: upstream closed; ending session", slog.String("sid", sid)) + http.Error(w, "upstream closed", http.StatusGone) + } else { + s.cfg.Logger.Debug("meek server: upstream write failed; closing session", slog.String("sid", sid), slog.Any("error", err)) + http.Error(w, "upstream write", http.StatusBadGateway) + } + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(downstream))) + w.WriteHeader(http.StatusOK) + if len(downstream) > 0 { + _, _ = w.Write(downstream) + } +} + +// parseSeq parses an X-Meek-Seq header; ok is false when absent/invalid (legacy +// client — no dedupe). parseMaxBody parses X-Meek-Max-Body; 0 when absent. +func parseSeq(s string) (uint64, bool) { + if s == "" { + return 0, false + } + v, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, false + } + return v, true +} + +func parseMaxBody(s string) int { + if s == "" { + return 0 + } + v, err := strconv.Atoi(s) + if err != nil || v < 0 { + return 0 + } + return v +} + +func (s *Server) getOrCreateSession(sid string) (*session, bool, error) { + s.mu.Lock() + sess, ok := s.sessions[sid] + if ok { + sess.touch() + s.mu.Unlock() + return sess, false, nil + } + s.mu.Unlock() + + conn, err := s.cfg.Dialer("tcp", s.cfg.Upstream) + if err != nil { + return nil, false, fmt.Errorf("dial upstream %s: %w", s.cfg.Upstream, err) + } + sess = newSession(sid, conn) + go sess.readPump(s.cfg.MaxBodyBytes * 4) + + s.mu.Lock() + if existing, ok := s.sessions[sid]; ok { + conn.Close() + existing.touch() + s.mu.Unlock() + return existing, false, nil + } + s.sessions[sid] = sess + s.mu.Unlock() + return sess, true, nil +} + +func (s *Server) dropSession(sid string) { + s.mu.Lock() + sess, ok := s.sessions[sid] + if ok { + delete(s.sessions, sid) + } + s.mu.Unlock() + if ok { + sess.close() + } +} + +func (s *Server) reapLoop() { + defer close(s.reaperOK) + t := time.NewTicker(s.cfg.SessionIdleTimeout / 2) + defer t.Stop() + for { + select { + case <-s.stop: + return + case <-t.C: + s.reapOnce() + } + } +} + +func (s *Server) reapOnce() { + cutoff := time.Now().Add(-s.cfg.SessionIdleTimeout) + s.mu.Lock() + var dead []*session + for sid, sess := range s.sessions { + if sess.lastSeen().Before(cutoff) { + dead = append(dead, sess) + delete(s.sessions, sid) + } + } + s.mu.Unlock() + for _, sess := range dead { + sess.close() + } +} + +// SessionCount is exposed for ops / metrics. +func (s *Server) SessionCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.sessions) +} + +// --- session --- + +type session struct { + id string + upstream net.Conn + + mu sync.Mutex + pending []byte + closed bool + last time.Time + readWakeCh chan struct{} + upstreamDone chan struct{} + // drainCond wakes a readPump paused because pending is at cap, when + // takeLocked frees space or the session closes. + drainCond *sync.Cond + + // reqMu serializes per-session request processing and guards the seq replay + // state below. Separate from mu (which guards pending/readPump) so holding it + // across writeUpstream+takeDownstream can't deadlock the read pump. lastResp + // is the response sent for lastSeq, replayed verbatim if that seq is retried. + reqMu sync.Mutex + haveSeq bool + lastSeq uint64 + lastResp []byte +} + +// errUpstreamClosed signals that the upstream is closed with nothing left to +// send, so ServeHTTP should drop the session and tell the client to tear down. +var errUpstreamClosed = errors.New("meek: upstream closed") + +func newSession(id string, upstream net.Conn) *session { + s := &session{ + id: id, + upstream: upstream, + last: time.Now(), + readWakeCh: make(chan struct{}, 1), + upstreamDone: make(chan struct{}), + } + s.drainCond = sync.NewCond(&s.mu) + return s +} + +// serveRequest applies a client poll to the session and returns the bytes to +// send back. With a sequence number it is idempotent: a repeated seq replays the +// buffered response without re-writing upstream or re-draining downstream, so a +// client that retries a lost poll neither duplicates upstream bytes nor drops +// downstream ones. Without a seq (pre-negotiation clients) every request is +// processed fresh — unchanged legacy behavior. reqMu serializes per-session +// requests so a retry that races the original simply waits and then replays. +func (s *session) serveRequest(hasSeq bool, seq uint64, body []byte, respCap int, holdoff time.Duration) ([]byte, error) { + if !hasSeq { + if len(body) > 0 { + if err := s.writeUpstream(body); err != nil { + return nil, err + } + } + resp := s.takeDownstream(respCap, holdoff) + if len(resp) == 0 && s.upstreamFinished() { + return nil, errUpstreamClosed + } + return resp, nil + } + + s.reqMu.Lock() + defer s.reqMu.Unlock() + if s.haveSeq && seq == s.lastSeq { + return s.lastResp, nil // retry of the last poll — replay its response + } + if len(body) > 0 { + if err := s.writeUpstream(body); err != nil { + return nil, err + } + } + resp := s.takeDownstream(respCap, holdoff) + if len(resp) == 0 && s.upstreamFinished() { + return nil, errUpstreamClosed + } + s.haveSeq = true + s.lastSeq = seq + s.lastResp = resp + return resp, nil +} + +// upstreamFinished reports whether the upstream is closed AND all buffered +// downstream bytes have been drained — i.e. there is nothing left to ever send, +// so the session should end and the client tear down (otherwise a read-only +// client would poll forever on empty 200s, never seeing EOF). +func (s *session) upstreamFinished() bool { + select { + case <-s.upstreamDone: + default: + return false + } + s.mu.Lock() + defer s.mu.Unlock() + return len(s.pending) == 0 +} + +func (s *session) lastSeen() time.Time { + s.mu.Lock() + defer s.mu.Unlock() + return s.last +} + +func (s *session) touch() { + s.mu.Lock() + s.last = time.Now() + s.mu.Unlock() +} + +func (s *session) writeUpstream(b []byte) error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return errors.New("session closed") + } + s.mu.Unlock() + _, err := s.upstream.Write(b) + return err +} + +// readPump drains the upstream connection into pending until upstream +// closes or session.close is called. cap bounds how much we'll buffer +// — if pending fills, the pump pauses until takeDownstream drains it. +func (s *session) readPump(cap int) { + defer close(s.upstreamDone) + buf := make([]byte, 32*1024) + for { + n, err := s.upstream.Read(buf) + if n > 0 { + s.mu.Lock() + // Only block when there is already buffered data to drain. + // With an empty buffer we append unconditionally so a single + // read larger than cap (possible when MaxBodyBytes*4 < 32 KiB) + // can't wedge the pump waiting for room that never frees. + for len(s.pending) > 0 && len(s.pending)+n > cap && !s.closed { + s.drainCond.Wait() + } + if s.closed { + s.mu.Unlock() + return + } + s.pending = append(s.pending, buf[:n]...) + s.mu.Unlock() + s.signalWake() + } + if err != nil { + return + } + } +} + +// takeDownstream returns up to max pending bytes. If pending is empty +// it blocks up to holdoff waiting for the readPump to deliver bytes, +// then returns whatever it has (possibly empty). +func (s *session) takeDownstream(max int, holdoff time.Duration) []byte { + s.mu.Lock() + if len(s.pending) > 0 { + chunk := s.takeLocked(max) + s.last = time.Now() + s.mu.Unlock() + return chunk + } + s.last = time.Now() + s.mu.Unlock() + + select { + case <-s.readWakeCh: + case <-time.After(holdoff): + } + + s.mu.Lock() + defer s.mu.Unlock() + return s.takeLocked(max) +} + +func (s *session) takeLocked(max int) []byte { + if len(s.pending) == 0 { + return nil + } + n := len(s.pending) + if n > max { + n = max + } + out := make([]byte, n) + copy(out, s.pending[:n]) + s.pending = s.pending[n:] + // Wake a readPump paused at the cap. + s.drainCond.Broadcast() + return out +} + +func (s *session) signalWake() { + select { + case s.readWakeCh <- struct{}{}: + default: + } +} + +func (s *session) close() { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return + } + s.closed = true + s.drainCond.Broadcast() + s.mu.Unlock() + s.upstream.Close() + s.signalWake() +} diff --git a/protocol/meek/server_test.go b/protocol/meek/server_test.go new file mode 100644 index 0000000..1bff1c0 --- /dev/null +++ b/protocol/meek/server_test.go @@ -0,0 +1,558 @@ +package meek + +import ( + "bytes" + "context" + "errors" + "io" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + M "github.com/sagernet/sing/common/metadata" + "github.com/sagernet/sing/common/varbin" + "github.com/sagernet/sing/protocol/socks" + "github.com/sagernet/sing/protocol/socks/socks5" +) + +func TestServer_EndToEndEcho(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + + srv, err := NewServer(ServerConfig{ + Upstream: upstream.addr, + ResponseHoldoff: 30 * time.Millisecond, + SessionIdleTimeout: 2 * time.Second, + }) + if err != nil { + t.Fatalf("NewServer: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + cfg := Config{ + URL: hs.URL, + HTTPClient: hs.Client(), + PollInterval: 20 * time.Millisecond, + } + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + if _, err := c.Write([]byte("hello over meek")); err != nil { + t.Fatalf("Write: %v", err) + } + + got := make([]byte, 64) + if err := c.SetReadDeadline(time.Now().Add(2 * time.Second)); err != nil { + t.Fatalf("SetReadDeadline: %v", err) + } + n, err := c.Read(got) + if err != nil { + t.Fatalf("Read: %v", err) + } + if s := string(got[:n]); s != "hello over meek" { + t.Errorf("got %q; want %q", s, "hello over meek") + } + + if got := srv.SessionCount(); got != 1 { + t.Errorf("SessionCount = %d; want 1", got) + } +} + +func TestServer_LargeBidirectional(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + + srv, err := NewServer(ServerConfig{ + Upstream: upstream.addr, + ResponseHoldoff: 20 * time.Millisecond, + SessionIdleTimeout: 2 * time.Second, + MaxBodyBytes: 4096, + }) + if err != nil { + t.Fatalf("NewServer: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + cfg := Config{ + URL: hs.URL, + HTTPClient: hs.Client(), + PollInterval: 10 * time.Millisecond, + MaxBodyBytes: 4096, + } + c, err := Dial(context.Background(), cfg) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + const payload = "abcdefghijklmnopqrstuvwxyz0123456789" + send := strings.Repeat(payload, 1000) // 36 KB + go func() { + _, _ = c.Write([]byte(send)) + }() + + var recv []byte + c.SetReadDeadline(time.Now().Add(5 * time.Second)) + buf := make([]byte, 8*1024) + for len(recv) < len(send) { + n, err := c.Read(buf) + if err != nil { + t.Fatalf("Read at %d/%d: %v", len(recv), len(send), err) + } + recv = append(recv, buf[:n]...) + } + if string(recv) != send { + t.Errorf("payload mismatch") + } +} + +func TestServer_BadMethod(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + srv, _ := NewServer(ServerConfig{Upstream: upstream.addr}) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + resp, err := http.Get(hs.URL) + if err != nil { + t.Fatalf("Get: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Errorf("status = %d; want 405", resp.StatusCode) + } +} + +func TestServer_MissingSessionID(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + srv, _ := NewServer(ServerConfig{Upstream: upstream.addr}) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + resp, err := http.Post(hs.URL, "application/octet-stream", strings.NewReader("")) + if err != nil { + t.Fatalf("Post: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("status = %d; want 400", resp.StatusCode) + } +} + +func TestServer_AuthTokenRequired(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + srv, _ := NewServer(ServerConfig{Upstream: upstream.addr, AuthToken: "s3cret"}) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + post := func(authHeader string) int { + req, _ := http.NewRequest(http.MethodPost, hs.URL, strings.NewReader("")) + req.Header.Set("X-Session-Id", "abcdef") + if authHeader != "" { + req.Header.Set("X-Meek-Auth", authHeader) + } + resp, err := hs.Client().Do(req) + if err != nil { + t.Fatalf("Do: %v", err) + } + defer resp.Body.Close() + return resp.StatusCode + } + + if got := post(""); got != http.StatusForbidden { + t.Errorf("missing token: status = %d; want 403", got) + } + if got := post("wrong"); got != http.StatusForbidden { + t.Errorf("wrong token: status = %d; want 403", got) + } + if got := post("s3cret"); got == http.StatusForbidden { + t.Errorf("correct token: status = 403; want the request to proceed") + } +} + +func TestServer_UpstreamDialFails(t *testing.T) { + srv, _ := NewServer(ServerConfig{ + Upstream: "127.0.0.1:1", + ResponseHoldoff: 10 * time.Millisecond, + Dialer: func(network, address string) (net.Conn, error) { + return nil, errors.New("synthetic dial failure") + }, + }) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + req, _ := http.NewRequest(http.MethodPost, hs.URL, strings.NewReader("")) + req.Header.Set("X-Session-Id", "abcdef") + resp, err := hs.Client().Do(req) + if err != nil { + t.Fatalf("Do: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadGateway { + t.Errorf("status = %d; want 502", resp.StatusCode) + } +} + +func TestServer_SessionReap(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + + srv, _ := NewServer(ServerConfig{ + Upstream: upstream.addr, + ResponseHoldoff: 10 * time.Millisecond, + SessionIdleTimeout: 100 * time.Millisecond, + }) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + req, _ := http.NewRequest(http.MethodPost, hs.URL, strings.NewReader("")) + req.Header.Set("X-Session-Id", "reapme") + resp, _ := hs.Client().Do(req) + if resp != nil { + _ = resp.Body.Close() + } + if got := srv.SessionCount(); got != 1 { + t.Fatalf("after first POST: SessionCount = %d; want 1", got) + } + + deadline := time.Now().Add(2 * time.Second) + for srv.SessionCount() > 0 && time.Now().Before(deadline) { + time.Sleep(20 * time.Millisecond) + } + if got := srv.SessionCount(); got != 0 { + t.Errorf("after idle timeout: SessionCount = %d; want 0", got) + } +} + +// A POST body larger than MaxBodyBytes must be rejected with 413 rather +// than silently truncated and forwarded upstream, which would corrupt +// the tunneled stream. +func TestServer_RejectsOversizedBody(t *testing.T) { + upstream := newEchoUpstream(t) + t.Cleanup(upstream.Close) + srv, _ := NewServer(ServerConfig{Upstream: upstream.addr, MaxBodyBytes: 512}) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + req, _ := http.NewRequest(http.MethodPost, hs.URL, bytes.NewReader(make([]byte, 1024))) + req.Header.Set("X-Session-Id", "oversized") + resp, err := hs.Client().Do(req) + if err != nil { + t.Fatalf("Do: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusRequestEntityTooLarge { + t.Errorf("status = %d; want 413", resp.StatusCode) + } +} + +// When MaxBodyBytes is small enough that the read-pump cap (MaxBodyBytes*4) +// is below a single upstream read, an empty pending buffer must still +// accept the chunk; otherwise the pump waits forever and downstream bytes +// never flow. Regression for the cap-vs-read-size deadlock. +func TestServer_SmallMaxBodyBytesDelivers(t *testing.T) { + const blob = 64 * 1024 + upstream := newBurstUpstream(t, blob) + t.Cleanup(upstream.Close) + + srv, _ := NewServer(ServerConfig{ + Upstream: upstream.addr, + ResponseHoldoff: 10 * time.Millisecond, + MaxBodyBytes: 1024, // cap = 4096, far below a 64 KiB read + }) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + c, err := Dial(context.Background(), Config{ + URL: hs.URL, + HTTPClient: hs.Client(), + PollInterval: 10 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + // Send a byte so the session (and its upstream conn) is created; the + // burst upstream then floods its blob back. + if _, err := c.Write([]byte("x")); err != nil { + t.Fatalf("Write: %v", err) + } + + c.SetReadDeadline(time.Now().Add(8 * time.Second)) + buf := make([]byte, 16*1024) + var got int + for got < blob { + n, err := c.Read(buf) + if err != nil { + t.Fatalf("Read at %d/%d: %v", got, blob, err) + } + got += n + } +} + +// The bundled meek outbound opens each session with a SOCKS5 CONNECT to +// the destination over the tunnel. This exercises that full chain — +// client -> meek server -> SOCKS5 upstream -> destination — using the +// same socks.ClientHandshake5 the outbound runs. +func TestServer_SOCKS5ConnectOverTunnel(t *testing.T) { + dest := newEchoUpstream(t) + t.Cleanup(dest.Close) + + proxy := newSOCKS5Proxy(t) + t.Cleanup(proxy.Close) + + srv, _ := NewServer(ServerConfig{ + Upstream: proxy.addr, + ResponseHoldoff: 10 * time.Millisecond, + }) + t.Cleanup(func() { _ = srv.Close() }) + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + c, err := Dial(context.Background(), Config{ + URL: hs.URL, + HTTPClient: hs.Client(), + PollInterval: 10 * time.Millisecond, + }) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + c.SetDeadline(time.Now().Add(5 * time.Second)) + if _, err := socks.ClientHandshake5(c, socks5.CommandConnect, M.ParseSocksaddr(dest.addr), "", ""); err != nil { + t.Fatalf("SOCKS5 CONNECT over tunnel: %v", err) + } + + if _, err := c.Write([]byte("ping through socks")); err != nil { + t.Fatalf("Write: %v", err) + } + got := make([]byte, 64) + n, err := c.Read(got) + if err != nil { + t.Fatalf("Read: %v", err) + } + if s := string(got[:n]); s != "ping through socks" { + t.Errorf("got %q; want %q", s, "ping through socks") + } +} + +// --- helpers --- + +// burstUpstream writes a fixed-size blob to every accepted connection +// immediately, then holds the connection open. Used to force a single +// large upstream read on the meek server's read pump. +type burstUpstream struct { + listener net.Listener + addr string + blob []byte + wg sync.WaitGroup +} + +func newBurstUpstream(t *testing.T, n int) *burstUpstream { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + u := &burstUpstream{listener: l, addr: l.Addr().String(), blob: bytes.Repeat([]byte("Z"), n)} + u.wg.Add(1) + go func() { + defer u.wg.Done() + for { + conn, err := l.Accept() + if err != nil { + return + } + go func(conn net.Conn) { + _, _ = conn.Write(u.blob) + _, _ = io.Copy(io.Discard, conn) + conn.Close() + }(conn) + } + }() + return u +} + +func (u *burstUpstream) Close() { _ = u.listener.Close(); u.wg.Wait() } + +// socks5Proxy is a minimal no-auth SOCKS5 CONNECT proxy used as a meek +// upstream in tests: it completes the handshake, dials the requested +// destination, and pipes bytes both ways. +type socks5Proxy struct { + listener net.Listener + addr string + wg sync.WaitGroup +} + +func newSOCKS5Proxy(t *testing.T) *socks5Proxy { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + p := &socks5Proxy{listener: l, addr: l.Addr().String()} + p.wg.Add(1) + go func() { + defer p.wg.Done() + for { + conn, err := l.Accept() + if err != nil { + return + } + go p.serve(conn) + } + }() + return p +} + +func (p *socks5Proxy) serve(conn net.Conn) { + defer conn.Close() + reader := varbin.StubReader(conn) + if _, err := socks5.ReadAuthRequest(reader); err != nil { + return + } + if err := socks5.WriteAuthResponse(conn, socks5.AuthResponse{Method: socks5.AuthTypeNotRequired}); err != nil { + return + } + req, err := socks5.ReadRequest(reader) + if err != nil { + return + } + upstream, err := net.Dial("tcp", req.Destination.String()) + if err != nil { + _ = socks5.WriteResponse(conn, socks5.Response{ReplyCode: socks5.ReplyCodeFailure}) + return + } + defer upstream.Close() + if err := socks5.WriteResponse(conn, socks5.Response{ReplyCode: socks5.ReplyCodeSuccess}); err != nil { + return + } + go func() { _, _ = io.Copy(upstream, conn) }() + _, _ = io.Copy(conn, upstream) +} + +func (p *socks5Proxy) Close() { _ = p.listener.Close(); p.wg.Wait() } + +// echoUpstream is a TCP listener that loops every byte back to the sender. +// Used as the meek server's upstream so the client → meek → upstream → meek +// → client round-trip can be verified end-to-end. +type echoUpstream struct { + listener net.Listener + addr string + wg sync.WaitGroup + closed chan struct{} +} + +func newEchoUpstream(t *testing.T) *echoUpstream { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + u := &echoUpstream{ + listener: l, + addr: l.Addr().String(), + closed: make(chan struct{}), + } + u.wg.Add(1) + go u.accept() + return u +} + +func (u *echoUpstream) accept() { + defer u.wg.Done() + for { + c, err := u.listener.Accept() + if err != nil { + return + } + go func(c net.Conn) { + defer c.Close() + _, _ = io.Copy(c, c) + }(c) + } +} + +func (u *echoUpstream) Close() { + select { + case <-u.closed: + return + default: + close(u.closed) + } + _ = u.listener.Close() + u.wg.Wait() +} + +// TestServer_PropagatesUpstreamEOF guards end-of-stream propagation: when the +// upstream closes with nothing left, the server must end the session (410) and the +// client's Read must surface io.EOF rather than polling forever on empty 200s. +func TestServer_PropagatesUpstreamEOF(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + t.Cleanup(func() { _ = ln.Close() }) + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + _ = conn.Close() // accept then drop → upstream EOF + } + }() + + srv, err := NewServer(ServerConfig{ + Upstream: ln.Addr().String(), + ResponseHoldoff: 20 * time.Millisecond, + SessionIdleTimeout: 2 * time.Second, + }) + if err != nil { + t.Fatalf("NewServer: %v", err) + } + t.Cleanup(func() { _ = srv.Close() }) + + hs := httptest.NewServer(srv) + t.Cleanup(hs.Close) + + c, err := Dial(context.Background(), Config{URL: hs.URL, HTTPClient: hs.Client(), PollInterval: 20 * time.Millisecond}) + if err != nil { + t.Fatalf("Dial: %v", err) + } + t.Cleanup(func() { _ = c.Close() }) + + // Read-deadline bounds the test: a hang (the bug) trips the deadline instead, + // which is not io.EOF, so the assertion fails loudly rather than blocking. + if err := c.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil { + t.Fatalf("SetReadDeadline: %v", err) + } + buf := make([]byte, 16) + if _, err := c.Read(buf); !errors.Is(err, io.EOF) { + t.Fatalf("Read err = %v; want io.EOF after upstream close", err) + } +} diff --git a/protocol/register.go b/protocol/register.go index 45e40ca..7c537be 100644 --- a/protocol/register.go +++ b/protocol/register.go @@ -17,6 +17,7 @@ import ( "github.com/getlantern/lantern-box/protocol/algeneva" "github.com/getlantern/lantern-box/protocol/amnezia" "github.com/getlantern/lantern-box/protocol/group" + "github.com/getlantern/lantern-box/protocol/meek" "github.com/getlantern/lantern-box/protocol/outline" "github.com/getlantern/lantern-box/protocol/reflex" "github.com/getlantern/lantern-box/protocol/samizdat" @@ -94,6 +95,7 @@ func registerInbounds(registry *inbound.Registry) { func registerOutbounds(registry *outbound.Registry) { // custom protocol outbounds algeneva.RegisterOutbound(registry) + meek.RegisterOutbound(registry) outline.RegisterOutbound(registry) reflex.RegisterOutbound(registry) samizdat.RegisterOutbound(registry)