Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ grouped here by the file they live in:
| Publish a track | `ExampleSession_Publish` |
| Subscribe to a track | `ExampleSession_Subscribe` |
| Route many tracks' data streams | `ExampleDemux` |
| Route inbound requests (server side) | `ExampleRequestMux` |
| Joining / standalone FETCH | `ExampleSession_Fetch`, `ExampleSession_Fetch_standalone`, `ExampleIncomingFetchStream` |
| Update a live request | `ExampleSession_UpdateRequest` |
| End a publication | `Example_endingAPublication` |
Expand Down
40 changes: 40 additions & 0 deletions pkg/moqt/session/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,46 @@ func drain(s *session.IncomingSubgroupStream, label string) {
}()
}

// Routing inbound requests on the server side. RequestMux is the request-stream
// counterpart of Demux: register a handler per message.Type instead of hand-
// rolling an AcceptRequest loop + type switch.
func ExampleRequestMux() {
var server *session.Session // e.g. from session.Server
ctx := context.Background()

mux := session.NewRequestMux()

// HandleType hands the handler the already-asserted typed message. A
// SUBSCRIBE handler keeps the request stream open for the subscription's
// lifetime, so it spawns a goroutine — Run dispatches synchronously, exactly
// like Demux.
session.HandleType(mux, func(r *session.Request, _ *message.Subscribe) {
go func() {
pub, err := r.AcceptSubscribe(nil) // writes SUBSCRIBE_OK, returns a Publication
if err != nil {
return
}
defer pub.Close()
// … push objects via pub.OpenSubgroup … then pub.Done(...).
_ = pub
}()
})
mux.Handle(message.TypePublishNamespace, func(r *session.Request) {
_ = r.Reply(&message.RequestOK{})
_ = r.Stream.Close()
})

// Optional: without OnUnknown, an unhandled type is rejected NOT_SUPPORTED.
mux.OnUnknown(func(r *session.Request) {
_ = r.RejectError(moqt.RequestNotSupported, "unsupported request type")
})

// Run returns when ctx is cancelled or AcceptRequest fails. A session-fatal
// error (e.g. *session.ErrDuplicateRequestID) should be escalated by closing
// the session with the mapped code.
_ = mux.Run(ctx, server)
}

// Backfilling with a Relative Joining FETCH (§10.12.2): a FilterLargestObject
// subscription only delivers objects strictly after the live edge, so the
// current group is invisible until the next one lands. A joining FETCH keyed
Expand Down
128 changes: 128 additions & 0 deletions pkg/moqt/session/requestmux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package session

import (
"context"
"sync"

"github.com/floatdrop/moq-go/pkg/moqt"
"github.com/floatdrop/moq-go/pkg/moqt/message"
)

// RequestHandler handles one inbound request that a [RequestMux] routed to it by
// the [message.Type] of its first message. It is invoked synchronously by
// [RequestMux.Run]; spawn a goroutine inside it when a request must be serviced
// concurrently with accepting the next one (see [RequestMux.Run]).
type RequestHandler func(*Request)

// RequestMux routes the requests accepted from a [Session] to per-type handlers,
// replacing the hand-rolled "AcceptRequest loop + type-switch + dispatch" a
// server otherwise writes. It is the request-stream counterpart of [Demux],
// which does the same for inbound data streams.
//
// Requests are dispatched by the [message.Type] of their first message — e.g.
// [message.TypeSubscribe] for an inbound SUBSCRIBE. A request whose type has no
// registered handler is passed to the OnUnknown callback.
//
// Handlers may be registered or replaced at any time, including while
// [RequestMux.Run] is executing. Registration is safe for concurrent use.
//
// The zero value is not ready for use — construct with [NewRequestMux].
type RequestMux struct {
mu sync.RWMutex
handlers map[message.Type]RequestHandler
onUnknown func(*Request)
}

// NewRequestMux returns an empty RequestMux ready for handler registration.
func NewRequestMux() *RequestMux {
return &RequestMux{handlers: make(map[message.Type]RequestHandler)}
}

