Skip to content

feat(session): add RequestMux for routing inbound requests by type#31

Merged
floatdrop merged 2 commits into
draft-18from
feat/request-mux
Jun 28, 2026
Merged

feat(session): add RequestMux for routing inbound requests by type#31
floatdrop merged 2 commits into
draft-18from
feat/request-mux

Conversation

@floatdrop

Copy link
Copy Markdown
Owner

What

RequestMux is the request-stream counterpart of Demux. Today a server (the relay, or any session.Server-based app) hand-rolls an AcceptRequest loop + a big type switch + dispatch. RequestMux lets it register a handler per message.Type instead:

mux := session.NewRequestMux()
mux.Handle(message.TypeSubscribe, func(r *session.Request) {
	go func() {                       // long-lived stream → spawn (Run is synchronous)
		pub, err := r.AcceptSubscribe(nil)
		...
	}()
})
mux.Handle(message.TypePublishNamespace, func(r *session.Request) {
	_ = r.Reply(&message.RequestOK{}); _ = r.Stream.Close()
})
mux.OnUnknown(func(r *session.Request) { _ = r.RejectError(moqt.RequestNotSupported, "…") })
_ = mux.Run(ctx, server)

The API and concurrency contract deliberately mirror Demux:

  • Handle(t message.Type, h) / OnUnknown(f) / Run(ctx, *Session) error.
  • Registration is concurrency-safe and may happen while Run is executing (a server learns some types to serve only after startup).
  • Dispatch is synchronous — a handler runs to completion before the next AcceptRequest, exactly like Demux.Run. A handler that keeps a request stream open for a subscription's lifetime spawns a goroutine.
  • Unmatched type with no OnUnknownREQUEST_ERROR NOT_SUPPORTED + FIN, so the stream can't leak.

Run surfaces AcceptRequest errors unchanged, with a doc note that the session-fatal ones (§10.1 Request-ID parity/monotonicity, token-cache faults) must be escalated by closing the session — this is the natural seam for the later "self-close on protocol violation" cleanup.

Why

Removes the boilerplate accept-loop/type-switch from every server built on the library, matching the ergonomics Demux already gives the subscriber/data side.

Relay adoption is deferred on purpose: the relay's dispatch also runs a per-type limiter and token verification before handing off, which needs a pre-dispatch hook rather than a like-for-like swap. Keeping that out of this PR keeps a hot path untouched and the change reviewable.

Testing

  • TestRequestMuxRoutesByType — routing by type, a handler registered after Run starts, and OnUnknown for an unregistered type.
  • TestRequestMuxDefaultRejectsUnhandled — no OnUnknown → requester receives REQUEST_ERROR NOT_SUPPORTED.
  • ExampleRequestMux (compile-checked) + README row.
  • go test -race ./pkg/moqt/session/..., full go test ./..., golangci-lint run, and the modernize standalone check — all clean.

🤖 Generated with Claude Code

floatdrop and others added 2 commits June 29, 2026 00:59
RequestMux is the request-stream counterpart of Demux: it replaces the
hand-rolled "AcceptRequest loop + type switch + dispatch" a server writes
with per-message-type handler registration. Handle(message.Type, h),
OnUnknown, and Run(ctx, sess) mirror Demux's API and concurrency contract
(synchronous dispatch; handlers spawn a goroutine for long-lived streams).

Run surfaces AcceptRequest errors unchanged so the caller can escalate the
session-fatal ones (§10.1 Request-ID violations, token-cache faults) by
closing the session — the unmatched-type default rejects with REQUEST_ERROR
NOT_SUPPORTED.

Includes table tests (routing by type, late registration, OnUnknown, and
the default reject), an ExampleRequestMux, and a README row. Relay adoption
is deferred: its per-type limiter + token-verify pre-dispatch hooks are a
separate concern.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
HandleType[T](mux, func(*Request, T)) registers a handler keyed by T's
message.Type and hands the handler the already-asserted typed message, so
callers don't repeat req.First.(*message.X). It's a free function because
Go methods can't take type parameters; the type key is derived from a zero
T (the message Type() methods are constant returns, safe on a nil pointer).

Adds TestRequestMuxHandleType and switches the SUBSCRIBE branch of
ExampleRequestMux to HandleType.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@floatdrop floatdrop merged commit f701aef into draft-18 Jun 28, 2026
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant