diff --git a/option/meek.go b/option/meek.go index da99d18..e6ebc69 100644 --- a/option/meek.go +++ b/option/meek.go @@ -35,3 +35,37 @@ type FrontSpec struct { // MeekHeaders carries fixed-value HTTP headers added to every POST. type MeekHeaders map[string]string + +// MeekInboundOptions configures a meek server inbound: an HTTP meek-v1 endpoint +// (plain HTTP — TLS and CDN fronting are terminated by Caddy/a CDN in front) +// whose tunneled sessions are routed through sing-box. Each session opens with a +// SOCKS5 CONNECT (as the bundled meek outbound sends), which the inbound +// terminates in-process and hands to the router — so no external SOCKS5 proxy +// (microsocks) is required, unlike the standalone cmd/meek-server. +type MeekInboundOptions struct { + option.ListenOptions + + // MaxBodyBytes caps the request + response body per poll. Default 256 KiB + // (must match the client's cap, or bodies truncate). + MaxBodyBytes int `json:"max_body_bytes,omitempty"` + // ResponseHoldoff is how long the server waits for upstream bytes before + // responding (possibly empty). Default "50ms". + ResponseHoldoff string `json:"response_holdoff,omitempty"` + // SessionIdleTimeout drops a session after this long without a poll; should + // be >= 2-3x the client's poll interval. Default "5m". + SessionIdleTimeout string `json:"session_idle_timeout,omitempty"` + // AuthToken is the shared secret every request must present in X-Meek-Auth. + // Required by default — a meek inbound is a public/fronted relay into sing-box, + // so an empty token is an open relay. To deliberately run without auth (test or + // private deployments), set AllowUnauthenticated. + AuthToken string `json:"auth_token,omitempty"` + // AllowUnauthenticated explicitly opts into the no-auth mode (empty AuthToken). + // Off by default so the secure path is the default. + AllowUnauthenticated bool `json:"allow_unauthenticated,omitempty"` + + // HTTP server timeouts (empty -> defaults). ReadTimeout/WriteTimeout bound a + // single poll; IdleTimeout bounds keep-alive reuse between polls. + ReadTimeout string `json:"read_timeout,omitempty"` + WriteTimeout string `json:"write_timeout,omitempty"` + IdleTimeout string `json:"idle_timeout,omitempty"` +} diff --git a/protocol/meek/inbound.go b/protocol/meek/inbound.go new file mode 100644 index 0000000..00b064c --- /dev/null +++ b/protocol/meek/inbound.go @@ -0,0 +1,278 @@ +package meek + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "net" + "net/http" + "sync" + "time" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/adapter/inbound" + "github.com/sagernet/sing-box/common/listener" + "github.com/sagernet/sing-box/common/uot" + "github.com/sagernet/sing-box/log" + M "github.com/sagernet/sing/common/metadata" + "github.com/sagernet/sing/protocol/socks/socks5" + + "github.com/getlantern/lantern-box/constant" + L "github.com/getlantern/lantern-box/log" + "github.com/getlantern/lantern-box/option" +) + +// RegisterInbound registers the meek inbound adapter with the given registry. +func RegisterInbound(registry *inbound.Registry) { + inbound.Register[option.MeekInboundOptions](registry, constant.TypeMeek, NewInbound) +} + +// Inbound is the sing-box inbound adapter for a meek server. It serves the +// meek-v1 HTTP polling protocol (plain HTTP — a CDN/Caddy terminates TLS and +// fronting in front) and routes each tunneled session through sing-box. +// +// The bundled meek outbound opens every session with a SOCKS5 CONNECT, so the +// inbound runs a tiny in-process SOCKS5 acceptor on loopback as the meek +// server's upstream: it terminates the CONNECT, extracts the destination, and +// hands the post-CONNECT stream to the router. This replaces the external SOCKS5 +// proxy (microsocks) that the standalone cmd/meek-server relies on, so the data +// plane is native sing-box (routing rules, the configured outbound, metrics all +// apply). +type Inbound struct { + inbound.Adapter + ctx context.Context + logger log.ContextLogger + router adapter.ConnectionRouterEx + listener *listener.Listener + tcpListener net.Listener + httpServer *http.Server + meekServer *Server + socksLn net.Listener + closeOnce sync.Once +} + +// NewInbound constructs a meek inbound adapter. +func NewInbound( + ctx context.Context, + router adapter.Router, + logger log.ContextLogger, + tag string, + options option.MeekInboundOptions, +) (adapter.Inbound, error) { + // A meek inbound is a public/fronted relay into sing-box, so an empty auth + // token would make it an open relay. Require one unless the operator explicitly + // opts into the no-auth mode (test/private deployments only). + if options.AuthToken == "" && !options.AllowUnauthenticated { + return nil, errors.New( + "meek: auth_token is required (a meek inbound is a public relay into sing-box); " + + "set auth_token, or set allow_unauthenticated:true to deliberately run an open relay", + ) + } + responseHoldoff, err := parseDurationOr(options.ResponseHoldoff, 50*time.Millisecond) + if err != nil { + return nil, fmt.Errorf("meek: response_holdoff: %w", err) + } + sessionIdleTimeout, err := parseDurationOr(options.SessionIdleTimeout, 5*time.Minute) + if err != nil { + return nil, fmt.Errorf("meek: session_idle_timeout: %w", err) + } + readTimeout, err := parseDurationOr(options.ReadTimeout, 30*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: read_timeout: %w", err) + } + writeTimeout, err := parseDurationOr(options.WriteTimeout, 30*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: write_timeout: %w", err) + } + idleTimeout, err := parseDurationOr(options.IdleTimeout, 120*time.Second) + if err != nil { + return nil, fmt.Errorf("meek: idle_timeout: %w", err) + } + + // In-process SOCKS5 acceptor on loopback. The meek server dials this per + // session (replacing microsocks); handleSocks terminates the CONNECT and + // routes through sing-box. Loopback-only, so it isn't externally reachable. + socksLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("meek: loopback socks listener: %w", err) + } + + ib := &Inbound{ + Adapter: inbound.NewAdapter(constant.TypeMeek, tag), + ctx: ctx, + logger: logger, + router: uot.NewRouter(router, logger), + listener: listener.New(listener.Options{ + Context: ctx, + Logger: logger, + Listen: options.ListenOptions, + }), + socksLn: socksLn, + } + + tcpListener, err := ib.listener.ListenTCP() + if err != nil { + _ = socksLn.Close() + return nil, fmt.Errorf("meek: tcp listener: %w", err) + } + ib.tcpListener = tcpListener + + meekServer, err := NewServer(ServerConfig{ + Upstream: socksLn.Addr().String(), + MaxBodyBytes: options.MaxBodyBytes, + ResponseHoldoff: responseHoldoff, + SessionIdleTimeout: sessionIdleTimeout, + AuthToken: options.AuthToken, + // Route meek-server logs through the inbound's sing-box logger (matches + // WATER), not slog.Default(), so levels/routing stay consistent. + Logger: slog.New(L.NewLogHandler(logger)), + }) + if err != nil { + _ = socksLn.Close() + _ = tcpListener.Close() + return nil, fmt.Errorf("meek: server: %w", err) + } + ib.meekServer = meekServer + ib.httpServer = &http.Server{ + Handler: meekServer, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + IdleTimeout: idleTimeout, + } + return ib, nil +} + +// Start serves the meek HTTP endpoint and the loopback SOCKS5 acceptor. +func (i *Inbound) Start(stage adapter.StartStage) error { + if stage != adapter.StartStateStart { + return nil + } + go i.acceptSocks() + go func() { + if err := i.httpServer.Serve(i.tcpListener); err != nil && !errors.Is(err, http.ErrServerClosed) { + i.logger.Error("meek inbound http server: ", err) + } + }() + return nil +} + +// acceptSocks accepts loopback connections from the meek server and routes each +// through sing-box after terminating its SOCKS5 CONNECT. +func (i *Inbound) acceptSocks() { + var failures int + for { + conn, err := i.socksLn.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return // listener closed on Close() + } + // A transient Accept error must not permanently kill routing while the + // HTTP endpoint keeps serving — log and keep accepting, but back off + // (capped at 1s) so a persistent error (e.g. FD exhaustion) can't spin + // hot and flood the log. + failures++ + backoff := min(time.Duration(failures)*10*time.Millisecond, time.Second) + i.logger.Error("meek inbound: socks accept (retry in ", backoff, "): ", err) + time.Sleep(backoff) + continue + } + failures = 0 + go i.handleSocks(conn) + } +} + +// handleSocks terminates the SOCKS5 no-auth CONNECT the meek outbound sends to +// open a session, then routes the tunneled stream through sing-box. +func (i *Inbound) handleSocks(conn net.Conn) { + // Read through a bufio.Reader: it satisfies socks5's varbin.Reader (Read + + // ReadByte) and preserves any bytes buffered past the request for routing. + br := bufio.NewReader(conn) + + authReq, err := socks5.ReadAuthRequest(br) + if err != nil { + i.logger.Debug("meek inbound: read socks auth: ", err) + _ = conn.Close() + return + } + if !containsByte(authReq.Methods, socks5.AuthTypeNotRequired) { + _ = socks5.WriteAuthResponse(conn, socks5.AuthResponse{Method: 0xFF}) // no acceptable methods + _ = conn.Close() + return + } + if err := socks5.WriteAuthResponse(conn, socks5.AuthResponse{Method: socks5.AuthTypeNotRequired}); err != nil { + _ = conn.Close() + return + } + + req, err := socks5.ReadRequest(br) + if err != nil { + i.logger.Debug("meek inbound: read socks request: ", err) + _ = conn.Close() + return + } + if req.Command != socks5.CommandConnect { + _ = socks5.WriteResponse(conn, socks5.Response{ReplyCode: socks5.ReplyCodeUnsupported}) + _ = conn.Close() + return + } + if err := socks5.WriteResponse(conn, socks5.Response{ + ReplyCode: socks5.ReplyCodeSuccess, + Bind: M.SocksaddrFromNet(conn.LocalAddr()), + }); err != nil { + _ = conn.Close() + return + } + + routed := &bufferedConn{Conn: conn, r: br} + var metadata adapter.InboundContext + metadata.Inbound = i.Tag() + metadata.InboundType = i.Type() + metadata.Source = M.SocksaddrFromNet(conn.RemoteAddr()).Unwrap() + metadata.Destination = req.Destination + i.router.RouteConnectionEx(i.ctx, routed, metadata, func(err error) { + if err != nil { + i.logger.Debug("meek inbound: route ", req.Destination, ": ", err) + } + }) +} + +// Close stops the HTTP server, the meek server (and its sessions), the loopback +// acceptor, and the sing-box listener. +func (i *Inbound) Close() error { + var httpErr, meekErr, socksErr, lnErr error + i.closeOnce.Do(func() { + if i.httpServer != nil { + httpErr = i.httpServer.Close() + } + if i.meekServer != nil { + meekErr = i.meekServer.Close() + } + if i.socksLn != nil { + socksErr = i.socksLn.Close() + } + if i.listener != nil { + lnErr = i.listener.Close() + } + }) + return errors.Join(httpErr, meekErr, socksErr, lnErr) +} + +// bufferedConn reads through a bufio.Reader (so bytes buffered past the SOCKS5 +// request aren't lost) while writes/close/addrs/deadlines pass to the conn. +type bufferedConn struct { + net.Conn + r *bufio.Reader +} + +func (c *bufferedConn) Read(p []byte) (int, error) { return c.r.Read(p) } + +func containsByte(b []byte, v byte) bool { + for _, x := range b { + if x == v { + return true + } + } + return false +} diff --git a/protocol/meek/inbound_test.go b/protocol/meek/inbound_test.go new file mode 100644 index 0000000..01e3f3a --- /dev/null +++ b/protocol/meek/inbound_test.go @@ -0,0 +1,206 @@ +package meek + +import ( + "context" + "fmt" + "io" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/adapter/inbound" + "github.com/sagernet/sing-box/log" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + "github.com/sagernet/sing/protocol/socks/socks5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/getlantern/lantern-box/option" +) + +// TestNewInbound_RequiresAuthToken verifies a meek inbound refuses to start as an +// open relay: an empty auth_token errors unless allow_unauthenticated is set. The +// check runs before any listener is bound, so a nil router/empty options is fine. +func TestNewInbound_RequiresAuthToken(t *testing.T) { + _, err := NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", + option.MeekInboundOptions{}) + assert.Error(t, err, "empty auth_token without allow_unauthenticated must error") + + ib, err := NewInbound(context.Background(), nil, log.NewNOPFactory().Logger(), "meek-in", + option.MeekInboundOptions{AllowUnauthenticated: true}) + // With the opt-in, the auth gate passes (any later error is unrelated to auth). + if err != nil { + assert.NotContains(t, err.Error(), "auth_token is required") + } else { + t.Cleanup(func() { _ = ib.Close() }) // it opened listeners — don't leak them + } +} + +// mockRouter implements adapter.ConnectionRouterEx with a controllable callback. +type mockRouter struct { + onRoute func(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, onClose N.CloseHandlerFunc) +} + +func (m *mockRouter) RouteConnection(context.Context, net.Conn, adapter.InboundContext) error { + return nil +} + +func (m *mockRouter) RoutePacketConnection(context.Context, N.PacketConn, adapter.InboundContext) error { + return nil +} + +func (m *mockRouter) RouteConnectionEx(ctx context.Context, conn net.Conn, metadata adapter.InboundContext, onClose N.CloseHandlerFunc) { + if m.onRoute != nil { + m.onRoute(ctx, conn, metadata, onClose) + } +} + +func (m *mockRouter) RoutePacketConnectionEx(context.Context, N.PacketConn, adapter.InboundContext, N.CloseHandlerFunc) { +} + +// TestInbound_HandleSocks_RoutesConnect drives the SOCKS5 no-auth CONNECT the +// meek outbound sends (over a loopback pair, as the meek server does in prod) +// and checks the inbound terminates it, routes to the right destination, and +// pipes bytes — i.e. the microsocks-free data path works. +func TestInbound_HandleSocks_RoutesConnect(t *testing.T) { + captured := make(chan adapter.InboundContext, 1) + router := &mockRouter{ + onRoute: func(ctx context.Context, conn net.Conn, md adapter.InboundContext, onClose N.CloseHandlerFunc) { + captured <- md + go func() { + defer onClose(nil) + buf := make([]byte, 64) + n, err := conn.Read(buf) + if err == nil && n > 0 { + _, _ = conn.Write(buf[:n]) // echo the app stream back + } + }() + }, + } + ib := &Inbound{ + Adapter: inbound.NewAdapter("meek", "meek-in"), + ctx: context.Background(), + logger: log.NewNOPFactory().Logger(), + router: router, + } + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + go func() { + s, aerr := ln.Accept() + if aerr == nil { + ib.handleSocks(s) + } + }() + + c, err := net.Dial("tcp", ln.Addr().String()) + require.NoError(t, err) + defer c.Close() + require.NoError(t, c.SetDeadline(time.Now().Add(5*time.Second))) + + // 1. SOCKS5 no-auth method-select. + require.NoError(t, socks5.WriteAuthRequest(c, socks5.AuthRequest{Methods: []byte{socks5.AuthTypeNotRequired}})) + authReply := make([]byte, 2) + require.NoError(t, readFull(c, authReply)) + require.Equal(t, socks5.Version, authReply[0]) + require.Equal(t, socks5.AuthTypeNotRequired, authReply[1]) + + // 2. CONNECT, then consume the reply (header + bound addr + port). + dst := M.ParseSocksaddr("93.184.216.34:443") + require.NoError(t, socks5.WriteRequest(c, socks5.Request{Command: socks5.CommandConnect, Destination: dst})) + require.NoError(t, readConnectReply(t, c)) + + // 3. App bytes round-trip through the routed conn. + _, err = c.Write([]byte("hello meek")) + require.NoError(t, err) + got := make([]byte, len("hello meek")) + require.NoError(t, readFull(c, got)) + assert.Equal(t, "hello meek", string(got)) + + // 4. The router saw the right destination + inbound identity (bounded wait so a + // routing failure surfaces promptly instead of hanging to the test timeout). + var md adapter.InboundContext + select { + case md = <-captured: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for the router to be invoked") + } + assert.Equal(t, "meek-in", md.Inbound) + assert.Equal(t, "meek", md.InboundType) + assert.Equal(t, "93.184.216.34", md.Destination.Addr.String()) + assert.Equal(t, uint16(443), md.Destination.Port) +} + +// TestInbound_HandleSocks_RejectsNonConnect verifies a non-CONNECT command is +// refused (and the conn closed) rather than routed. +func TestInbound_HandleSocks_RejectsNonConnect(t *testing.T) { + var routed atomic.Bool // written by the handler goroutine, read by the test + ib := &Inbound{ + Adapter: inbound.NewAdapter("meek", "meek-in"), + ctx: context.Background(), + logger: log.NewNOPFactory().Logger(), + router: &mockRouter{onRoute: func(context.Context, net.Conn, adapter.InboundContext, N.CloseHandlerFunc) { routed.Store(true) }}, + } + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + go func() { + if s, aerr := ln.Accept(); aerr == nil { + ib.handleSocks(s) + } + }() + + c, err := net.Dial("tcp", ln.Addr().String()) + require.NoError(t, err) + defer c.Close() + require.NoError(t, c.SetDeadline(time.Now().Add(5*time.Second))) + + require.NoError(t, socks5.WriteAuthRequest(c, socks5.AuthRequest{Methods: []byte{socks5.AuthTypeNotRequired}})) + authReply := make([]byte, 2) + require.NoError(t, readFull(c, authReply)) + + // CommandBind (0x02) is not supported by the inbound. + require.NoError(t, socks5.WriteRequest(c, socks5.Request{Command: socks5.CommandBind, Destination: M.ParseSocksaddr("1.2.3.4:80")})) + head := make([]byte, 4) + require.NoError(t, readFull(c, head)) + assert.Equal(t, socks5.ReplyCodeUnsupported, head[1]) + assert.False(t, routed.Load(), "a non-CONNECT command must not be routed") +} + +func readFull(r io.Reader, b []byte) error { + _, err := io.ReadFull(r, b) + return err +} + +// readConnectReply consumes a SOCKS5 CONNECT reply: 4-byte header + bound addr + +// 2-byte port (mirrors the client's socks5ConnectSequenced). +func readConnectReply(t *testing.T, r io.Reader) error { + t.Helper() + head := make([]byte, 4) + if err := readFull(r, head); err != nil { + return err + } + require.Equal(t, socks5.Version, head[0]) + require.Equal(t, socks5.ReplyCodeSuccess, head[1]) + var addrLen int + switch head[3] { + case 0x01: + addrLen = net.IPv4len + case 0x04: + addrLen = net.IPv6len + case 0x03: + lb := make([]byte, 1) + if err := readFull(r, lb); err != nil { + return err + } + addrLen = int(lb[0]) + default: + return fmt.Errorf("unexpected ATYP %#x in connect reply", head[3]) + } + return readFull(r, make([]byte, addrLen+2)) +} diff --git a/protocol/register.go b/protocol/register.go index 7c537be..5950c4f 100644 --- a/protocol/register.go +++ b/protocol/register.go @@ -87,6 +87,7 @@ func RegisterProtocols(ctx context.Context) context.Context { func registerInbounds(registry *inbound.Registry) { algeneva.RegisterInbound(registry) + meek.RegisterInbound(registry) reflex.RegisterInbound(registry) samizdat.RegisterInbound(registry) water.RegisterInbound(registry)