// Handle registers h for inbound requests whose first message is of type t
// (e.g. [message.TypeSubscribe]). A nil h unregisters t; registering a type
// that already has a handler replaces it.
func (m *RequestMux) Handle(t message.Type, h RequestHandler) {
m.mu.Lock()
defer m.mu.Unlock()
if h == nil {
delete(m.handlers, t)
return
}
m.handlers[t] = h
}

// HandleType registers h for inbound requests whose first message is the
// concrete type T (e.g. *message.Subscribe), handing h the already-asserted
// typed message alongside the [*Request]. It is the generic form of
// [RequestMux.Handle]: the [message.Type] key is derived from T, and the type
// assertion a Handle callback would otherwise repeat (req.First.(*message.X)) is
// done once, here.
//
// Go methods cannot take type parameters, so this is a free function taking the
// mux as its first argument. A nil h unregisters T's type; registering a type
// that already has a handler replaces it.
func HandleType[T message.WithRequestID](m *RequestMux, h func(*Request, T)) {
var zero T // nil pointer; message Type() methods are constant returns
if h == nil {
m.Handle(zero.Type(), nil)
return
}
m.Handle(zero.Type(), func(req *Request) {
msg, _ := req.First.(T)
h(req, msg)
})
}

// OnUnknown sets the callback invoked for an accepted request whose type has no
// registered handler. With no callback set (the default, or a nil f), an
// unmatched request is rejected with REQUEST_ERROR NOT_SUPPORTED and its stream
// FIN'd so it does not leak.
func (m *RequestMux) OnUnknown(f func(*Request)) {
m.mu.Lock()
defer m.mu.Unlock()
m.onUnknown = f
}

// Run accepts requests from sess and dispatches each to its registered handler
// until ctx is cancelled or [Session.AcceptRequest] returns an error, which Run
// returns.
//
// Some AcceptRequest errors are session-fatal protocol violations — a §10.1
// Request-ID parity/monotonicity violation (*ErrRequestIDParityViolation /
// *ErrDuplicateRequestID) or a token-cache fault (*TokenCacheError) — that the
// caller MUST escalate by closing the session with the mapped code (see
// [Session.AcceptRequest]). Run surfaces the error unchanged so the caller can
// inspect it with errors.As and Close accordingly.
//
// Dispatch is synchronous: a handler runs to completion before Run accepts the
// next request, mirroring a hand-written accept loop and [Demux.Run]. A handler
// that keeps a request stream open for the lifetime of a subscription therefore
// blocks the loop, so spawn a goroutine inside the handler when requests must be
// serviced concurrently.
func (m *RequestMux) Run(ctx context.Context, sess *Session) error {
for {
req, err := sess.AcceptRequest(ctx)
if err != nil {
return err
}
m.dispatch(req)
}
}

// dispatch routes one accepted request to its registered handler, or to the
// unknown path when none matches.
func (m *RequestMux) dispatch(req *Request) {
m.mu.RLock()
h := m.handlers[req.First.Type()]
f := m.onUnknown
m.mu.RUnlock()
if h != nil {
h(req)
return
}
if f != nil {
f(req)
return
}
_ = req.RejectError(moqt.RequestNotSupported, "moqt/session: no handler for request type")
}
151 changes: 151 additions & 0 deletions pkg/moqt/session/requestmux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package session_test

import (
"slices"
"testing"
"time"

"github.com/floatdrop/moq-go/pkg/moqt"
"github.com/floatdrop/moq-go/pkg/moqt/message"
"github.com/floatdrop/moq-go/pkg/moqt/session"
"github.com/floatdrop/moq-go/pkg/moqt/wire"
)

