diff --git a/README.md b/README.md index 300ca47..a8dedac 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ grouped here by the file they live in: | Publish a track | `ExampleSession_Publish` | | Subscribe to a track | `ExampleSession_Subscribe` | | Route many tracks' data streams | `ExampleDemux` | +| Route inbound requests (server side) | `ExampleRequestMux` | | Joining / standalone FETCH | `ExampleSession_Fetch`, `ExampleSession_Fetch_standalone`, `ExampleIncomingFetchStream` | | Update a live request | `ExampleSession_UpdateRequest` | | End a publication | `Example_endingAPublication` | diff --git a/pkg/moqt/session/example_test.go b/pkg/moqt/session/example_test.go index aa92d4c..4ce0e78 100644 --- a/pkg/moqt/session/example_test.go +++ b/pkg/moqt/session/example_test.go @@ -192,6 +192,46 @@ func drain(s *session.IncomingSubgroupStream, label string) { }() } +// Routing inbound requests on the server side. RequestMux is the request-stream +// counterpart of Demux: register a handler per message.Type instead of hand- +// rolling an AcceptRequest loop + type switch. +func ExampleRequestMux() { + var server *session.Session // e.g. from session.Server + ctx := context.Background() + + mux := session.NewRequestMux() + + // HandleType hands the handler the already-asserted typed message. A + // SUBSCRIBE handler keeps the request stream open for the subscription's + // lifetime, so it spawns a goroutine — Run dispatches synchronously, exactly + // like Demux. + session.HandleType(mux, func(r *session.Request, _ *message.Subscribe) { + go func() { + pub, err := r.AcceptSubscribe(nil) // writes SUBSCRIBE_OK, returns a Publication + if err != nil { + return + } + defer pub.Close() + // … push objects via pub.OpenSubgroup … then pub.Done(...). + _ = pub + }() + }) + mux.Handle(message.TypePublishNamespace, func(r *session.Request) { + _ = r.Reply(&message.RequestOK{}) + _ = r.Stream.Close() + }) + + // Optional: without OnUnknown, an unhandled type is rejected NOT_SUPPORTED. + mux.OnUnknown(func(r *session.Request) { + _ = r.RejectError(moqt.RequestNotSupported, "unsupported request type") + }) + + // Run returns when ctx is cancelled or AcceptRequest fails. A session-fatal + // error (e.g. *session.ErrDuplicateRequestID) should be escalated by closing + // the session with the mapped code. + _ = mux.Run(ctx, server) +} + // Backfilling with a Relative Joining FETCH (§10.12.2): a FilterLargestObject // subscription only delivers objects strictly after the live edge, so the // current group is invisible until the next one lands. A joining FETCH keyed diff --git a/pkg/moqt/session/requestmux.go b/pkg/moqt/session/requestmux.go new file mode 100644 index 0000000..521756f --- /dev/null +++ b/pkg/moqt/session/requestmux.go @@ -0,0 +1,128 @@ +package session + +import ( + "context" + "sync" + + "github.com/floatdrop/moq-go/pkg/moqt" + "github.com/floatdrop/moq-go/pkg/moqt/message" +) + +// RequestHandler handles one inbound request that a [RequestMux] routed to it by +// the [message.Type] of its first message. It is invoked synchronously by +// [RequestMux.Run]; spawn a goroutine inside it when a request must be serviced +// concurrently with accepting the next one (see [RequestMux.Run]). +type RequestHandler func(*Request) + +// RequestMux routes the requests accepted from a [Session] to per-type handlers, +// replacing the hand-rolled "AcceptRequest loop + type-switch + dispatch" a +// server otherwise writes. It is the request-stream counterpart of [Demux], +// which does the same for inbound data streams. +// +// Requests are dispatched by the [message.Type] of their first message — e.g. +// [message.TypeSubscribe] for an inbound SUBSCRIBE. A request whose type has no +// registered handler is passed to the OnUnknown callback. +// +// Handlers may be registered or replaced at any time, including while +// [RequestMux.Run] is executing. Registration is safe for concurrent use. +// +// The zero value is not ready for use — construct with [NewRequestMux]. +type RequestMux struct { + mu sync.RWMutex + handlers map[message.Type]RequestHandler + onUnknown func(*Request) +} + +// NewRequestMux returns an empty RequestMux ready for handler registration. +func NewRequestMux() *RequestMux { + return &RequestMux{handlers: make(map[message.Type]RequestHandler)} +} + +// Handle registers h for inbound requests whose first message is of type t +// (e.g. [message.TypeSubscribe]). A nil h unregisters t; registering a type +// that already has a handler replaces it. +func (m *RequestMux) Handle(t message.Type, h RequestHandler) { + m.mu.Lock() + defer m.mu.Unlock() + if h == nil { + delete(m.handlers, t) + return + } + m.handlers[t] = h +} + +// HandleType registers h for inbound requests whose first message is the +// concrete type T (e.g. *message.Subscribe), handing h the already-asserted +// typed message alongside the [*Request]. It is the generic form of +// [RequestMux.Handle]: the [message.Type] key is derived from T, and the type +// assertion a Handle callback would otherwise repeat (req.First.(*message.X)) is +// done once, here. +// +// Go methods cannot take type parameters, so this is a free function taking the +// mux as its first argument. A nil h unregisters T's type; registering a type +// that already has a handler replaces it. +func HandleType[T message.WithRequestID](m *RequestMux, h func(*Request, T)) { + var zero T // nil pointer; message Type() methods are constant returns + if h == nil { + m.Handle(zero.Type(), nil) + return + } + m.Handle(zero.Type(), func(req *Request) { + msg, _ := req.First.(T) + h(req, msg) + }) +} + +// OnUnknown sets the callback invoked for an accepted request whose type has no +// registered handler. With no callback set (the default, or a nil f), an +// unmatched request is rejected with REQUEST_ERROR NOT_SUPPORTED and its stream +// FIN'd so it does not leak. +func (m *RequestMux) OnUnknown(f func(*Request)) { + m.mu.Lock() + defer m.mu.Unlock() + m.onUnknown = f +} + +// Run accepts requests from sess and dispatches each to its registered handler +// until ctx is cancelled or [Session.AcceptRequest] returns an error, which Run +// returns. +// +// Some AcceptRequest errors are session-fatal protocol violations — a §10.1 +// Request-ID parity/monotonicity violation (*ErrRequestIDParityViolation / +// *ErrDuplicateRequestID) or a token-cache fault (*TokenCacheError) — that the +// caller MUST escalate by closing the session with the mapped code (see +// [Session.AcceptRequest]). Run surfaces the error unchanged so the caller can +// inspect it with errors.As and Close accordingly. +// +// Dispatch is synchronous: a handler runs to completion before Run accepts the +// next request, mirroring a hand-written accept loop and [Demux.Run]. A handler +// that keeps a request stream open for the lifetime of a subscription therefore +// blocks the loop, so spawn a goroutine inside the handler when requests must be +// serviced concurrently. +func (m *RequestMux) Run(ctx context.Context, sess *Session) error { + for { + req, err := sess.AcceptRequest(ctx) + if err != nil { + return err + } + m.dispatch(req) + } +} + +// dispatch routes one accepted request to its registered handler, or to the +// unknown path when none matches. +func (m *RequestMux) dispatch(req *Request) { + m.mu.RLock() + h := m.handlers[req.First.Type()] + f := m.onUnknown + m.mu.RUnlock() + if h != nil { + h(req) + return + } + if f != nil { + f(req) + return + } + _ = req.RejectError(moqt.RequestNotSupported, "moqt/session: no handler for request type") +} diff --git a/pkg/moqt/session/requestmux_test.go b/pkg/moqt/session/requestmux_test.go new file mode 100644 index 0000000..4218aec --- /dev/null +++ b/pkg/moqt/session/requestmux_test.go @@ -0,0 +1,151 @@ +package session_test + +import ( + "slices" + "testing" + "time" + + "github.com/floatdrop/moq-go/pkg/moqt" + "github.com/floatdrop/moq-go/pkg/moqt/message" + "github.com/floatdrop/moq-go/pkg/moqt/session" + "github.com/floatdrop/moq-go/pkg/moqt/wire" +) + +// TestRequestMuxRoutesByType checks that RequestMux dispatches requests by their +// first message's Type, routes an unregistered type to OnUnknown, and honours a +// handler registered after Run has already started. +func TestRequestMuxRoutesByType(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + events := make(chan message.Type, 8) + firstSeen := make(chan struct{}, 1) + record := func(r *session.Request) { + events <- r.First.Type() + _ = r.Stream.Close() + } + + mux := session.NewRequestMux() + mux.Handle(message.TypeSubscribe, func(r *session.Request) { + record(r) + firstSeen <- struct{}{} + }) + mux.Handle(message.TypePublishNamespace, record) + mux.OnUnknown(record) + + go func() { _ = mux.Run(t.Context(), server) }() + + // Client sends even Request IDs (§10.1), strictly increasing. + + // 1. SUBSCRIBE on a registered type. + openRequest(t, client, &message.Subscribe{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("a"), + Parameters: message.Parameters{message.LargestObjectFilter()}, + }) + <-firstSeen // ensure Run is live before late registration + + // 2. Register TRACK_STATUS *after* Run started, then send one. + mux.Handle(message.TypeTrackStatus, record) + openRequest(t, client, &message.TrackStatus{ + RequestID: 2, + Namespace: wire.Namespace("ns"), + Name: []byte("a"), + }) + + // 3. PUBLISH_NAMESPACE on a registered type. + openRequest(t, client, &message.PublishNamespace{ + RequestID: 4, + Namespace: wire.Namespace("ns"), + }) + + // 4. SUBSCRIBE_NAMESPACE — unregistered → OnUnknown. + openRequest(t, client, &message.SubscribeNamespace{ + RequestID: 6, + TrackNamespacePrefix: wire.Namespace("ns"), + }) + + got := collectEvents(t, events, 4) + for _, want := range []message.Type{ + message.TypeSubscribe, + message.TypeTrackStatus, + message.TypePublishNamespace, + message.TypeSubscribeNamespace, + } { + if !slices.Contains(got, want) { + t.Errorf("missing dispatched type %s; got %v", want, got) + } + } +} + +// TestRequestMuxHandleType checks that HandleType derives the message.Type key +// from T and hands the handler the already-asserted typed message. +func TestRequestMuxHandleType(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + got := make(chan string, 1) + mux := session.NewRequestMux() + session.HandleType(mux, func(_ *session.Request, msg *message.Subscribe) { + got <- string(msg.Name) // typed access without a manual assertion + }) + go func() { _ = mux.Run(t.Context(), server) }() + + openRequest(t, client, &message.Subscribe{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("typed-track"), + Parameters: message.Parameters{message.LargestObjectFilter()}, + }) + + select { + case name := <-got: + if name != "typed-track" { + t.Errorf("handler saw Name = %q, want %q", name, "typed-track") + } + case <-time.After(2 * time.Second): + t.Fatal("typed handler was not invoked") + } +} + +// TestRequestMuxDefaultRejectsUnhandled verifies that with no OnUnknown set, an +// unhandled request type is rejected with REQUEST_ERROR NOT_SUPPORTED and its +// stream FIN'd, so the requester learns the server cannot serve it. +func TestRequestMuxDefaultRejectsUnhandled(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + mux := session.NewRequestMux() // no handlers, no OnUnknown + go func() { _ = mux.Run(t.Context(), server) }() + + stream, err := client.OpenRequest(&message.TrackStatus{ + RequestID: 0, + Namespace: wire.Namespace("ns"), + Name: []byte("x"), + }) + if err != nil { + t.Fatalf("OpenRequest: %v", err) + } + + resp, err := message.Parse(stream) + if err != nil { + t.Fatalf("read response: %v", err) + } + rerr, ok := resp.(*message.RequestError) + if !ok { + t.Fatalf("got %s, want REQUEST_ERROR", resp.Type()) + } + if rerr.ErrorCode != moqt.RequestNotSupported { + t.Errorf("ErrorCode = %#x, want NOT_SUPPORTED (%#x)", rerr.ErrorCode, moqt.RequestNotSupported) + } +} + +// openRequest opens a bidi request stream carrying first as its initial message +// and leaves it open (the mux handler or session cleanup tears it down). +func openRequest(t *testing.T, s *session.Session, first message.Message) { + t.Helper() + if _, err := s.OpenRequest(first); err != nil { + t.Fatalf("OpenRequest(%s): %v", first.Type(), err) + } +}