Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/relay/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@
// (subscribe, publish, namespace, track_status, fetch,
// fanout, datagram). Every handler_* function is a method on
// the unexported sessionHandler and a façade over the shared
// registries. auth.go, metrics.go, limiter.go, and
// newgroup.go are leaf helpers.
// registries. auth.go, metrics.go, and limiter.go are leaf
// helpers.
//
// internal/registry Relay-wide shared state, created once and shared across
// every session handler. track.go routes objects and owns
// the per-track cache; namespace.go tracks PUBLISH_NAMESPACE
// / SUBSCRIBE_NAMESPACE state; fetch_router.go rendezvouses
// upstream FETCH response streams with the downstream
// handler that issued the FETCH; subscription.go is the
// Subscription / UpstreamSub / DownstreamSub state machine.
// This package never imports the parent — the dependency
// only ever points handler → registry.
// every session handler. track_registry.go routes objects to
// per-track track_entry.go entries (each owning a cache);
// namespace.go tracks PUBLISH_NAMESPACE / SUBSCRIBE_NAMESPACE
// state; fetch_router.go rendezvouses upstream FETCH response
// streams with the downstream handler that issued the FETCH;
// subscription.go is the UpstreamSub / DownstreamSub state
// machine. This package never imports the parent — the
// dependency only ever points handler → registry.
//
// The other sibling subpackages are cache (per-track LRU+TTL object cache),
// discovery (cross-instance track + namespace advertisement fabric), and
Expand Down
6 changes: 2 additions & 4 deletions pkg/relay/handler_datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ func (h *sessionHandler) runDatagramLoop(ctx context.Context) error {
func (h *sessionHandler) handleDatagram(ctx context.Context, d *message.ObjectDatagram) {
key, ok := h.sess.LookupInboundTrackAlias(d.TrackAlias)
if !ok {
// §11.3: "If an endpoint receives a datagram with an unknown
// Track Alias, it MAY drop the datagram or choose to buffer
// it for a brief period to handle reordering with the control
// message that establishes the Track Alias." We drop.
// §11.3: an unknown Track Alias MAY be dropped or briefly buffered for
// reordering against the establishing control message. We drop.
h.log.LogAttrs(ctx, slog.LevelDebug, "datagram: unknown inbound Track Alias",
slog.Uint64("alias", d.TrackAlias))
return
Expand Down
65 changes: 17 additions & 48 deletions pkg/relay/handler_fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,11 @@ type subgroupWriterSet struct {
// second and later copy of each {GroupID, ObjectID} so the subscriber sees each
// object exactly once. A single publisher is just the one-contributor case.
//
// The per-subscriber forward path runs in a dedicated writer goroutine
// fed by a bounded send queue. §5.1.2 filters are evaluated pre-enqueue
// and ObjectIDDelta is re-encoded on the outbound side so filter drops
// don't shift the subscriber's decoded absolute IDs. §11.4.3 gap-driven
// reset/reopen is implemented; the inbound FIN-vs-reset distinction is
// propagated to the outbound streams when the LAST contributor leaves;
// [registry.TrackEntry.LargestObject] (§10.2.11) is bumped on every forwarded
// object.
// The per-subscriber forward path runs in a dedicated [subgroupWriter]
// goroutine fed by a bounded send queue: §5.1.2 filters are evaluated
// pre-enqueue and ObjectIDDelta re-encoded outbound so drops don't shift the
// subscriber's absolute IDs. The inbound FIN-vs-reset distinction propagates to
// the outbound streams only when the LAST contributor leaves.
func (h *sessionHandler) runFanout(ctx context.Context, stream *session.IncomingSubgroupStream) {
hdr := stream.Header

Expand Down Expand Up @@ -458,28 +455,13 @@ func (w *subgroupWriter) publish(fwd fwdObject) {
}
}

// run is the writer goroutine. It exits when its inbox is closed; close
// signals whether the inbound side ended cleanly (FIN propagation) or
// abnormally (reset propagation).
// run is the writer goroutine. It drains the inbox and writes objects to the
// outbound stream until the inbox is closed, then decides the stream's fate
// (FIN / reset) from the flags close recorded — see the post-drain block below.
//
// If WriteObject fails mid-stream (QUIC-level error: stream torn down or
// session dead) the writer cancels the outbound stream, marks writeFailed,
// and keeps draining the inbox until close is called. This keeps publish
// non-blocking from the producer's side without spawning a second goroutine
// just to discard the tail of the queue.
//
// Stream-lifecycle decisions:
//
// - The outbound stream was opened eagerly by runFanout, so the very
// first forwarded object writes against that stream (with the
// publisher's first absolute Object ID as the delta).
// - A non-consecutive Object ID (gap) cancels the current outbound
// stream and opens a new one before writing — §11.4.3 forbids
// forwarding non-consecutive objects on the same subgroup stream.
// - When the inbox closes, the post-drain path either FINs (clean
// inbound EOF), cancels with [moqt.StreamResetCancelled] (inbound
// reset propagation), or cancels with [moqt.StreamResetExcessiveLoad]
// (slow-reader escalation).
// If WriteObject fails mid-stream (QUIC-level error) the writer cancels the
// outbound stream, marks writeFailed, and keeps draining until close is called,
// keeping publish non-blocking without a second drain goroutine.
func (w *subgroupWriter) run() {
defer close(w.done)

Expand Down Expand Up @@ -637,25 +619,12 @@ func (w *subgroupWriter) run() {
_ = w.out.Close()
}

// applyPriority pushes the §7.2 effective priority for this writer's
// current outbound stream into the transport.
// [session.OutgoingSubgroupStream.SetSendPriority] forwards to the
// underlying [session.SendStream] iff it implements
// [session.PrioritizedSendStream]; adapters that don't (currently all of
// them — see the interface docs) silently no-op.
//
// applyPriority is called on stream open and on §11.4.3 reopen, so a writer
// always picks up the latest subscriber + publisher values when it (re)opens
// its stream. A REQUEST_UPDATE that changes SUBSCRIBER_PRIORITY mid-stream
// updates the stored DownstreamSub priority (see handleSubscribeUpdate ->
// installSubscribeParams) but is NOT re-pushed to already-open writers — the
// new key takes effect on the next open/reopen, not in-flight. Live re-push is
// a follow-up that only matters once a transport honors the knob (§10.2.7;
// quic-go#437).
//
// The publisher-priority byte, Group ID and Subgroup ID all come from the
// inbound SubgroupHeader; the subscriber-priority and group-order halves come
// from the subscription. Together they form the full §7.2 key (rules 1–4).
// applyPriority pushes the §7.2 effective priority for this writer's current
// outbound stream into the transport. It is called on stream open and §11.4.3
// reopen, so a mid-stream SUBSCRIBER_PRIORITY change takes effect on the next
// (re)open rather than in-flight. The key combines the publisher-priority,
// Group ID and Subgroup ID from the inbound header with the subscriber-priority
// and group-order from the subscription (§7.2 rules 1–4).
func (w *subgroupWriter) applyPriority() {
if w.out == nil {
return
Expand Down
58 changes: 21 additions & 37 deletions pkg/relay/handler_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,16 @@ import (
// handler: the stitch degrades to cache-only once it elapses.
const defaultUpstreamFetchTimeout = 5 * time.Second

// handleFetch implements FETCH (§9.4, §10.12): validate the requested
// range, reply FETCH_OK, open a FETCH_HEADER uni-stream, and serialise
// the cached objects in the requested group order. Gaps in the response
// stream are how the spec signals "objects do not exist" (§11.4.4,
// §3553).
// handleFetch implements FETCH (§9.4, §10.12): validate the requested range,
// reply FETCH_OK, open a FETCH_HEADER uni-stream, and serialise the cached
// objects in the requested group order. Gaps in the response stream are how the
// spec signals "objects do not exist" (§11.4.4, §3553).
//
// The below-floor portion of the range — objects the relay evicted or never
// cached — is stitched from an upstream FETCH when one is reachable; see
// [sessionHandler.stitchedFetchObjects]. With no reachable upstream the relay
// still emits whatever it has and leaves the rest as gaps, which §3553 lets
// the subscriber read as non-existence.
//
// Current limitations:
//
// - Upstream stitching needs an Established upstream on another session that
// answers FETCH (i.e. a relay/origin). A leaf publisher that doesn't serve
// FETCH leaves below-floor gaps unfilled.
// - Upstream-fetched objects are not written back into the cache (the FIFO
// ring is keyed by arrival; inserting old backfill would evict live data).
// - FILL_TIMEOUT bounds both the response setup and the upstream-fetch wait.
// [sessionHandler.stitchedFetchObjects]. Otherwise the relay emits what it has
// and leaves the rest as gaps, which §3553 lets the subscriber read as
// non-existence.
func (h *sessionHandler) handleFetch(ctx context.Context, req *session.Request, msg *message.Fetch) {
if err := h.auth.AuthorizeFetch(ctx, h.sess, msg); err != nil {
h.rejectAuth(ctx, req, "Fetch", err)
Expand Down Expand Up @@ -129,12 +119,10 @@ func (h *sessionHandler) handleFetch(ctx context.Context, req *session.Request,
}
_ = out.Close()

// Read follow-up messages (§10.9 REQUEST_UPDATE, and a peer FIN/reset)
// on the bidi request stream until the peer tears it down or ctx is
// cancelled. Unlike the previous DrainAndWait, which discarded every
// follow-up byte, this loop parses and dispatches REQUEST_UPDATE so a
// malformed FETCH update is answered with REQUEST_ERROR and the FETCH
// data stream is reset per §10.9.
// Read follow-ups (§10.9 REQUEST_UPDATE, peer FIN/reset) on the bidi
// request stream until the peer tears it down or ctx is cancelled, so a
// malformed FETCH update is answered with REQUEST_ERROR and the data
// stream reset per §10.9.
h.readFetchUpdates(ctx, req, out)
}

Expand Down Expand Up @@ -371,20 +359,16 @@ func capFetchEndLocation(requested, largest message.Location) message.Location {
// the below-floor portion the relay does not hold from an upstream FETCH when
// one is reachable (§9.4 upstream stitching).
//
// The cache's retained set is a Location suffix (see [cache.ObjectCache.OldestRetained]):
// everything below the eviction floor was evicted or never cached, so a gap
// there might still exist upstream, whereas a gap at or above the floor is
// ground-truth non-existence. The handler therefore splits the request at the
// floor, fetches [requestStart, floor) from an established upstream, and
// concatenates it with the cached part in the requested group order — the two
// parts are disjoint by Location, so a concatenation is correctly ordered.
//
// When there is no FETCH-able upstream, or the upstream FETCH errors / times
// out, it degrades to "serve what the cache has": the below-floor gap stays a
// gap, which the subscriber reads as non-existence (§3553) — exactly the
// pre-stitching behaviour. Upstream-fetched objects are NOT written back into
// the cache: the cache is a FIFO ring keyed by arrival, and inserting old
// backfill would evict live objects.
// Everything below the cache's eviction floor (see
// [cache.ObjectCache.OldestRetained]) was evicted or never cached, so a gap
// there might still exist upstream whereas a gap at/above the floor is
// ground-truth non-existence. The handler splits the request at the floor,
// fetches [requestStart, floor) from an established upstream, and concatenates
// it with the cached part — the two are disjoint by Location, so the result is
// correctly ordered. With no FETCH-able upstream (or on error/timeout) it
// degrades to serving what the cache has. Upstream-fetched objects are NOT
// cached back: the FIFO ring is keyed by arrival, so old backfill would evict
// live objects.
func (h *sessionHandler) stitchedFetchObjects(
ctx context.Context,
entry *registry.TrackEntry,
Expand Down
8 changes: 3 additions & 5 deletions pkg/relay/handler_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ func (h *sessionHandler) handlePublish(ctx context.Context, req *session.Request

fullName := track.FullTrackName{Namespace: msg.Namespace, Name: msg.Name}

// §11.1: register the alias the publisher chose so we can detect
// duplicates. A duplicate alias for a different track is a
// session-level error per the spec, but we tolerate per-request
// failure for now (the dispatch loop already handles single requests
// independently — closing the session would over-react).
// §11.1: register the publisher's chosen alias so the fanout path can map
// it back to the track and duplicates are detected. A duplicate alias is a
// session-level error per spec, but we scope the failure to this request.
if err := h.sess.RegisterInboundTrackAlias(msg.TrackAlias, fullName.Key()); err != nil {
h.log.LogAttrs(ctx, slog.LevelDebug, "PUBLISH alias registration failed",
slog.String("err", err.Error()))
Expand Down
Loading