From 4464158a3838958c2a5af8020cec4978e8021188 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Fri, 26 Jun 2026 11:27:44 -0600 Subject: [PATCH 1/5] meek: add a first-class meek server inbound MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make meek a registered sing-box inbound so meek-servers can be provisioned through the track/bandit pipeline (one origin VM per route) instead of the hand-deployed standalone cmd/meek-server — and so a meek track has real routes to contribute bandit arms. The inbound serves the existing meek-v1 Server (HTTP polling) on a sing-box listener (plain HTTP — a CDN/Caddy terminates TLS + fronting in front). The bundled meek outbound opens each session with a SOCKS5 CONNECT, so the inbound runs a tiny in-process SOCKS5 acceptor on loopback as the meek server's upstream: it terminates the CONNECT and routes the stream via the sing-box router. This drops the external microsocks dependency — routing rules, the configured outbound, and metrics now apply to meek traffic natively. - option: MeekInboundOptions (ListenOptions + meek tunables + http timeouts) - protocol/meek/inbound.go: Inbound adapter + SOCKS5-terminate-and-route glue - protocol/register.go: register the meek inbound - tests: SOCKS5 CONNECT routes to the right destination + pipes bytes; non-CONNECT is refused, not routed Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01H9beSsYGzUaBhRK5ULmtGr --- option/meek.go | 30 ++++ protocol/meek/inbound.go | 253 ++++++++++++++++++++++++++++++++++ protocol/meek/inbound_test.go | 176 +++++++++++++++++++++++ protocol/register.go | 1 + 4 files changed, 460 insertions(+) create mode 100644 protocol/meek/inbound.go create mode 100644 protocol/meek/inbound_test.go diff --git a/option/meek.go b/option/meek.go index da99d18..c3dab13 100644 --- a/option/meek.go +++ b/option/meek.go @@ -35,3 +35,33 @@ type FrontSpec struct { // MeekHeaders carries fixed-value HTTP headers added to every POST. type MeekHeaders map[string]string + +// MeekInboundOptions configures a meek server inbound: an HTTP meek-v1 endpoint +// (plain HTTP — TLS and CDN fronting are terminated by Caddy/a CDN in front) +// whose tunneled sessions are routed through sing-box. Each session opens with a +// SOCKS5 CONNECT (as the bundled meek outbound sends), which the inbound +// terminates in-process and hands to the router — so no external SOCKS5 proxy +// (microsocks) is required, unlike the standalone cmd/meek-server. +type MeekInboundOptions struct { + option.ListenOptions + + // MaxBodyBytes caps the request + response body per poll. Default 256 KiB + // (must match the client's cap, or bodies truncate). + MaxBodyBytes int `json:"max_body_bytes,omitempty"` + // ResponseHoldoff is how long the server waits for upstream bytes before + // responding (possibly empty). Default "50ms". + ResponseHoldoff string `json:"response_holdoff,omitempty"` + // SessionIdleTimeout drops a session after this long without a poll; should + // be >= 2-3x the client's poll interval. Default "5m". + SessionIdleTimeout string `json:"session_idle_timeout,omitempty"` + // AuthToken, when set, is the shared secret every request must present in + // X-Meek-Auth. Strongly recommended for a public/fronted endpoint — without + // it the server is an open relay. Empty disables the check. + AuthToken string `json:"auth_token,omitempty"` + + // HTTP server timeouts (empty -> defaults). ReadTimeout/WriteTimeout bound a + // single poll; IdleTimeout bounds keep-alive reuse between polls. + ReadTimeout string `json:"read_timeout,omitempty"` + WriteTimeout string `json:"write_timeout,omitempty"` + IdleTimeout string `json:"idle_timeout,omitempty"` +} diff --git a/protocol/meek/inbound.go b/protocol/meek/inbound.go new file mode 100644 index 0000000..337fcaf --- /dev/null +++ b/protocol/meek/inbound.go @@ -0,0 +1,253 @@ +package meek + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/http" + "sync" + "time" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/adapter/inbound" + "github.com/sagernet/sing-box/common/listener" + "github.com/sagernet/sing-box/common/uot" + "github.com/sagernet/sing-box/log" + M "github.com/sagernet/sing/common/metadata" + "github.com/sagernet/sing/protocol/socks/socks5" + + "github.com/getlantern/lantern-box/constant" + "github.com/getlantern/lantern-box/option" +) + +// RegisterInbound registers the meek inbound adapter with the given registry. +func RegisterInbound(registry *inbound.Registry) { + inbound.Register[option.MeekInboundOptions](registry, constant.TypeMeek, NewInbound) +} + +// Inbound is the sing-box inbound adapter for a meek server. It serves the +// meek-v1 HTTP polling protocol (plain HTTP — a CDN/Caddy terminates TLS and +// fronting in front) and routes each tunneled session through sing-box. +// +// The bundled meek outbound opens every session with a SOCKS5 CONNECT, so the +// inbound runs a tiny in-process SOCKS5 acceptor on loopback as the meek +// server's upstream: it terminates the CONNECT, extracts the destination, and +// hands the post-CONNECT stream to the router. This replaces the external SOCKS5 +// proxy (microsocks) that the standalone cmd/meek-server relies on, so the data +// plane is native sing-box (routing rules, the configured outbound, metrics all +// apply). +type Inbound struct { + inbound.Adapter + ctx context.Context + logger log.ContextLogger + router adapter.ConnectionRouterEx + listener *listener.Listener + tcpListener net.Listener + httpServer *http.Server + meekServer *Server + socksLn net.Listener + closeOnce sync.Once +} + +// NewInbound constructs a meek inbound adapter. +func NewInbound( + ctx context.Context, + router adapter.Router, + logger log.ContextLogger, + tag string, + options option.MeekInboundOptions, +) (adapter.Inbound, error) { + responseHoldoff, err := parseDurationOr(options.ResponseHoldoff, 50*time.Millisecond) + if err != nil { + return nil, fmt.Errorf("meek: response_holdoff: %w", err) + } + sessionIdleTimeout, err := parseDurationOr(options.SessionIdleTimeout, 5*time.Minute) + if err != nil { + return nil, fmt.Errorf("meek: session_idle_timeout: %w", err) + } + readTimeout, err := parseDurationOr(options.ReadTimeout, 30*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: read_timeout: %w", err) + } + writeTimeout, err := parseDurationOr(options.WriteTimeout, 30*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: write_timeout: %w", err) + } + idleTimeout, err := parseDurationOr(options.IdleTimeout, 120*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: idle_timeout: %w", err) + } + + // In-process SOCKS5 acceptor on loopback. The meek server dials this per + // session (replacing microsocks); handleSocks terminates the CONNECT and + // routes through sing-box. Loopback-only, so it isn't externally reachable. + socksLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("meek: loopback socks listener: %w", err) + } + + ib := &Inbound{ + Adapter: inbound.NewAdapter(constant.TypeMeek, tag), + ctx: ctx, + logger: logger, + router: uot.NewRouter(router, logger), + listener: listener.New(listener.Options{ + Context: ctx, + Logger: logger, + Listen: options.ListenOptions, + }), + socksLn: socksLn, + } + + tcpListener, err := ib.listener.ListenTCP() + if err != nil { + _ = socksLn.Close() + return nil, fmt.Errorf("meek: tcp listener: %w", err) + } + ib.tcpListener = tcpListener + + meekServer, err := NewServer(ServerConfig{ + Upstream: socksLn.Addr().String(), + MaxBodyBytes: options.MaxBodyBytes, + ResponseHoldoff: responseHoldoff, + SessionIdleTimeout: sessionIdleTimeout, + AuthToken: options.AuthToken, + Logger: slog.Default(), + }) + if err != nil { + _ = socksLn.Close() + _ = tcpListener.Close() + return nil, fmt.Errorf("meek: server: %w", err) + } + ib.meekServer = meekServer + ib.httpServer = &http.Server{ + Handler: meekServer, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + IdleTimeout: idleTimeout, + } + return ib, nil +} + +// Start serves the meek HTTP endpoint and the loopback SOCKS5 acceptor. +func (i *Inbound) Start(stage adapter.StartStage) error { + if stage != adapter.StartStateStart { + return nil + } + go i.acceptSocks() + go func() { + if err := i.httpServer.Serve(i.tcpListener); err != nil && !errors.Is(err, http.ErrServerClosed) { + i.logger.Error("meek inbound http server: ", err) + } + }() + return nil +} + +// acceptSocks accepts loopback connections from the meek server and routes each +// through sing-box after terminating its SOCKS5 CONNECT. +func (i *Inbound) acceptSocks() { + for { + conn, err := i.socksLn.Accept() + if err != nil { + return // listener closed on Close() + } + go i.handleSocks(conn) + } +} + +// handleSocks terminates the SOCKS5 no-auth CONNECT the meek outbound sends to +// open a session, then routes the tunneled stream through sing-box. +func (i *Inbound) handleSocks(conn net.Conn) { + // Read through a bufio.Reader: it satisfies socks5's varbin.Reader (Read + + // ReadByte) and preserves any bytes buffered past the request for routing. + br := bufio.NewReader(conn) + + authReq, err := socks5.ReadAuthRequest(br) + if err != nil { + i.logger.Debug("meek inbound: read socks auth: ", err) + _ = conn.Close() + return + } + if !containsByte(authReq.Methods, socks5.AuthTypeNotRequired) { + _ = socks5.WriteAuthResponse(conn, socks5.AuthResponse{Method: 0xFF}) // no acceptable methods + _ = conn.Close() + return + } + if err := socks5.WriteAuthResponse(conn, socks5.AuthResponse{Method: socks5.AuthTypeNotRequired}); err != nil { + _ = conn.Close() + return + } + + req, err := socks5.ReadRequest(br) + if err != nil { + i.logger.Debug("meek inbound: read socks request: ", err) + _ = conn.Close() + return + } + if req.Command != socks5.CommandConnect { + _ = socks5.WriteResponse(conn, socks5.Response{ReplyCode: socks5.ReplyCodeUnsupported}) + _ = conn.Close() + return + } + if err := socks5.WriteResponse(conn, socks5.Response{ + ReplyCode: socks5.ReplyCodeSuccess, + Bind: M.SocksaddrFromNet(conn.LocalAddr()), + }); err != nil { + _ = conn.Close() + return + } + + routed := &bufferedConn{Conn: conn, r: br} + var metadata adapter.InboundContext + metadata.Inbound = i.Tag() + metadata.InboundType = i.Type() + metadata.Source = M.SocksaddrFromNet(conn.RemoteAddr()).Unwrap() + metadata.Destination = req.Destination + i.router.RouteConnectionEx(i.ctx, routed, metadata, func(err error) { + if err != nil { + i.logger.Debug("meek inbound: route ", req.Destination, ": ", err) + } + }) +} + +// Close stops the HTTP server, the meek server (and its sessions), the loopback +// acceptor, and the sing-box listener. +func (i *Inbound) Close() error { + var httpErr, meekErr, socksErr, lnErr error + i.closeOnce.Do(func() { + if i.httpServer != nil { + httpErr = i.httpServer.Close() + } + if i.meekServer != nil { + meekErr = i.meekServer.Close() + } + if i.socksLn != nil { + socksErr = i.socksLn.Close() + } + if i.listener != nil { + lnErr = i.listener.Close() + } + }) + return errors.Join(httpErr, meekErr, socksErr, lnErr) +} + +// bufferedConn reads through a bufio.Reader (so bytes buffered past the SOCKS5 +// request aren't lost) while writes/close/addrs/deadlines pass to the conn. +type bufferedConn struct { + net.Conn + r *bufio.Reader +} + +func (c *bufferedConn) Read(p []byte) (int, error) { return c.r.Read(p) } + +func containsByte(b []byte, v byte) bool { + for _, x := range b { + if x == v { + return true + } + } + return false +} diff --git a/protocol/meek/inbound_test.go b/protocol/meek/inbound_test.go new file mode 100644 index 0000000..7867f7c --- /dev/null +++ b/protocol/meek/inbound_test.go @@ -0,0 +1,176 @@ +package meek + +import ( + "context" + "io" + "net" + "testing" + "time" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/adapter/inbound" + "github.com/sagernet/sing-box/log" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/sagernet/sing/protocol/socks/socks5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockRouter implements adapter.ConnectionRouterEx with a controllable callback. +type mockRouter struct { + onRoute func(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, onClose N.CloseHandlerFunc) +} + +func (m *mockRouter) RouteConnection(context.Context, net.Conn, adapter.InboundContext) error { + return nil +} + +func (m *mockRouter) RoutePacketConnection(context.Context, N.PacketConn, adapter.InboundContext) error { + return nil +} + +func (m *mockRouter) RouteConnectionEx(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, onClose N.CloseHandlerFunc) { + if m.onRoute != nil { + m.onRoute(ctx, conn, metadata, onClose) + } +} + +func (m *mockRouter) RoutePacketConnectionEx(context.Context, N.PacketConn, adapter.InboundContext, N.CloseHandlerFunc) { +} + +// TestInbound_HandleSocks_RoutesConnect drives the SOCKS5 no-auth CONNECT the +// meek outbound sends (over a loopback pair, as the meek server does in prod) +// and checks the inbound terminates it, routes to the right destination, and +// pipes bytes — i.e. the microsocks-free data path works. +func TestInbound_HandleSocks_RoutesConnect(t *testing.T) { + captured := make(chan adapter.InboundContext, 1) + router := &mockRouter{ + onRoute: func(ctx context.Context, conn net.Conn, md adapter.InboundContext, onClose N.CloseHandlerFunc) { + captured <- md + go func() { + defer onClose(nil) + buf := make([]byte, 64) + n, err := conn.Read(buf) + if err == nil && n > 0 { + _, _ = conn.Write(buf[:n]) // echo the app stream back + } + }() + }, + } + ib := &Inbound{ + Adapter: inbound.NewAdapter("meek", "meek-in"), + ctx: context.Background(), + logger: log.NewNOPFactory().Logger(), + router: router, + } + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + go func() { + s, aerr := ln.Accept() + if aerr == nil { + ib.handleSocks(s) + } + }() + + c, err := net.Dial("tcp", ln.Addr().String()) + require.NoError(t, err) + defer c.Close() + require.NoError(t, c.SetDeadline(time.Now().Add(5*time.Second))) + + // 1. SOCKS5 no-auth method-select. + require.NoError(t, socks5.WriteAuthRequest(c, socks5.AuthRequest{Methods: []byte{socks5.AuthTypeNotRequired}})) + authReply := make([]byte, 2) + require.NoError(t, readFull(c, authReply)) + require.Equal(t, socks5.Version, authReply[0]) + require.Equal(t, socks5.AuthTypeNotRequired, authReply[1]) + + // 2. CONNECT, then consume the reply (header + bound addr + port). + dst := M.ParseSocksaddr("93.184.216.34:443") + require.NoError(t, socks5.WriteRequest(c, socks5.Request{Command: socks5.CommandConnect, Destination: dst})) + require.NoError(t, readConnectReply(t, c)) + + // 3. App bytes round-trip through the routed conn. + _, err = c.Write([]byte("hello meek")) + require.NoError(t, err) + got := make([]byte, len("hello meek")) + require.NoError(t, readFull(c, got)) + assert.Equal(t, "hello meek", string(got)) + + // 4. The router saw the right destination + inbound identity. + md := <-captured + assert.Equal(t, "meek-in", md.Inbound) + assert.Equal(t, "meek", md.InboundType) + assert.Equal(t, "93.184.216.34", md.Destination.Addr.String()) + assert.Equal(t, uint16(443), md.Destination.Port) +} + +// TestInbound_HandleSocks_RejectsNonConnect verifies a non-CONNECT command is +// refused (and the conn closed) rather than routed. +func TestInbound_HandleSocks_RejectsNonConnect(t *testing.T) { + routed := false + ib := &Inbound{ + Adapter: inbound.NewAdapter("meek", "meek-in"), + ctx: context.Background(), + logger: log.NewNOPFactory().Logger(), + router: &mockRouter{onRoute: func(context.Context, net.Conn, adapter.InboundContext, N.CloseHandlerFunc) { routed = true }}, + } + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + go func() { + if s, aerr := ln.Accept(); aerr == nil { + ib.handleSocks(s) + } + }() + + c, err := net.Dial("tcp", ln.Addr().String()) + require.NoError(t, err) + defer c.Close() + require.NoError(t, c.SetDeadline(time.Now().Add(5*time.Second))) + + require.NoError(t, socks5.WriteAuthRequest(c, socks5.AuthRequest{Methods: []byte{socks5.AuthTypeNotRequired}})) + authReply := make([]byte, 2) + require.NoError(t, readFull(c, authReply)) + + // CommandBind (0x02) is not supported by the inbound. + require.NoError(t, socks5.WriteRequest(c, socks5.Request{Command: socks5.CommandBind, Destination: M.ParseSocksaddr("1.2.3.4:80")})) + head := make([]byte, 4) + require.NoError(t, readFull(c, head)) + assert.Equal(t, socks5.ReplyCodeUnsupported, head[1]) + assert.False(t, routed, "a non-CONNECT command must not be routed") +} + +func readFull(r io.Reader, b []byte) error { + _, err := io.ReadFull(r, b) + return err +} + +// readConnectReply consumes a SOCKS5 CONNECT reply: 4-byte header + bound addr + +// 2-byte port (mirrors the client's socks5ConnectSequenced). +func readConnectReply(t *testing.T, r io.Reader) error { + t.Helper() + head := make([]byte, 4) + if err := readFull(r, head); err != nil { + return err + } + require.Equal(t, socks5.Version, head[0]) + require.Equal(t, socks5.ReplyCodeSuccess, head[1]) + var addrLen int + switch head[3] { + case 0x01: + addrLen = net.IPv4len + case 0x04: + addrLen = net.IPv6len + case 0x03: + lb := make([]byte, 1) + if err := readFull(r, lb); err != nil { + return err + } + addrLen = int(lb[0]) + } + return readFull(r, make([]byte, addrLen+2)) +} diff --git a/protocol/register.go b/protocol/register.go index 7c537be..5950c4f 100644 --- a/protocol/register.go +++ b/protocol/register.go @@ -87,6 +87,7 @@ func RegisterProtocols(ctx context.Context) context.Context { func registerInbounds(registry *inbound.Registry) { algeneva.RegisterInbound(registry) + meek.RegisterInbound(registry) reflex.RegisterInbound(registry) samizdat.RegisterInbound(registry) water.RegisterInbound(registry) From 441ee67b1b5185cbe43e9f9eadab3250f30bfb1b Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Fri, 26 Jun 2026 17:18:14 -0600 Subject: [PATCH 2/5] meek inbound: address review (logger, accept-loop resilience, require auth) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Route meek-server logs through the inbound's sing-box logger via log.NewLogHandler (matches WATER) instead of slog.Default(), so levels/routing stay consistent. (Copilot) - acceptSocks: only return on net.ErrClosed; log + continue on transient Accept errors so a temporary failure can't permanently kill routing while the HTTP endpoint keeps serving. (Copilot + CodeRabbit) - Require auth_token by default — a meek inbound is a public/fronted relay into sing-box, so an empty token is an open relay. New allow_unauthenticated opt-in for deliberate no-auth (test/private). Added a test. (CodeRabbit, security) Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01H9beSsYGzUaBhRK5ULmtGr --- option/meek.go | 10 +++++++--- protocol/meek/inbound.go | 22 ++++++++++++++++++++-- protocol/meek/inbound_test.go | 18 ++++++++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/option/meek.go b/option/meek.go index c3dab13..e6ebc69 100644 --- a/option/meek.go +++ b/option/meek.go @@ -54,10 +54,14 @@ type MeekInboundOptions struct { // SessionIdleTimeout drops a session after this long without a poll; should // be >= 2-3x the client's poll interval. Default "5m". SessionIdleTimeout string `json:"session_idle_timeout,omitempty"` - // AuthToken, when set, is the shared secret every request must present in - // X-Meek-Auth. Strongly recommended for a public/fronted endpoint — without - // it the server is an open relay. Empty disables the check. + // AuthToken is the shared secret every request must present in X-Meek-Auth. + // Required by default — a meek inbound is a public/fronted relay into sing-box, + // so an empty token is an open relay. To deliberately run without auth (test or + // private deployments), set AllowUnauthenticated. AuthToken string `json:"auth_token,omitempty"` + // AllowUnauthenticated explicitly opts into the no-auth mode (empty AuthToken). + // Off by default so the secure path is the default. + AllowUnauthenticated bool `json:"allow_unauthenticated,omitempty"` // HTTP server timeouts (empty -> defaults). ReadTimeout/WriteTimeout bound a // single poll; IdleTimeout bounds keep-alive reuse between polls. diff --git a/protocol/meek/inbound.go b/protocol/meek/inbound.go index 337fcaf..fd6295b 100644 --- a/protocol/meek/inbound.go +++ b/protocol/meek/inbound.go @@ -20,6 +20,7 @@ import ( "github.com/sagernet/sing/protocol/socks/socks5" "github.com/getlantern/lantern-box/constant" + L "github.com/getlantern/lantern-box/log" "github.com/getlantern/lantern-box/option" ) @@ -60,6 +61,15 @@ func NewInbound( tag string, options option.MeekInboundOptions, ) (adapter.Inbound, error) { + // A meek inbound is a public/fronted relay into sing-box, so an empty auth + // token would make it an open relay. Require one unless the operator explicitly + // opts into the no-auth mode (test/private deployments only). + if options.AuthToken == "" && !options.AllowUnauthenticated { + return nil, errors.New( + "meek: auth_token is required (a meek inbound is a public relay into sing-box); " + + "set auth_token, or set allow_unauthenticated:true to deliberately run an open relay", + ) + } responseHoldoff, err := parseDurationOr(options.ResponseHoldoff, 50*time.Millisecond) if err != nil { return nil, fmt.Errorf("meek: response_holdoff: %w", err) @@ -115,7 +125,9 @@ func NewInbound( ResponseHoldoff: responseHoldoff, SessionIdleTimeout: sessionIdleTimeout, AuthToken: options.AuthToken, - Logger: slog.Default(), + // Route meek-server logs through the inbound's sing-box logger (matches + // WATER), not slog.Default(), so levels/routing stay consistent. + Logger: slog.New(L.NewLogHandler(logger)), }) if err != nil { _ = socksLn.Close() @@ -152,7 +164,13 @@ func (i *Inbound) acceptSocks() { for { conn, err := i.socksLn.Accept() if err != nil { - return // listener closed on Close() + if errors.Is(err, net.ErrClosed) { + return // listener closed on Close() + } + // A transient Accept error must not permanently kill routing while the + // HTTP endpoint keeps serving — log and keep accepting. + i.logger.Error("meek inbound: socks accept: ", err) + continue } go i.handleSocks(conn) } diff --git a/protocol/meek/inbound_test.go b/protocol/meek/inbound_test.go index 7867f7c..ad15718 100644 --- a/protocol/meek/inbound_test.go +++ b/protocol/meek/inbound_test.go @@ -15,8 +15,26 @@ import ( "github.com/sagernet/sing/protocol/socks/socks5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/getlantern/lantern-box/option" ) +// TestNewInbound_RequiresAuthToken verifies a meek inbound refuses to start as an +// open relay: an empty auth_token errors unless allow_unauthenticated is set. The +// check runs before any listener is bound, so a nil router/empty options is fine. +func TestNewInbound_RequiresAuthToken(t *testing.T) { + _, err := NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", + option.MeekInboundOptions{}) + assert.Error(t, err, "empty auth_token without allow_unauthenticated must error") + + _, err = NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", + option.MeekInboundOptions{AllowUnauthenticated: true}) + // With the opt-in, the auth gate passes (any later error is unrelated to auth). + if err != nil { + assert.NotContains(t, err.Error(), "auth_token is required") + } +} + // mockRouter implements adapter.ConnectionRouterEx with a controllable callback. type mockRouter struct { onRoute func(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, onClose N.CloseHandlerFunc) From 51a6aa512022b013a68a3d86fb46acdb4fa9ce28 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Fri, 26 Jun 2026 17:45:34 -0600 Subject: [PATCH 3/5] meek inbound: harden tests (review round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TestNewInbound_RequiresAuthToken: capture + t.Cleanup the inbound on the allow_unauthenticated path so it doesn't leak the listeners it opens. - TestInbound_HandleSocks_RoutesConnect: bounded select on the router channel so a routing failure fails fast instead of hanging to the test timeout. - TestInbound_HandleSocks_RejectsNonConnect: make `routed` an atomic.Bool — it's written by the handler goroutine and read by the test (data race under -race). Verified with `go test -race`. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01H9beSsYGzUaBhRK5ULmtGr --- protocol/meek/inbound_test.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/protocol/meek/inbound_test.go b/protocol/meek/inbound_test.go index ad15718..73b9177 100644 --- a/protocol/meek/inbound_test.go +++ b/protocol/meek/inbound_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "net" + "sync/atomic" "testing" "time" @@ -27,11 +28,13 @@ func TestNewInbound_RequiresAuthToken(t *testing.T) { option.MeekInboundOptions{}) assert.Error(t, err, "empty auth_token without allow_unauthenticated must error") - _, err = NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", + ib, err := NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", option.MeekInboundOptions{AllowUnauthenticated: true}) // With the opt-in, the auth gate passes (any later error is unrelated to auth). if err != nil { assert.NotContains(t, err.Error(), "auth_token is required") + } else { + t.Cleanup(func() { _ = ib.Close() }) // it opened listeners — don't leak them } } @@ -117,8 +120,14 @@ func TestInbound_HandleSocks_RoutesConnect(t *testing.T) { require.NoError(t, readFull(c, got)) assert.Equal(t, "hello meek", string(got)) - // 4. The router saw the right destination + inbound identity. - md := <-captured + // 4. The router saw the right destination + inbound identity (bounded wait so a + // routing failure surfaces promptly instead of hanging to the test timeout). + var md adapter.InboundContext + select { + case md = <-captured: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for the router to be invoked") + } assert.Equal(t, "meek-in", md.Inbound) assert.Equal(t, "meek", md.InboundType) assert.Equal(t, "93.184.216.34", md.Destination.Addr.String()) @@ -128,12 +137,12 @@ func TestInbound_HandleSocks_RoutesConnect(t *testing.T) { // TestInbound_HandleSocks_RejectsNonConnect verifies a non-CONNECT command is // refused (and the conn closed) rather than routed. func TestInbound_HandleSocks_RejectsNonConnect(t *testing.T) { - routed := false + var routed atomic.Bool // written by the handler goroutine, read by the test ib := &Inbound{ Adapter: inbound.NewAdapter("meek", "meek-in"), ctx: context.Background(), logger: log.NewNOPFactory().Logger(), - router: &mockRouter{onRoute: func(context.Context, net.Conn, adapter.InboundContext, N.CloseHandlerFunc) { routed = true }}, + router: &mockRouter{onRoute: func(context.Context, net.Conn, adapter.InboundContext, N.CloseHandlerFunc) { routed.Store(true) }}, } ln, err := net.Listen("tcp", "127.0.0.1:0") @@ -159,7 +168,7 @@ func TestInbound_HandleSocks_RejectsNonConnect(t *testing.T) { head := make([]byte, 4) require.NoError(t, readFull(c, head)) assert.Equal(t, socks5.ReplyCodeUnsupported, head[1]) - assert.False(t, routed, "a non-CONNECT command must not be routed") + assert.False(t, routed.Load(), "a non-CONNECT command must not be routed") } func readFull(r io.Reader, b []byte) error { From 9cbd7024eb5b91d384181a327a9c3047efadad8c Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Fri, 26 Jun 2026 19:38:16 -0600 Subject: [PATCH 4/5] meek inbound: readConnectReply fails on unknown SOCKS5 ATYP (review round 3) Add a default case so an unexpected address type in the CONNECT reply fails the test instead of silently reading 0 bytes and passing on a malformed reply. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01H9beSsYGzUaBhRK5ULmtGr --- protocol/meek/inbound_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocol/meek/inbound_test.go b/protocol/meek/inbound_test.go index 73b9177..01e3f3a 100644 --- a/protocol/meek/inbound_test.go +++ b/protocol/meek/inbound_test.go @@ -2,6 +2,7 @@ package meek import ( "context" + "fmt" "io" "net" "sync/atomic" @@ -198,6 +199,8 @@ func readConnectReply(t *testing.T, r io.Reader) error { return err } addrLen = int(lb[0]) + default: + return fmt.Errorf("unexpected ATYP %#x in connect reply", head[3]) } return readFull(r, make([]byte, addrLen+2)) } From 7081a7b129fe38968a199f5849d28ca348956665 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Sat, 27 Jun 2026 06:46:50 -0600 Subject: [PATCH 5/5] meek inbound: cap acceptSocks backoff on repeated errors (review round 4) Add a capped backoff (failures*10ms, max 1s, reset on success) so a persistent non-close Accept error (e.g. FD exhaustion) can't spin the loop hot and flood the log, while transient errors still recover. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01H9beSsYGzUaBhRK5ULmtGr --- protocol/meek/inbound.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/protocol/meek/inbound.go b/protocol/meek/inbound.go index fd6295b..00b064c 100644 --- a/protocol/meek/inbound.go +++ b/protocol/meek/inbound.go @@ -161,6 +161,7 @@ func (i *Inbound) Start(stage adapter.StartStage) error { // acceptSocks accepts loopback connections from the meek server and routes each // through sing-box after terminating its SOCKS5 CONNECT. func (i *Inbound) acceptSocks() { + var failures int for { conn, err := i.socksLn.Accept() if err != nil { @@ -168,10 +169,16 @@ func (i *Inbound) acceptSocks() { return // listener closed on Close() } // A transient Accept error must not permanently kill routing while the - // HTTP endpoint keeps serving — log and keep accepting. - i.logger.Error("meek inbound: socks accept: ", err) + // HTTP endpoint keeps serving — log and keep accepting, but back off + // (capped at 1s) so a persistent error (e.g. FD exhaustion) can't spin + // hot and flood the log. + failures++ + backoff := min(time.Duration(failures)*10*time.Millisecond, time.Second) + i.logger.Error("meek inbound: socks accept (retry in ", backoff, "): ", err) + time.Sleep(backoff) continue } + failures = 0 go i.handleSocks(conn) } }