From 497c5986942f100fea8c59567d04e093677fbafb Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 00:59:51 +0500 Subject: [PATCH 1/2] feat(session): add RequestMux for routing inbound requests by type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RequestMux is the request-stream counterpart of Demux: it replaces the hand-rolled "AcceptRequest loop + type switch + dispatch" a server writes with per-message-type handler registration. Handle(message.Type, h), OnUnknown, and Run(ctx, sess) mirror Demux's API and concurrency contract (synchronous dispatch; handlers spawn a goroutine for long-lived streams). Run surfaces AcceptRequest errors unchanged so the caller can escalate the session-fatal ones (§10.1 Request-ID violations, token-cache faults) by closing the session — the unmatched-type default rejects with REQUEST_ERROR NOT_SUPPORTED. Includes table tests (routing by type, late registration, OnUnknown, and the default reject), an ExampleRequestMux, and a README row. Relay adoption is deferred: its per-type limiter + token-verify pre-dispatch hooks are a separate concern. Co-Authored-By: Claude Opus 4.8 --- README.md | 1 + pkg/moqt/session/example_test.go | 39 +++++++++ pkg/moqt/session/requestmux.go | 106 ++++++++++++++++++++++++ pkg/moqt/session/requestmux_test.go | 120 ++++++++++++++++++++++++++++ 4 files changed, 266 insertions(+) create mode 100644 pkg/moqt/session/requestmux.go create mode 100644 pkg/moqt/session/requestmux_test.go diff --git a/README.md b/README.md index 300ca47..a8dedac 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ grouped here by the file they live in: | Publish a track | `ExampleSession_Publish` | | Subscribe to a track | `ExampleSession_Subscribe` | | Route many tracks' data streams | `ExampleDemux` | +| Route inbound requests (server side) | `ExampleRequestMux` | | Joining / standalone FETCH | `ExampleSession_Fetch`, `ExampleSession_Fetch_standalone`, `ExampleIncomingFetchStream` | | Update a live request | `ExampleSession_UpdateRequest` | | End a publication | `Example_endingAPublication` | diff --git a/pkg/moqt/session/example_test.go b/pkg/moqt/session/example_test.go index aa92d4c..c6651b0 100644 --- a/pkg/moqt/session/example_test.go +++ b/pkg/moqt/session/example_test.go @@ -192,6 +192,45 @@ func drain(s *session.IncomingSubgroupStream, label string) { }() } +// Routing inbound requests on the server side. RequestMux is the request-stream +// counterpart of Demux: register a handler per message.Type instead of hand- +// rolling an AcceptRequest loop + type switch. +func ExampleRequestMux() { + var server *session.Session // e.g. from session.Server + ctx := context.Background() + + mux := session.NewRequestMux() + + // A SUBSCRIBE handler keeps the request stream open for the subscription's + // lifetime, so it spawns a goroutine — Run dispatches synchronously, exactly + // like Demux. + mux.Handle(message.TypeSubscribe, func(r *session.Request) { + go func() { + pub, err := r.AcceptSubscribe(nil) // writes SUBSCRIBE_OK, returns a Publication + if err != nil { + return + } + defer pub.Close() + // … push objects via pub.OpenSubgroup … then pub.Done(...). + _ = pub + }() + }) + mux.Handle(message.TypePublishNamespace, func(r *session.Request) { + _ = r.Reply(&message.RequestOK{}) + _ = r.Stream.Close() + }) + + // Optional: without OnUnknown, an unhandled type is rejected NOT_SUPPORTED. + mux.OnUnknown(func(r *session.Request) { + _ = r.RejectError(moqt.RequestNotSupported, "unsupported request type") + }) + + // Run returns when ctx is cancelled or AcceptRequest fails. A session-fatal + // error (e.g. *session.ErrDuplicateRequestID) should be escalated by closing + // the session with the mapped code. + _ = mux.Run(ctx, server) +} + // Backfilling with a Relative Joining FETCH (§10.12.2): a FilterLargestObject // subscription only delivers objects strictly after the live edge, so the // current group is invisible until the next one lands. A joining FETCH keyed diff --git a/pkg/moqt/session/requestmux.go b/pkg/moqt/session/requestmux.go new file mode 100644 index 0000000..c33b976 --- /dev/null +++ b/pkg/moqt/session/requestmux.go @@ -0,0 +1,106 @@ +package session + +import ( + "context" + "sync" + + "github.com/floatdrop/moq-go/pkg/moqt" + "github.com/floatdrop/moq-go/pkg/moqt/message" +) + +// RequestHandler handles one inbound request that a [RequestMux] routed to it by +// the [message.Type] of its first message. It is invoked synchronously by +// [RequestMux.Run]; spawn a goroutine inside it when a request must be serviced +// concurrently with accepting the next one (see [RequestMux.Run]). +type RequestHandler func(*Request) + +// RequestMux routes the requests accepted from a [Session] to per-type handlers, +// replacing the hand-rolled "AcceptRequest loop + type-switch + dispatch" a +// server otherwise writes. It is the request-stream counterpart of [Demux], +// which does the same for inbound data streams. +// +// Requests are dispatched by the [message.Type] of their first message — e.g. +// [message.TypeSubscribe] for an inbound SUBSCRIBE. A request whose type has no +// registered handler is passed to the OnUnknown callback. +// +// Handlers may be registered or replaced at any time, including while +// [RequestMux.Run] is executing. Registration is safe for concurrent use. +// +// The zero value is not ready for use — construct with [NewRequestMux]. +type RequestMux struct { + mu sync.RWMutex + handlers map[message.Type]RequestHandler + onUnknown func(*Request) +} + +// NewRequestMux returns an empty RequestMux ready for handler registration. +func NewRequestMux() *RequestMux { + return &RequestMux{handlers: make(map[message.Type]RequestHandler)} +} + +// Handle registers h for inbound requests whose first message is of type t +// (e.g. [message.TypeSubscribe]). A nil h unregisters t; registering a type +// that already has a handler replaces it. +func (m *RequestMux) Handle(t message.Type, h RequestHandler) { + m.mu.Lock() + defer m.mu.Unlock() + if h == nil { + delete(m.handlers, t) + return + } + m.handlers[t] = h +} + +// OnUnknown sets the callback invoked for an accepted request whose type has no +// registered handler. With no callback set (the default, or a nil f), an +// unmatched request is rejected with REQUEST_ERROR NOT_SUPPORTED and its stream +// FIN'd so it does not leak. +func (m *RequestMux) OnUnknown(f func(*Request)) { + m.mu.Lock() + defer m.mu.Unlock() + m.onUnknown = f +} + +// Run accepts requests from sess and dispatches each to its registered handler +// until ctx is cancelled or [Session.AcceptRequest] returns an error, which Run +// returns. +// +// Some AcceptRequest errors are session-fatal protocol violations — a §10.1 +// Request-ID parity/monotonicity violation (*ErrRequestIDParityViolation / +// *ErrDuplicateRequestID) or a token-cache fault (*TokenCacheError) — that the +// caller MUST escalate by closing the session with the mapped code (see +// [Session.AcceptRequest]). Run surfaces the error unchanged so the caller can +// inspect it with errors.As and Close accordingly. +// +// Dispatch is synchronous: a handler runs to completion before Run accepts the +// next request, mirroring a hand-written accept loop and [Demux.Run]. A handler +// that keeps a request stream open for the lifetime of a subscription therefore +// blocks the loop, so spawn a goroutine inside the handler when requests must be +// serviced concurrently. +func (m *RequestMux) Run(ctx context.Context, sess *Session) error { + for { + req, err := sess.AcceptRequest(ctx) + if err != nil { + return err + } + m.dispatch(req) + } +} + +// dispatch routes one accepted request to its registered handler, or to the +// unknown path when none matches. +func (m *RequestMux) dispatch(req *Request) { + m.mu.RLock() + h := m.handlers[req.First.Type()] + f := m.onUnknown + m.mu.RUnlock() + if h != nil { + h(req) + return + } + if f != nil { + f(req) + return + } + _ = req.RejectError(moqt.RequestNotSupported, "moqt/session: no handler for request type") +} diff --git a/pkg/moqt/session/requestmux_test.go b/pkg/moqt/session/requestmux_test.go new file mode 100644 index 0000000..469a979 --- /dev/null +++ b/pkg/moqt/session/requestmux_test.go @@ -0,0 +1,120 @@ +package session_test + +import ( + "slices" + "testing" + + "github.com/floatdrop/moq-go/pkg/moqt" + "github.com/floatdrop/moq-go/pkg/moqt/message" + "github.com/floatdrop/moq-go/pkg/moqt/session" + "github.com/floatdrop/moq-go/pkg/moqt/wire" +) + +// TestRequestMuxRoutesByType checks that RequestMux dispatches requests by their +// first message's Type, routes an unregistered type to OnUnknown, and honours a +// handler registered after Run has already started. +func TestRequestMuxRoutesByType(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + events := make(chan message.Type, 8) + firstSeen := make(chan struct{}, 1) + record := func(r *session.Request) { + events <- r.First.Type() + _ = r.Stream.Close() + } + + mux := session.NewRequestMux() + mux.Handle(message.TypeSubscribe, func(r *session.Request) { + record(r) + firstSeen <- struct{}{} + }) + mux.Handle(message.TypePublishNamespace, record) + mux.OnUnknown(record) + + go func() { _ = mux.Run(t.Context(), server) }() + + // Client sends even Request IDs (§10.1), strictly increasing. + + // 1. SUBSCRIBE on a registered type. + openRequest(t, client, &message.Subscribe{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("a"), + Parameters: message.Parameters{message.LargestObjectFilter()}, + }) + <-firstSeen // ensure Run is live before late registration + + // 2. Register TRACK_STATUS *after* Run started, then send one. + mux.Handle(message.TypeTrackStatus, record) + openRequest(t, client, &message.TrackStatus{ + RequestID: 2, + Namespace: wire.Namespace("ns"), + Name: []byte("a"), + }) + + // 3. PUBLISH_NAMESPACE on a registered type. + openRequest(t, client, &message.PublishNamespace{ + RequestID: 4, + Namespace: wire.Namespace("ns"), + }) + + // 4. SUBSCRIBE_NAMESPACE — unregistered → OnUnknown. + openRequest(t, client, &message.SubscribeNamespace{ + RequestID: 6, + TrackNamespacePrefix: wire.Namespace("ns"), + }) + + got := collectEvents(t, events, 4) + for _, want := range []message.Type{ + message.TypeSubscribe, + message.TypeTrackStatus, + message.TypePublishNamespace, + message.TypeSubscribeNamespace, + } { + if !slices.Contains(got, want) { + t.Errorf("missing dispatched type %s; got %v", want, got) + } + } +} + +// TestRequestMuxDefaultRejectsUnhandled verifies that with no OnUnknown set, an +// unhandled request type is rejected with REQUEST_ERROR NOT_SUPPORTED and its +// stream FIN'd, so the requester learns the server cannot serve it. +func TestRequestMuxDefaultRejectsUnhandled(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + mux := session.NewRequestMux() // no handlers, no OnUnknown + go func() { _ = mux.Run(t.Context(), server) }() + + stream, err := client.OpenRequest(&message.TrackStatus{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("x"), + }) + if err != nil { + t.Fatalf("OpenRequest: %v", err) + } + + resp, err := message.Parse(stream) + if err != nil { + t.Fatalf("read response: %v", err) + } + rerr, ok := resp.(*message.RequestError) + if !ok { + t.Fatalf("got %s, want REQUEST_ERROR", resp.Type()) + } + if rerr.ErrorCode != moqt.RequestNotSupported { + t.Errorf("ErrorCode = %#x, want NOT_SUPPORTED (%#x)", rerr.ErrorCode, moqt.RequestNotSupported) + } +} + +// openRequest opens a bidi request stream carrying first as its initial message +// and leaves it open (the mux handler or session cleanup tears it down). +func openRequest(t *testing.T, s *session.Session, first message.Message) { + t.Helper() + if _, err := s.OpenRequest(first); err != nil { + t.Fatalf("OpenRequest(%s): %v", first.Type(), err) + } +} From e4257329cff3afcd820675a5fa3d93b6ab44c36e Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 01:13:16 +0500 Subject: [PATCH 2/2] feat(session): add generic HandleType for typed RequestMux handlers HandleType[T](mux, func(*Request, T)) registers a handler keyed by T's message.Type and hands the handler the already-asserted typed message, so callers don't repeat req.First.(*message.X). It's a free function because Go methods can't take type parameters; the type key is derived from a zero T (the message Type() methods are constant returns, safe on a nil pointer). Adds TestRequestMuxHandleType and switches the SUBSCRIBE branch of ExampleRequestMux to HandleType. Co-Authored-By: Claude Opus 4.8 --- pkg/moqt/session/example_test.go | 5 +++-- pkg/moqt/session/requestmux.go | 22 ++++++++++++++++++++ pkg/moqt/session/requestmux_test.go | 31 +++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/pkg/moqt/session/example_test.go b/pkg/moqt/session/example_test.go index c6651b0..4ce0e78 100644 --- a/pkg/moqt/session/example_test.go +++ b/pkg/moqt/session/example_test.go @@ -201,10 +201,11 @@ func ExampleRequestMux() { mux := session.NewRequestMux() - // A SUBSCRIBE handler keeps the request stream open for the subscription's + // HandleType hands the handler the already-asserted typed message. A + // SUBSCRIBE handler keeps the request stream open for the subscription's // lifetime, so it spawns a goroutine — Run dispatches synchronously, exactly // like Demux. - mux.Handle(message.TypeSubscribe, func(r *session.Request) { + session.HandleType(mux, func(r *session.Request, _ *message.Subscribe) { go func() { pub, err := r.AcceptSubscribe(nil) // writes SUBSCRIBE_OK, returns a Publication if err != nil { diff --git a/pkg/moqt/session/requestmux.go b/pkg/moqt/session/requestmux.go index c33b976..521756f 100644 --- a/pkg/moqt/session/requestmux.go +++ b/pkg/moqt/session/requestmux.go @@ -51,6 +51,28 @@ func (m *RequestMux) Handle(t message.Type, h RequestHandler) { m.handlers[t] = h } +// HandleType registers h for inbound requests whose first message is the +// concrete type T (e.g. *message.Subscribe), handing h the already-asserted +// typed message alongside the [*Request]. It is the generic form of +// [RequestMux.Handle]: the [message.Type] key is derived from T, and the type +// assertion a Handle callback would otherwise repeat (req.First.(*message.X)) is +// done once, here. +// +// Go methods cannot take type parameters, so this is a free function taking the +// mux as its first argument. A nil h unregisters T's type; registering a type +// that already has a handler replaces it. +func HandleType[T message.WithRequestID](m *RequestMux, h func(*Request, T)) { + var zero T // nil pointer; message Type() methods are constant returns + if h == nil { + m.Handle(zero.Type(), nil) + return + } + m.Handle(zero.Type(), func(req *Request) { + msg, _ := req.First.(T) + h(req, msg) + }) +} + // OnUnknown sets the callback invoked for an accepted request whose type has no // registered handler. With no callback set (the default, or a nil f), an // unmatched request is rejected with REQUEST_ERROR NOT_SUPPORTED and its stream diff --git a/pkg/moqt/session/requestmux_test.go b/pkg/moqt/session/requestmux_test.go index 469a979..4218aec 100644 --- a/pkg/moqt/session/requestmux_test.go +++ b/pkg/moqt/session/requestmux_test.go @@ -3,6 +3,7 @@ package session_test import ( "slices" "testing" + "time" "github.com/floatdrop/moq-go/pkg/moqt" "github.com/floatdrop/moq-go/pkg/moqt/message" @@ -78,6 +79,36 @@ func TestRequestMuxRoutesByType(t *testing.T) { } } +// TestRequestMuxHandleType checks that HandleType derives the message.Type key +// from T and hands the handler the already-asserted typed message. +func TestRequestMuxHandleType(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + got := make(chan string, 1) + mux := session.NewRequestMux() + session.HandleType(mux, func(_ *session.Request, msg *message.Subscribe) { + got <- string(msg.Name) // typed access without a manual assertion + }) + go func() { _ = mux.Run(t.Context(), server) }() + + openRequest(t, client, &message.Subscribe{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("typed-track"), + Parameters: message.Parameters{message.LargestObjectFilter()}, + }) + + select { + case name := <-got: + if name != "typed-track" { + t.Errorf("handler saw Name = %q, want %q", name, "typed-track") + } + case <-time.After(2 * time.Second): + t.Fatal("typed handler was not invoked") + } +} + // TestRequestMuxDefaultRejectsUnhandled verifies that with no OnUnknown set, an // unhandled request type is rejected with REQUEST_ERROR NOT_SUPPORTED and its // stream FIN'd, so the requester learns the server cannot serve it.