// TestRequestMuxRoutesByType checks that RequestMux dispatches requests by their
// first message's Type, routes an unregistered type to OnUnknown, and honours a
// handler registered after Run has already started.
func TestRequestMuxRoutesByType(t *testing.T) {
t.Parallel()
client, server := openPair(t)

events := make(chan message.Type, 8)
firstSeen := make(chan struct{}, 1)
record := func(r *session.Request) {
events <- r.First.Type()
_ = r.Stream.Close()
}

mux := session.NewRequestMux()
mux.Handle(message.TypeSubscribe, func(r *session.Request) {
record(r)
firstSeen <- struct{}{}
})
mux.Handle(message.TypePublishNamespace, record)
mux.OnUnknown(record)

go func() { _ = mux.Run(t.Context(), server) }()

// Client sends even Request IDs (§10.1), strictly increasing.

// 1. SUBSCRIBE on a registered type.
openRequest(t, client, &message.Subscribe{
RequestID: 0,
Namespace: wire.Namespace("ns"),
Name: []byte("a"),
Parameters: message.Parameters{message.LargestObjectFilter()},
})
<-firstSeen // ensure Run is live before late registration

// 2. Register TRACK_STATUS *after* Run started, then send one.
mux.Handle(message.TypeTrackStatus, record)
openRequest(t, client, &message.TrackStatus{
RequestID: 2,
Namespace: wire.Namespace("ns"),
Name: []byte("a"),
})

// 3. PUBLISH_NAMESPACE on a registered type.
openRequest(t, client, &message.PublishNamespace{
RequestID: 4,
Namespace: wire.Namespace("ns"),
})

// 4. SUBSCRIBE_NAMESPACE — unregistered → OnUnknown.
openRequest(t, client, &message.SubscribeNamespace{
RequestID: 6,
TrackNamespacePrefix: wire.Namespace("ns"),
})

got := collectEvents(t, events, 4)
for _, want := range []message.Type{
message.TypeSubscribe,
message.TypeTrackStatus,
message.TypePublishNamespace,
message.TypeSubscribeNamespace,
} {
if !slices.Contains(got, want) {
t.Errorf("missing dispatched type %s; got %v", want, got)
}
}
}

// TestRequestMuxHandleType checks that HandleType derives the message.Type key
// from T and hands the handler the already-asserted typed message.
func TestRequestMuxHandleType(t *testing.T) {
t.Parallel()
client, server := openPair(t)

got := make(chan string, 1)
mux := session.NewRequestMux()
session.HandleType(mux, func(_ *session.Request, msg *message.Subscribe) {
got <- string(msg.Name) // typed access without a manual assertion
})
go func() { _ = mux.Run(t.Context(), server) }()

openRequest(t, client, &message.Subscribe{
RequestID: 0,
Namespace: wire.Namespace("ns"),
Name: []byte("typed-track"),
Parameters: message.Parameters{message.LargestObjectFilter()},
})

select {
case name := <-got:
if name != "typed-track" {
t.Errorf("handler saw Name = %q, want %q", name, "typed-track")
}
case <-time.After(2 * time.Second):
t.Fatal("typed handler was not invoked")
}
}

// TestRequestMuxDefaultRejectsUnhandled verifies that with no OnUnknown set, an
// unhandled request type is rejected with REQUEST_ERROR NOT_SUPPORTED and its
// stream FIN'd, so the requester learns the server cannot serve it.
func TestRequestMuxDefaultRejectsUnhandled(t *testing.T) {
t.Parallel()
client, server := openPair(t)

mux := session.NewRequestMux() // no handlers, no OnUnknown
go func() { _ = mux.Run(t.Context(), server) }()

stream, err := client.OpenRequest(&message.TrackStatus{
RequestID: 0,
Namespace: wire.Namespace("ns"),
Name: []byte("x"),
})
if err != nil {
t.Fatalf("OpenRequest: %v", err)
}

resp, err := message.Parse(stream)
if err != nil {
t.Fatalf("read response: %v", err)
}
rerr, ok := resp.(*message.RequestError)
if !ok {
t.Fatalf("got %s, want REQUEST_ERROR", resp.Type())
}
if rerr.ErrorCode != moqt.RequestNotSupported {
t.Errorf("ErrorCode = %#x, want NOT_SUPPORTED (%#x)", rerr.ErrorCode, moqt.RequestNotSupported)
}
}

// openRequest opens a bidi request stream carrying first as its initial message
// and leaves it open (the mux handler or session cleanup tears it down).
func openRequest(t *testing.T, s *session.Session, first message.Message) {
t.Helper()
if _, err := s.OpenRequest(first); err != nil {
t.Fatalf("OpenRequest(%s): %v", first.Type(), err)
}
}