diff --git a/pkg/relay/doc.go b/pkg/relay/doc.go index e8a05bf..c3aa030 100644 --- a/pkg/relay/doc.go +++ b/pkg/relay/doc.go @@ -27,18 +27,18 @@ // (subscribe, publish, namespace, track_status, fetch, // fanout, datagram). Every handler_* function is a method on // the unexported sessionHandler and a façade over the shared -// registries. auth.go, metrics.go, limiter.go, and -// newgroup.go are leaf helpers. +// registries. auth.go, metrics.go, and limiter.go are leaf +// helpers. // // internal/registry Relay-wide shared state, created once and shared across -// every session handler. track.go routes objects and owns -// the per-track cache; namespace.go tracks PUBLISH_NAMESPACE -// / SUBSCRIBE_NAMESPACE state; fetch_router.go rendezvouses -// upstream FETCH response streams with the downstream -// handler that issued the FETCH; subscription.go is the -// Subscription / UpstreamSub / DownstreamSub state machine. -// This package never imports the parent — the dependency -// only ever points handler → registry. +// every session handler. track_registry.go routes objects to +// per-track track_entry.go entries (each owning a cache); +// namespace.go tracks PUBLISH_NAMESPACE / SUBSCRIBE_NAMESPACE +// state; fetch_router.go rendezvouses upstream FETCH response +// streams with the downstream handler that issued the FETCH; +// subscription.go is the UpstreamSub / DownstreamSub state +// machine. This package never imports the parent — the +// dependency only ever points handler → registry. // // The other sibling subpackages are cache (per-track LRU+TTL object cache), // discovery (cross-instance track + namespace advertisement fabric), and diff --git a/pkg/relay/handler_datagram.go b/pkg/relay/handler_datagram.go index 71b9234..1ac324e 100644 --- a/pkg/relay/handler_datagram.go +++ b/pkg/relay/handler_datagram.go @@ -47,10 +47,8 @@ func (h *sessionHandler) runDatagramLoop(ctx context.Context) error { func (h *sessionHandler) handleDatagram(ctx context.Context, d *message.ObjectDatagram) { key, ok := h.sess.LookupInboundTrackAlias(d.TrackAlias) if !ok { - // §11.3: "If an endpoint receives a datagram with an unknown - // Track Alias, it MAY drop the datagram or choose to buffer - // it for a brief period to handle reordering with the control - // message that establishes the Track Alias." We drop. + // §11.3: an unknown Track Alias MAY be dropped or briefly buffered for + // reordering against the establishing control message. We drop. h.log.LogAttrs(ctx, slog.LevelDebug, "datagram: unknown inbound Track Alias", slog.Uint64("alias", d.TrackAlias)) return diff --git a/pkg/relay/handler_fanout.go b/pkg/relay/handler_fanout.go index d03483a..b0df04a 100644 --- a/pkg/relay/handler_fanout.go +++ b/pkg/relay/handler_fanout.go @@ -72,14 +72,11 @@ type subgroupWriterSet struct { // second and later copy of each {GroupID, ObjectID} so the subscriber sees each // object exactly once. A single publisher is just the one-contributor case. // -// The per-subscriber forward path runs in a dedicated writer goroutine -// fed by a bounded send queue. §5.1.2 filters are evaluated pre-enqueue -// and ObjectIDDelta is re-encoded on the outbound side so filter drops -// don't shift the subscriber's decoded absolute IDs. §11.4.3 gap-driven -// reset/reopen is implemented; the inbound FIN-vs-reset distinction is -// propagated to the outbound streams when the LAST contributor leaves; -// [registry.TrackEntry.LargestObject] (§10.2.11) is bumped on every forwarded -// object. +// The per-subscriber forward path runs in a dedicated [subgroupWriter] +// goroutine fed by a bounded send queue: §5.1.2 filters are evaluated +// pre-enqueue and ObjectIDDelta re-encoded outbound so drops don't shift the +// subscriber's absolute IDs. The inbound FIN-vs-reset distinction propagates to +// the outbound streams only when the LAST contributor leaves. func (h *sessionHandler) runFanout(ctx context.Context, stream *session.IncomingSubgroupStream) { hdr := stream.Header @@ -458,28 +455,13 @@ func (w *subgroupWriter) publish(fwd fwdObject) { } } -// run is the writer goroutine. It exits when its inbox is closed; close -// signals whether the inbound side ended cleanly (FIN propagation) or -// abnormally (reset propagation). +// run is the writer goroutine. It drains the inbox and writes objects to the +// outbound stream until the inbox is closed, then decides the stream's fate +// (FIN / reset) from the flags close recorded — see the post-drain block below. // -// If WriteObject fails mid-stream (QUIC-level error: stream torn down or -// session dead) the writer cancels the outbound stream, marks writeFailed, -// and keeps draining the inbox until close is called. This keeps publish -// non-blocking from the producer's side without spawning a second goroutine -// just to discard the tail of the queue. -// -// Stream-lifecycle decisions: -// -// - The outbound stream was opened eagerly by runFanout, so the very -// first forwarded object writes against that stream (with the -// publisher's first absolute Object ID as the delta). -// - A non-consecutive Object ID (gap) cancels the current outbound -// stream and opens a new one before writing — §11.4.3 forbids -// forwarding non-consecutive objects on the same subgroup stream. -// - When the inbox closes, the post-drain path either FINs (clean -// inbound EOF), cancels with [moqt.StreamResetCancelled] (inbound -// reset propagation), or cancels with [moqt.StreamResetExcessiveLoad] -// (slow-reader escalation). +// If WriteObject fails mid-stream (QUIC-level error) the writer cancels the +// outbound stream, marks writeFailed, and keeps draining until close is called, +// keeping publish non-blocking without a second drain goroutine. func (w *subgroupWriter) run() { defer close(w.done) @@ -637,25 +619,12 @@ func (w *subgroupWriter) run() { _ = w.out.Close() } -// applyPriority pushes the §7.2 effective priority for this writer's -// current outbound stream into the transport. -// [session.OutgoingSubgroupStream.SetSendPriority] forwards to the -// underlying [session.SendStream] iff it implements -// [session.PrioritizedSendStream]; adapters that don't (currently all of -// them — see the interface docs) silently no-op. -// -// applyPriority is called on stream open and on §11.4.3 reopen, so a writer -// always picks up the latest subscriber + publisher values when it (re)opens -// its stream. A REQUEST_UPDATE that changes SUBSCRIBER_PRIORITY mid-stream -// updates the stored DownstreamSub priority (see handleSubscribeUpdate -> -// installSubscribeParams) but is NOT re-pushed to already-open writers — the -// new key takes effect on the next open/reopen, not in-flight. Live re-push is -// a follow-up that only matters once a transport honors the knob (§10.2.7; -// quic-go#437). -// -// The publisher-priority byte, Group ID and Subgroup ID all come from the -// inbound SubgroupHeader; the subscriber-priority and group-order halves come -// from the subscription. Together they form the full §7.2 key (rules 1–4). +// applyPriority pushes the §7.2 effective priority for this writer's current +// outbound stream into the transport. It is called on stream open and §11.4.3 +// reopen, so a mid-stream SUBSCRIBER_PRIORITY change takes effect on the next +// (re)open rather than in-flight. The key combines the publisher-priority, +// Group ID and Subgroup ID from the inbound header with the subscriber-priority +// and group-order from the subscription (§7.2 rules 1–4). func (w *subgroupWriter) applyPriority() { if w.out == nil { return diff --git a/pkg/relay/handler_fetch.go b/pkg/relay/handler_fetch.go index 3d7e468..40f2463 100644 --- a/pkg/relay/handler_fetch.go +++ b/pkg/relay/handler_fetch.go @@ -21,26 +21,16 @@ import ( // handler: the stitch degrades to cache-only once it elapses. const defaultUpstreamFetchTimeout = 5 * time.Second -// handleFetch implements FETCH (§9.4, §10.12): validate the requested -// range, reply FETCH_OK, open a FETCH_HEADER uni-stream, and serialise -// the cached objects in the requested group order. Gaps in the response -// stream are how the spec signals "objects do not exist" (§11.4.4, -// §3553). +// handleFetch implements FETCH (§9.4, §10.12): validate the requested range, +// reply FETCH_OK, open a FETCH_HEADER uni-stream, and serialise the cached +// objects in the requested group order. Gaps in the response stream are how the +// spec signals "objects do not exist" (§11.4.4, §3553). // // The below-floor portion of the range — objects the relay evicted or never // cached — is stitched from an upstream FETCH when one is reachable; see -// [sessionHandler.stitchedFetchObjects]. With no reachable upstream the relay -// still emits whatever it has and leaves the rest as gaps, which §3553 lets -// the subscriber read as non-existence. -// -// Current limitations: -// -// - Upstream stitching needs an Established upstream on another session that -// answers FETCH (i.e. a relay/origin). A leaf publisher that doesn't serve -// FETCH leaves below-floor gaps unfilled. -// - Upstream-fetched objects are not written back into the cache (the FIFO -// ring is keyed by arrival; inserting old backfill would evict live data). -// - FILL_TIMEOUT bounds both the response setup and the upstream-fetch wait. +// [sessionHandler.stitchedFetchObjects]. Otherwise the relay emits what it has +// and leaves the rest as gaps, which §3553 lets the subscriber read as +// non-existence. func (h *sessionHandler) handleFetch(ctx context.Context, req *session.Request, msg *message.Fetch) { if err := h.auth.AuthorizeFetch(ctx, h.sess, msg); err != nil { h.rejectAuth(ctx, req, "Fetch", err) @@ -129,12 +119,10 @@ func (h *sessionHandler) handleFetch(ctx context.Context, req *session.Request, } _ = out.Close() - // Read follow-up messages (§10.9 REQUEST_UPDATE, and a peer FIN/reset) - // on the bidi request stream until the peer tears it down or ctx is - // cancelled. Unlike the previous DrainAndWait, which discarded every - // follow-up byte, this loop parses and dispatches REQUEST_UPDATE so a - // malformed FETCH update is answered with REQUEST_ERROR and the FETCH - // data stream is reset per §10.9. + // Read follow-ups (§10.9 REQUEST_UPDATE, peer FIN/reset) on the bidi + // request stream until the peer tears it down or ctx is cancelled, so a + // malformed FETCH update is answered with REQUEST_ERROR and the data + // stream reset per §10.9. h.readFetchUpdates(ctx, req, out) } @@ -371,20 +359,16 @@ func capFetchEndLocation(requested, largest message.Location) message.Location { // the below-floor portion the relay does not hold from an upstream FETCH when // one is reachable (§9.4 upstream stitching). // -// The cache's retained set is a Location suffix (see [cache.ObjectCache.OldestRetained]): -// everything below the eviction floor was evicted or never cached, so a gap -// there might still exist upstream, whereas a gap at or above the floor is -// ground-truth non-existence. The handler therefore splits the request at the -// floor, fetches [requestStart, floor) from an established upstream, and -// concatenates it with the cached part in the requested group order — the two -// parts are disjoint by Location, so a concatenation is correctly ordered. -// -// When there is no FETCH-able upstream, or the upstream FETCH errors / times -// out, it degrades to "serve what the cache has": the below-floor gap stays a -// gap, which the subscriber reads as non-existence (§3553) — exactly the -// pre-stitching behaviour. Upstream-fetched objects are NOT written back into -// the cache: the cache is a FIFO ring keyed by arrival, and inserting old -// backfill would evict live objects. +// Everything below the cache's eviction floor (see +// [cache.ObjectCache.OldestRetained]) was evicted or never cached, so a gap +// there might still exist upstream whereas a gap at/above the floor is +// ground-truth non-existence. The handler splits the request at the floor, +// fetches [requestStart, floor) from an established upstream, and concatenates +// it with the cached part — the two are disjoint by Location, so the result is +// correctly ordered. With no FETCH-able upstream (or on error/timeout) it +// degrades to serving what the cache has. Upstream-fetched objects are NOT +// cached back: the FIFO ring is keyed by arrival, so old backfill would evict +// live objects. func (h *sessionHandler) stitchedFetchObjects( ctx context.Context, entry *registry.TrackEntry, diff --git a/pkg/relay/handler_publish.go b/pkg/relay/handler_publish.go index 8d10ef2..3e273a0 100644 --- a/pkg/relay/handler_publish.go +++ b/pkg/relay/handler_publish.go @@ -41,11 +41,9 @@ func (h *sessionHandler) handlePublish(ctx context.Context, req *session.Request fullName := track.FullTrackName{Namespace: msg.Namespace, Name: msg.Name} - // §11.1: register the alias the publisher chose so we can detect - // duplicates. A duplicate alias for a different track is a - // session-level error per the spec, but we tolerate per-request - // failure for now (the dispatch loop already handles single requests - // independently — closing the session would over-react). + // §11.1: register the publisher's chosen alias so the fanout path can map + // it back to the track and duplicates are detected. A duplicate alias is a + // session-level error per spec, but we scope the failure to this request. if err := h.sess.RegisterInboundTrackAlias(msg.TrackAlias, fullName.Key()); err != nil { h.log.LogAttrs(ctx, slog.LevelDebug, "PUBLISH alias registration failed", slog.String("err", err.Error())) diff --git a/pkg/relay/handler_subscribe.go b/pkg/relay/handler_subscribe.go index 58b5a97..2dbd8db 100644 --- a/pkg/relay/handler_subscribe.go +++ b/pkg/relay/handler_subscribe.go @@ -84,14 +84,9 @@ func (h *sessionHandler) handleSubscribe(ctx context.Context, req *session.Reque h.log.LogAttrs(ctx, slog.LevelDebug, "SUBSCRIBE serving from existing upstream") } - // Allocate the Track Alias the relay will use when publishing this track - // downstream to the subscriber. Per §11.1 this outbound alias space is - // independent of the inbound aliases the peer chose for its own PUBLISHes - // — a session that both publishes and subscribes (e.g. a conferencing - // client) would otherwise collide, since both spaces start at 0. The - // monotonic allocator already guarantees outbound uniqueness, so it is not - // registered in the inbound alias map (which is reserved for the peer's - // publisher-chosen aliases, used to route inbound data streams). + // Allocate the Track Alias the relay uses when publishing this track + // downstream. Per §11.1 the outbound alias space is independent of the + // inbound aliases the peer chose for its own PUBLISHes. alias := h.sess.AllocOutboundTrackAlias() sub := registry.NewDownstreamSub(h.allocSubID(), h.sess, req.Stream, alias) @@ -171,11 +166,9 @@ func (h *sessionHandler) handleSubscribe(ctx context.Context, req *session.Reque h.propagateNewGroupUpstream(ctx, fullName, newGroupReqParam.Varint) } - // Read follow-up messages (§10.9 REQUEST_UPDATE, and a peer FIN/reset) - // on the same bidi stream until the subscriber tears it down or ctx is - // cancelled. Unlike the previous DrainAndWait, which discarded every - // follow-up byte, this loop parses and dispatches REQUEST_UPDATE so the - // subscription's Forward / priority / filter can change mid-flight. + // Read follow-ups (§10.9 REQUEST_UPDATE, peer FIN/reset) on the bidi stream + // until the subscriber tears it down or ctx is cancelled, dispatching + // REQUEST_UPDATE so Forward / priority / filter can change mid-flight. h.readSubscribeUpdates(ctx, req, sub, fullName) h.log.LogAttrs(ctx, slog.LevelDebug, "SUBSCRIBE stream ended", slog.String("name", string(msg.Name))) @@ -337,24 +330,17 @@ func (h *sessionHandler) propagateForwardUpstream(ctx context.Context, fullName } // subscribeUpstream establishes upstream SUBSCRIBEs for fullName. Per §9.5 it -// subscribes to EVERY matching publisher for fault tolerance — all local -// publishers that advertised a covering namespace (§9.5 prefix match) AND all -// remote relays Discovery resolves (§9.4 cross-relay aggregation) — fanning them -// into one track via [runFanout]'s dedup so losing one origin doesn't interrupt -// delivery. A candidate whose SUBSCRIBE fails does not abort the rest. Returns -// (entry, true, nil) when at least one upstream was established, (nil, false, -// nil) when no upstream is available anywhere, and (nil, false, err) only when -// every candidate failed and the last failure was a hard error (e.g. a publisher -// session died mid-subscribe). +// subscribes to EVERY matching source for fault tolerance — every local +// publisher advertising a covering namespace and every remote relay Discovery +// resolves (§9.4 cross-relay aggregation) — deduping already-subscribed +// sessions and fanning the rest into one track. A failed candidate does not +// abort the others. Returns (entry, true, nil) when at least one upstream was +// established, (nil, false, nil) when none is available anywhere, and +// (nil, false, err) when every candidate failed with the last a hard error. // -// Already-subscribed sessions are skipped (source dedup) so a track that already -// has an upstream on a given session is never double-subscribed. -// -// extra carries additional parameters to fold into each upstream SUBSCRIBE -// alongside the mandatory §9.4 Largest Object filter — currently the -// NEW_GROUP_REQUEST a downstream SUBSCRIBE arrived with (§10.2.13 rule 1: a -// relay with no Established upstream MUST include NEW_GROUP_REQUEST when -// subscribing upstream). +// extra carries parameters folded into each upstream SUBSCRIBE alongside the +// §9.4 Largest Object filter — currently the NEW_GROUP_REQUEST a downstream +// SUBSCRIBE arrived with (§10.2.13 rule 1). func (h *sessionHandler) subscribeUpstream( ctx context.Context, fullName track.FullTrackName, @@ -472,12 +458,9 @@ func (h *sessionHandler) subscribeUpstreamOnSession( upstreamSub.FetchCapable = true entry, _ := h.tracks.AddUpstream(fullName, upstreamSub, registry.WithProperties(upstreamStream.OK.TrackProperties)) - // Watcher: keep the upstream stream alive until the publisher cancels - // it (FIN/reset) or this handler shuts down, then unregister. - // waitForStreamEnd handles the ctx-cancel + CancelRead dance; it's - // the same primitive every long-lived handler uses to keep a request - // stream open. The unregister runs in the watcher's goroutine so the - // handler's wg join in run() waits for it. + // Keep the upstream stream alive until the publisher cancels it (FIN/reset) + // or this handler shuts down, then unregister. The unregister runs under + // h.spawn so run()'s wg join waits for it. h.spawn(func() { session.DrainAndWait(ctx, upstreamStream) h.tracks.RemoveUpstream(fullName, upstreamSub.ID) @@ -499,24 +482,14 @@ func hasEstablishedUpstream(entry *registry.TrackEntry) bool { return false } -// installSubscribeParams extracts the per-subscription policy fields from -// the SUBSCRIBE message parameters (§10.2) and records them on sub. -// -// The §5.1.2 / §9.4 LargestObject snapshot is intentionally NOT taken -// here — it is captured atomically with [registry.TrackRegistry.AddDownstreamSnapshotLargest] -// at the call site so the snapshot is consistent with the moment sub -// becomes eligible for live fanout delivery. See that method's docstring -// for the race it solves. Callers must invoke -// [registry.DownstreamSub.SetLargestAtSubscribe] separately with the returned snapshot. +// installSubscribeParams extracts the per-subscription policy fields from the +// SUBSCRIBE parameters (§10.2) and records them on sub: SUBSCRIPTION_FILTER +// (§10.2.9), SUBSCRIBER_PRIORITY (§10.2.7, advisory), GROUP_ORDER (§10.2.8). // -// Installed parameters: -// - SUBSCRIPTION_FILTER (§10.2.9) — fanout consults it on every object -// pre-enqueue. -// - SUBSCRIBER_PRIORITY (§10.2.7) — stored for future cross-stream -// scheduling; subgroup streams don't expose a per-stream priority knob, -// so the value is currently advisory. -// - GROUP_ORDER (§10.2.8) — honoured by the FETCH responder (subgroup -// streams are §11.4.3 in-order and ignore the field). +// The §5.1.2 / §9.4 LargestObject snapshot is intentionally NOT taken here — the +// caller captures it atomically via +// [registry.TrackRegistry.AddDownstreamSnapshotLargest] and applies it with +// [registry.DownstreamSub.SetLargestAtSubscribe]. func installSubscribeParams(sub *registry.DownstreamSub, ps message.Parameters) error { filter, err := message.SubscriptionFilterFromParam(ps) if err != nil { diff --git a/pkg/relay/handler_track_status.go b/pkg/relay/handler_track_status.go index 2a7545e..a5bc3f6 100644 --- a/pkg/relay/handler_track_status.go +++ b/pkg/relay/handler_track_status.go @@ -11,31 +11,15 @@ import ( "github.com/floatdrop/moq-go/pkg/moqt/track" ) -// handleTrackStatus implements TRACK_STATUS (§10.14). It is a metadata-only -// query: the caller wants the track's Properties (and, implicitly, its -// existence) without creating a subscription. Per the spec the response is -// REQUEST_OK (aliased as [message.TrackStatusOK]) carrying the same Track -// Properties block that SUBSCRIBE_OK would. +// handleTrackStatus implements TRACK_STATUS (§10.14): a metadata-only query for +// a track's Properties and existence, answered without creating a subscription +// or round-tripping upstream. The reply is REQUEST_OK (aliased as +// [message.TrackStatusOK]) carrying the same Track Properties block SUBSCRIBE_OK +// would, plus §10.2.11 LARGEST_OBJECT when objects have been forwarded. // -// Flow: -// -// 1. Authorize. -// 2. Look up the track in [registry.TrackRegistry]. If found and a publisher has -// populated Properties (handlePublish / subscribeUpstream both do -// this), reply TRACK_STATUS_OK with those bytes plus a §10.2.11 -// LARGEST_OBJECT parameter sourced from the entry's watermark when -// any object has been forwarded. -// 3. Otherwise check the [registry.NamespaceRegistry] — if a local publisher has -// advertised the namespace, reply TRACK_STATUS_OK with empty -// Properties (and no LARGEST_OBJECT). The caller learns the track -// exists but the relay has no metadata to forward without issuing an -// upstream SUBSCRIBE. -// 4. If neither is true, reject with [moqt.RequestDoesNotExist]. -// -// The handler deliberately does NOT issue an upstream TRACK_STATUS on -// demand. TRACK_STATUS is a status query, not a subscription — the -// answer the relay can give without round-tripping upstream is sufficient -// for callers that just want to know "does this track exist?". +// It answers from the track registry when metadata exists, falls back to an +// empty TRACK_STATUS_OK when only the namespace is advertised locally, and +// otherwise rejects with [moqt.RequestDoesNotExist]. func (h *sessionHandler) handleTrackStatus(ctx context.Context, req *session.Request, msg *message.TrackStatus) { if err := h.auth.AuthorizeTrackStatus(ctx, h.sess, msg); err != nil { h.rejectAuth(ctx, req, "TrackStatus", err) @@ -44,11 +28,8 @@ func (h *sessionHandler) handleTrackStatus(ctx context.Context, req *session.Req fullName := track.FullTrackName{Namespace: msg.Namespace, Name: msg.Name} entry, known := h.tracks.Get(fullName.Key()) - // The relay can answer TRACK_STATUS_OK for any entry it has metadata - // to surface: either Properties (captured from the upstream publisher - // in PUBLISH / SUBSCRIBE_OK) or a §10.2.11 LargestObject watermark - // (the fanout advances this on every forwarded object). Either field - // — even on its own — is useful to the caller. + // Answer TRACK_STATUS_OK for any entry with metadata to surface: Properties + // or a §10.2.11 LargestObject watermark. Either field alone is useful. var ( largest message.Location hasLargest bool diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index 0aa3bb7..82291cc 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -439,13 +439,10 @@ func (r *Relay) serveSession(ctx context.Context, sess *session.Session) { r.addSession(sess) defer func() { r.removeSession(sess) - // Belt-and-suspenders cleanup: per-request handler defers - // remove their own registrations on a clean shutdown, but if a - // handler raced past Stop or wedged on a stale stream, the - // registries can hold dangling refs to a Session that is no - // longer reachable. Sweep both registries unconditionally so - // the post-condition "handleConn returned ⇒ no registry entry - // references sess" holds. + // Belt-and-suspenders: per-request handlers unregister themselves on + // clean shutdown, but a handler that raced past Stop or wedged could + // leave dangling refs. Sweep both registries so the post-condition + // "serveSession returned ⇒ no registry entry references sess" holds. r.tracks.RemoveSession(sess) r.names.RemoveSession(sess) }() diff --git a/pkg/relay/session_handler.go b/pkg/relay/session_handler.go index 15aa1a3..f075faa 100644 --- a/pkg/relay/session_handler.go +++ b/pkg/relay/session_handler.go @@ -15,26 +15,13 @@ import ( "github.com/floatdrop/moq-go/pkg/relay/internal/registry" ) -// sessionHandler owns the per-session request and data-stream loops. One -// instance is created per accepted [session.Session] in [Relay.handleConn] -// and torn down when the session terminates. +// sessionHandler owns the per-session request and data-stream loops, one per +// accepted [session.Session]. The relay's shared state (registries, authorizer) +// is injected by [Relay.handleConn] and referenced read-only. // -// The handler is purely a façade over the relay's shared state — it does not -// own the [registry.TrackRegistry], [registry.NamespaceRegistry], or [Authorizer]. They are -// injected by [Relay.handleConn] and referenced read-only on the handler. -// -// Concurrency model: -// -// - One goroutine drives the request-dispatch loop ([sessionHandler.runRequestLoop]). -// - One goroutine drives the data-stream loop ([sessionHandler.runDataLoop]). -// - One goroutine drives the datagram loop ([sessionHandler.runDatagramLoop]). -// - Per-request handler goroutines may be spawned by the dispatch loop -// (e.g. a SUBSCRIBE handler that needs to wait for an upstream reply). -// The handler tracks them via wg so [sessionHandler.run] can join cleanly -// before returning. -// -// All registry interactions for this session pass through the handler so -// bulk-cleanup on session teardown is centralised. +// Concurrency: [sessionHandler.run] drives the request, data-stream, and +// datagram loops on separate goroutines, plus per-request handler goroutines +// spawned by the dispatch loop and tracked via wg for a clean join on teardown. type sessionHandler struct { sess *session.Session log *slog.Logger @@ -51,11 +38,9 @@ type sessionHandler struct { // limiter enforces the §13.1 / §13.7.1 per-session resource caps. limiter sessionLimiter - // nextSubID allocates relay-internal subscription IDs for the - // registry.UpstreamSub / registry.DownstreamSub records this handler installs into the - // registry.TrackRegistry. The counter is per-handler because IDs only have to - // be unique within a registry.TrackEntry's slices — there is no global - // requirement — and a per-handler counter avoids contention. + // subID allocates relay-internal subscription IDs for the UpstreamSub / + // DownstreamSub records this handler installs into the registry. Per-handler + // because IDs only need to be unique within a TrackEntry's slices. subIDMu sync.Mutex subID uint64 @@ -115,30 +100,17 @@ func newSessionHandler( } } -// handleInboundGoaway implements §10.4: when the peer sends GOAWAY, -// honour the timeout it declared by giving in-flight subscriptions -// that long to wrap up, then close the session. +// handleInboundGoaway implements §10.4: when the peer sends GOAWAY, grant the +// timeout it declared for in-flight subscriptions to wrap up, then close the +// session. Per-session registry cleanup drops the entries on teardown. // -// Per §10.4 the peer MUST NOT issue new requests after sending GOAWAY; -// existing in-flight work continues. The relay does no role-based -// distinction here: +// The relay does not migrate upstream subscriptions to the peer's NewSessionURI +// (§9.5.1); dependent DownstreamSubs see their tracks end and the client +// re-subscribes, which may re-establish the track via the on-demand upstream +// subscribe path. // -// - Inbound GOAWAY from a downstream subscriber: the subscriber is -// migrating. Their request streams will close as part of that; -// when the timer fires the relay tears the session down. The -// per-session registry cleanup drops registry entries. -// - Inbound GOAWAY from an upstream publisher: §9.5.1 would have the -// relay migrate its subscriptions to a new URI; until the -// UpstreamPool lands the relay just terminates the session at the -// timeout. Dependent DownstreamSubs see their tracks end and the -// client re-subscribes — possibly succeeding via the on-demand -// upstream subscribe path, possibly failing cleanly if the -// publisher's namespace is gone. -// -// The function blocks on either the peer's declared timeout or -// sess.Done() (the peer closed the session earlier) or ctx (relay-level -// shutdown). When it returns, the caller's defer cancels runCtx, which -// unblocks the per-session loops. +// Blocks on the declared timeout, sess.Done() (peer closed earlier), or ctx +// (relay shutdown). The caller's defer then cancels runCtx to unblock the loops. func (h *sessionHandler) handleInboundGoaway(ctx context.Context) { g := h.sess.PeerGoaway() if g == nil { @@ -173,24 +145,14 @@ func (h *sessionHandler) allocSubID() uint64 { return h.subID } -// run blocks until the session ends, returning the cause: -// -// - nil when the peer closed the session cleanly or our side initiated -// shutdown via ctx cancellation. -// - the request-loop or data-loop error otherwise. -// -// run spawns the two protocol loops, waits for both to finish (joining via -// a WaitGroup so neither can be left dangling), then joins all in-flight -// per-request goroutines before returning. The session itself is closed by -// [Relay.Stop] or the peer; run does not call [session.Session.Close] except -// on a protocol violation detected by a loop. +// run blocks until the session ends, returning nil on a clean close (peer or +// ctx-driven shutdown) or the request/data-loop error otherwise. It spawns the +// protocol loops, joins them and all in-flight per-request goroutines, and does +// not close the session itself except on a protocol violation detected by a loop. func (h *sessionHandler) run(ctx context.Context) error { - // Tie our local ctx to both the parent ctx and the session's Done - // channel so loops unblock as soon as the session terminates for any - // reason. Inbound GOAWAY handling is folded into the same watcher: - // when the peer sends GOAWAY, the relay grants the peer's declared - // Timeout for any in-flight subscriptions to drain, then closes the - // session. + // Watcher ties runCtx to the parent ctx and the session's Done channel so + // loops unblock as soon as the session terminates, and folds in inbound + // GOAWAY handling (see handleInboundGoaway). runCtx, cancel := context.WithCancel(ctx) defer cancel() go func() { @@ -219,12 +181,9 @@ func (h *sessionHandler) run(ctx context.Context) error { cancel() // wake sibling loops if the data loop dies first }) // Datagrams are OPTIONAL (§11.6): a transport or peer without DATAGRAM - // support makes ReceiveDatagram fail on the very first call. That must not - // take down SUBSCRIBE/PUBLISH handling, so the datagram loop neither - // cancels its siblings nor promotes its error as a session fault — it just - // stops. Session death is still observed: the request/data loops cancel on - // real transport errors, and a datagram-level PROTOCOL_VIOLATION closes the - // session, which the runCtx watcher above turns into a cancel. + // support fails ReceiveDatagram on the first call, which must not take down + // SUBSCRIBE/PUBLISH handling. So the datagram loop neither cancels its + // siblings nor promotes its error as a session fault — it just stops. loops.Go(func() { if err := h.runDatagramLoop(runCtx); err != nil && !isShutdownErr(err) { h.log.LogAttrs(ctx, slog.LevelDebug, @@ -284,17 +243,12 @@ func (h *sessionHandler) runRequestLoop(ctx context.Context) error { } } -// runDataLoop accepts inbound data streams and routes each to the -// appropriate fanout entry point. Subgroup streams go to -// [sessionHandler.runFanout]; fetch response streams (the body side of -// a FETCH the relay issued upstream) are handed to the downstream handler -// waiting on the matching (session, RequestID) via the fetch router, and -// reset when no reader is registered. Unknown stream types are logged and -// the stream is reset to keep the publisher's flow control unblocked. +// runDataLoop accepts inbound data streams and routes each by type: subgroup +// streams to [sessionHandler.runFanout], fetch response streams to the fetch +// router (see the inline comments below). // -// Per-stream errors do not terminate the loop: §9.5 forbids "one bad data -// stream kills the session" semantics. Transport-level errors from -// AcceptDataStream do terminate, matching the request-loop convention. +// Per-stream errors do not terminate the loop (§9.5: one bad data stream must +// not kill the session); transport-level errors from AcceptDataStream do. func (h *sessionHandler) runDataLoop(ctx context.Context) error { for { ds, err := h.sess.AcceptDataStream(ctx) @@ -345,12 +299,9 @@ func (h *sessionHandler) dispatch(ctx context.Context, req *session.Request) { h.log.LogAttrs(ctx, slog.LevelDebug, "relay dispatching request", slog.String("type", fmt.Sprintf("%T", req.First))) - // §10.2.2: authorize the request's resolved AUTHORIZATION_TOKEN(s) - // before any handler runs. The session has already resolved aliases - // against the inbound cache and attached them as req.Tokens; here we - // apply the application's TokenVerifier (configured via - // session.WithTokenVerifier). A denial is per-request — reply - // REQUEST_ERROR with the mapped code and keep the session running. + // §10.2.2: apply the application's TokenVerifier to the request's resolved + // AUTHORIZATION_TOKEN(s) before any handler runs. A denial is per-request: + // reply REQUEST_ERROR with the mapped code and keep the session running. if err := h.sess.VerifyRequestTokens(ctx, req); err != nil { h.rejectTokenDenied(ctx, req, err) return