From 00ae80d4d3d4d66a3df2aff13e945ec007a3bc02 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 07:47:26 +0500 Subject: [PATCH 1/2] feat(session): round out accept-side request helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every outbound request opener now has a matching Accept* counterpart, so a server gets the same typed-handle ergonomics AcceptSubscribe already gave: - AcceptPublish now returns *IncomingPublication (TrackAlias, Update) instead of bare error — the receiving side of a publisher push. - AcceptFetch writes FETCH_OK and returns *FetchResponder, whose OpenFetchStream binds the fetch's Request ID onto the FETCH_HEADER uni-stream automatically. - AcceptTrackStatus writes TRACK_STATUS_OK. It returns no handle by design (TRACK_STATUS is a one-shot status query with no push side); this is documented rather than left as an arbitrary asymmetry. - AcceptPublishNamespace / AcceptSubscribeNamespace / AcceptSubscribeTracks write REQUEST_OK and return *IncomingNamespacePublication / *IncomingNamespaceSubscription / *IncomingTrackSubscription. The last carries WritePublishBlocked, mirroring TrackSubscription.ReadPublishBlocked. Adds accept_test.go (round-trips incl. a FETCH stream and a PUBLISH_BLOCKED exchange), updates the AcceptPublish callers/example, and adds a README row. Co-Authored-By: Claude Opus 4.8 --- README.md | 1 + pkg/moqt/session/accept_test.go | 230 +++++++++++++++++++++++++++++ pkg/moqt/session/example_test.go | 14 +- pkg/moqt/session/fetch.go | 46 ++++++ pkg/moqt/session/namespace.go | 89 +++++++++++ pkg/moqt/session/publish.go | 33 +++++ pkg/moqt/session/publish_test.go | 2 +- pkg/moqt/session/request.go | 19 ++- pkg/moqt/session/subscribe_test.go | 2 +- pkg/moqt/session/track_status.go | 24 +++ 10 files changed, 449 insertions(+), 11 deletions(-) create mode 100644 pkg/moqt/session/accept_test.go diff --git a/README.md b/README.md index a8dedac..b68feee 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,7 @@ grouped here by the file they live in: | Stream exhaustion (PUBLISH_BLOCKED) | `ExampleSession_OpenPublish`, `ExampleSession_ReadPublishBlocked` | | Announce / discover namespaces | `ExampleSession_PublishNamespace`, `ExampleSession_SubscribeNamespace` | | Accept requests (server side) | `ExampleSession_AcceptRequest` | +| Accept-side helpers (`Accept*`) | `Request.AcceptSubscribe` / `AcceptPublish` / `AcceptFetch` / `AcceptTrackStatus` / `AcceptPublishNamespace` / `AcceptSubscribeNamespace` / `AcceptSubscribeTracks` | | Graceful shutdown (GOAWAY) | `ExampleSession_SendGoaway`, `ExampleSession_OnGoaway` | **[`relay`](pkg/relay/example_test.go)** diff --git a/pkg/moqt/session/accept_test.go b/pkg/moqt/session/accept_test.go new file mode 100644 index 0000000..f503a42 --- /dev/null +++ b/pkg/moqt/session/accept_test.go @@ -0,0 +1,230 @@ +package session_test + +import ( + "testing" + + "github.com/floatdrop/moq-go/pkg/moqt/message" + "github.com/floatdrop/moq-go/pkg/moqt/session" + "github.com/floatdrop/moq-go/pkg/moqt/wire" +) + +// TestAcceptFetchRepliesFetchOK checks AcceptFetch writes FETCH_OK with the +// supplied fields and that the responder's OpenFetchStream binds this fetch's +// Request ID onto the FETCH_HEADER uni-stream automatically. +// +// The server side runs in a goroutine and the client reads on the main +// goroutine in send order (FETCH_OK, then the FETCH_HEADER stream): the +// sessiontest streams are synchronous pipes, so a writer needs a concurrent +// reader. +func TestAcceptFetchRepliesFetchOK(t *testing.T) { + t.Parallel() + client, server := openPair(t) + ctx := t.Context() + + fetchMsg := &message.Fetch{ + RequestID: client.AllocRequestID(), + FetchType: message.FetchTypeStandalone, + Standalone: &message.StandaloneFetch{ + Namespace: wire.Namespace("ns"), + Name: []byte("clip"), + }, + } + + srvErr := make(chan error, 1) + go func() { + srvErr <- func() error { + req, err := server.AcceptRequest(ctx) + if err != nil { + return err + } + resp, err := req.AcceptFetch(&message.FetchOK{ + EndOfTrack: true, + EndLocation: message.Location{Group: 5, Object: 9}, + }) + if err != nil { + return err + } + fs, err := resp.OpenFetchStream() // Request ID bound automatically + if err != nil { + return err + } + return fs.Close() + }() + }() + + fr, err := client.Fetch(ctx, fetchMsg) + if err != nil { + t.Fatalf("client Fetch: %v", err) + } + if !fr.OK.EndOfTrack || fr.OK.EndLocation != (message.Location{Group: 5, Object: 9}) { + t.Errorf("FETCH_OK = %+v, want EndOfTrack and EndLocation {5,9}", fr.OK) + } + + ds, err := client.AcceptDataStream(ctx) + if err != nil { + t.Fatalf("AcceptDataStream: %v", err) + } + fin, ok := ds.(*session.IncomingFetchStream) + if !ok { + t.Fatalf("got %T, want *IncomingFetchStream", ds) + } + if fin.Header.RequestID != fetchMsg.RequestID { + t.Errorf("FETCH_HEADER RequestID = %d, want %d (bound by the responder)", + fin.Header.RequestID, fetchMsg.RequestID) + } + if err := <-srvErr; err != nil { + t.Fatalf("server: %v", err) + } +} + +// TestAcceptTrackStatusRepliesOK checks AcceptTrackStatus replies TRACK_STATUS_OK +// (a REQUEST_OK) carrying the supplied Track Properties. +func TestAcceptTrackStatusRepliesOK(t *testing.T) { + t.Parallel() + client, server := openPair(t) + + ts := &message.TrackStatus{ + RequestID: client.AllocRequestID(), + Namespace: wire.Namespace("ns"), + Name: []byte("t"), + } + _, resp := runRequestRoundTrip(t, client, server, ts, func(r *session.Request) error { + return r.AcceptTrackStatus(&message.TrackStatusOK{}) + }) + if _, ok := resp.(*message.RequestOK); !ok { + t.Fatalf("client got %s, want REQUEST_OK (TRACK_STATUS_OK)", resp.Type()) + } +} + +// TestAcceptNamespaceRequestsReplyOK checks the three namespace-request accept +// helpers each reply REQUEST_OK and return a handle. +func TestAcceptNamespaceRequestsReplyOK(t *testing.T) { + t.Parallel() + + t.Run("PublishNamespace", func(t *testing.T) { + t.Parallel() + client, server := openPair(t) + m := &message.PublishNamespace{RequestID: client.AllocRequestID(), Namespace: wire.Namespace("ns")} + _, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error { + _, err := r.AcceptPublishNamespace() + return err + }) + if _, ok := resp.(*message.RequestOK); !ok { + t.Fatalf("got %s, want REQUEST_OK", resp.Type()) + } + }) + + t.Run("SubscribeNamespace", func(t *testing.T) { + t.Parallel() + client, server := openPair(t) + m := &message.SubscribeNamespace{RequestID: client.AllocRequestID(), TrackNamespacePrefix: wire.Namespace("ns")} + _, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error { + _, err := r.AcceptSubscribeNamespace() + return err + }) + if _, ok := resp.(*message.RequestOK); !ok { + t.Fatalf("got %s, want REQUEST_OK", resp.Type()) + } + }) + + t.Run("SubscribeTracks", func(t *testing.T) { + t.Parallel() + client, server := openPair(t) + m := &message.SubscribeTracks{RequestID: client.AllocRequestID(), TrackNamespacePrefix: wire.Namespace("ns")} + _, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error { + _, err := r.AcceptSubscribeTracks() + return err + }) + if _, ok := resp.(*message.RequestOK); !ok { + t.Fatalf("got %s, want REQUEST_OK", resp.Type()) + } + }) +} + +// TestAcceptSubscribeTracksWritePublishBlocked checks the publisher-side +// WritePublishBlocked round-trips to the subscriber's ReadPublishBlocked. The +// server side runs in a goroutine so its REQUEST_OK and PUBLISH_BLOCKED writes +// have the client's concurrent reads (SubscribeTracks, ReadPublishBlocked) to +// drain the synchronous sessiontest pipes. +func TestAcceptSubscribeTracksWritePublishBlocked(t *testing.T) { + t.Parallel() + client, server := openPair(t) + ctx := t.Context() + + srvErr := make(chan error, 1) + go func() { + srvErr <- func() error { + req, err := server.AcceptRequest(ctx) + if err != nil { + return err + } + pub, err := req.AcceptSubscribeTracks() + if err != nil { + return err + } + return pub.WritePublishBlocked(&message.PublishBlocked{ + TrackNamespaceSuffix: wire.Namespace("ns"), + TrackName: []byte("blocked-track"), + }) + }() + }() + + ts, err := client.SubscribeTracks(ctx, &message.SubscribeTracks{ + TrackNamespacePrefix: wire.Namespace("ns"), + }) + if err != nil { + t.Fatalf("client SubscribeTracks: %v", err) + } + defer ts.Close() + + pb, err := ts.ReadPublishBlocked() + if err != nil { + t.Fatalf("ReadPublishBlocked: %v", err) + } + if string(pb.TrackName) != "blocked-track" { + t.Errorf("PUBLISH_BLOCKED TrackName = %q, want %q", pb.TrackName, "blocked-track") + } + if err := <-srvErr; err != nil { + t.Fatalf("server: %v", err) + } +} + +// TestAcceptPublishReturnsHandle checks AcceptPublish replies REQUEST_OK and +// returns a handle reporting the publisher-assigned Track Alias. +func TestAcceptPublishReturnsHandle(t *testing.T) { + t.Parallel() + client, server := openPair(t) + ctx := t.Context() + + type result struct { + pub *session.Publication + err error + } + done := make(chan result, 1) + go func() { + pub, err := client.Publish(ctx, &message.Publish{ + Namespace: wire.Namespace("ns"), + Name: []byte("track"), + TrackAlias: 11, + }) + done <- result{pub, err} + }() + + req, err := server.AcceptRequest(ctx) + if err != nil { + t.Fatalf("AcceptRequest: %v", err) + } + recv, err := req.AcceptPublish() + if err != nil { + t.Fatalf("AcceptPublish: %v", err) + } + if recv.TrackAlias() != 11 { + t.Errorf("IncomingPublication.TrackAlias = %d, want 11", recv.TrackAlias()) + } + + got := <-done + if got.err != nil { + t.Fatalf("client Publish: %v", got.err) + } + _ = got.pub +} diff --git a/pkg/moqt/session/example_test.go b/pkg/moqt/session/example_test.go index 4ce0e78..a78c389 100644 --- a/pkg/moqt/session/example_test.go +++ b/pkg/moqt/session/example_test.go @@ -442,9 +442,19 @@ func ExampleSession_AcceptRequest() { case *message.Publish: // AcceptPublish registers the Track Alias (§11.1) and replies // REQUEST_OK; objects arrive via Session.AcceptDataStream. - _ = req.AcceptPublish() + recv, err := req.AcceptPublish() + if err != nil { + return + } + _ = recv // recv.TrackAlias() / recv.Update(...) for follow-ups. case *message.Fetch: - _ = req.RejectError(moqt.RequestDoesNotExist, "no such range") + // AcceptFetch replies FETCH_OK and returns a responder whose + // OpenFetchStream is pre-bound to this fetch's Request ID. + resp, err := req.AcceptFetch(nil) + if err != nil { + return + } + _ = resp // resp.OpenFetchStream() to stream the response objects. } } } diff --git a/pkg/moqt/session/fetch.go b/pkg/moqt/session/fetch.go index 943714c..dff68c8 100644 --- a/pkg/moqt/session/fetch.go +++ b/pkg/moqt/session/fetch.go @@ -60,6 +60,52 @@ func (s *Session) Fetch(ctx context.Context, m *message.Fetch) (*FetchRequest, e }) } +// FetchResponder is the publisher side of a FETCH (§10.12) this endpoint +// accepted via [Request.AcceptFetch] — the accept-side counterpart of +// [Session.Fetch]. FETCH_OK has already been written on the embedded request +// stream; the response objects are streamed on a separate FETCH_HEADER +// uni-stream (§11.5) opened via [FetchResponder.OpenFetchStream], which binds +// this fetch's Request ID automatically. The embedded request stream stays open +// for REQUEST_UPDATE follow-ups. +type FetchResponder struct { + // Stream is the FETCH request stream, still open for REQUEST_UPDATE + // follow-ups. Close it to end the fetch. + Stream + + s *Session + requestID uint64 +} + +// OpenFetchStream opens the outbound FETCH_HEADER uni-stream (§11.5) carrying +// this fetch's response objects, with the Request ID bound automatically. The +// caller MUST Close the returned stream to FIN it once all objects are written, +// or Cancel to reset. It is [Session.OpenFetchStream] pre-bound to this fetch. +func (f *FetchResponder) OpenFetchStream() (*OutgoingFetchStream, error) { + return f.s.OpenFetchStream(message.FetchHeader{RequestID: f.requestID}) +} + +// AcceptFetch accepts an inbound FETCH (§10.12) and returns a [FetchResponder] +// for streaming the response objects — the accept-side counterpart of +// [Session.Fetch]. r.First MUST be a *message.Fetch. +// +// ok carries the FETCH_OK fields the caller wants to set (EndOfTrack, +// EndLocation, negotiated Parameters, TrackProperties); it may be nil for the +// all-default reply. AcceptFetch writes FETCH_OK and returns a responder whose +// [FetchResponder.OpenFetchStream] is pre-bound to this fetch's Request ID. +func (r *Request) AcceptFetch(ok *message.FetchOK) (*FetchResponder, error) { + f, isFetch := r.First.(*message.Fetch) + if !isFetch { + return nil, fmt.Errorf("moqt/session: AcceptFetch on a %s request", r.First.Type()) + } + if ok == nil { + ok = &message.FetchOK{} + } + if err := message.Marshal(r.Stream, ok); err != nil { + return nil, fmt.Errorf("moqt/session: write FETCH_OK: %w", err) + } + return &FetchResponder{Stream: r.Stream, s: r.s, requestID: f.RequestID}, nil +} + // OpenFetchStream opens an outbound FETCH_HEADER uni-stream (§11.5), // writes the header (Type + Request ID), and returns the body writer. The // caller MUST Close to FIN the stream once all fetch objects have been diff --git a/pkg/moqt/session/namespace.go b/pkg/moqt/session/namespace.go index 01f211f..6a90687 100644 --- a/pkg/moqt/session/namespace.go +++ b/pkg/moqt/session/namespace.go @@ -94,6 +94,95 @@ func (s *Session) SubscribeTracks(ctx context.Context, m *message.SubscribeTrack }) } +// IncomingNamespacePublication is an accepted inbound PUBLISH_NAMESPACE (§10.15) +// — the receiving side of [Session.PublishNamespace]'s [NamespacePublication], +// returned by [Request.AcceptPublishNamespace]. REQUEST_OK has been sent; the +// announcer's NAMESPACE / NAMESPACE_DONE follow-ups arrive by reading the +// embedded stream (e.g. via message.Parse). Close it to end the publication. +type IncomingNamespacePublication struct { + // Stream is the PUBLISH_NAMESPACE request stream, still open to receive + // NAMESPACE / NAMESPACE_DONE notifications. Close it to end the publication. + Stream +} + +// IncomingNamespaceSubscription is an accepted inbound SUBSCRIBE_NAMESPACE +// (§10.18) — the announcing side of [Session.SubscribeNamespace]'s +// [NamespaceSubscription], returned by [Request.AcceptSubscribeNamespace]. +// REQUEST_OK has been sent; the caller announces matching namespaces by writing +// NAMESPACE / NAMESPACE_DONE to the embedded stream (e.g. via message.Marshal). +// Close it to end the subscription. +type IncomingNamespaceSubscription struct { + // Stream is the SUBSCRIBE_NAMESPACE request stream, still open for + // NAMESPACE / NAMESPACE_DONE follow-ups. Close it to end the subscription. + Stream +} + +// IncomingTrackSubscription is an accepted inbound SUBSCRIBE_TRACKS (§10.19) — +// the publishing side of [Session.SubscribeTracks]'s [TrackSubscription], +// returned by [Request.AcceptSubscribeTracks]. REQUEST_OK has been sent; the +// publisher forwards matching tracks as PUBLISH requests on new streams (see +// [Session.OpenPublish]) and signals stream exhaustion with +// [IncomingTrackSubscription.WritePublishBlocked] (§6.1 / §10.20). Close it to +// end the subscription. +type IncomingTrackSubscription struct { + // Stream is the SUBSCRIBE_TRACKS request stream, still open for + // PUBLISH_BLOCKED follow-ups. Close it to end the subscription. + Stream +} + +// WritePublishBlocked sends a PUBLISH_BLOCKED (§6.1 / §10.20) on the +// SUBSCRIBE_TRACKS stream, telling the subscriber the publisher could not open a +// PUBLISH stream for the named track because it has no available bidirectional +// streams. It is the publisher-side counterpart of +// [TrackSubscription.ReadPublishBlocked]. +func (t *IncomingTrackSubscription) WritePublishBlocked(pb *message.PublishBlocked) error { + return message.Marshal(t.Stream, pb) +} + +// AcceptPublishNamespace accepts an inbound PUBLISH_NAMESPACE (§10.15), replies +// REQUEST_OK, and returns an [IncomingNamespacePublication] for receiving the +// announcer's NAMESPACE / NAMESPACE_DONE follow-ups — the accept-side +// counterpart of [Session.PublishNamespace]. r.First MUST be a +// *message.PublishNamespace. +func (r *Request) AcceptPublishNamespace() (*IncomingNamespacePublication, error) { + if _, ok := r.First.(*message.PublishNamespace); !ok { + return nil, fmt.Errorf("moqt/session: AcceptPublishNamespace on a %s request", r.First.Type()) + } + if err := message.Marshal(r.Stream, &message.RequestOK{}); err != nil { + return nil, fmt.Errorf("moqt/session: write PUBLISH_NAMESPACE REQUEST_OK: %w", err) + } + return &IncomingNamespacePublication{Stream: r.Stream}, nil +} + +// AcceptSubscribeNamespace accepts an inbound SUBSCRIBE_NAMESPACE (§10.18), +// replies REQUEST_OK, and returns an [IncomingNamespaceSubscription] for +// announcing matching namespaces via NAMESPACE / NAMESPACE_DONE — the +// accept-side counterpart of [Session.SubscribeNamespace]. r.First MUST be a +// *message.SubscribeNamespace. +func (r *Request) AcceptSubscribeNamespace() (*IncomingNamespaceSubscription, error) { + if _, ok := r.First.(*message.SubscribeNamespace); !ok { + return nil, fmt.Errorf("moqt/session: AcceptSubscribeNamespace on a %s request", r.First.Type()) + } + if err := message.Marshal(r.Stream, &message.RequestOK{}); err != nil { + return nil, fmt.Errorf("moqt/session: write SUBSCRIBE_NAMESPACE REQUEST_OK: %w", err) + } + return &IncomingNamespaceSubscription{Stream: r.Stream}, nil +} + +// AcceptSubscribeTracks accepts an inbound SUBSCRIBE_TRACKS (§10.19), replies +// REQUEST_OK, and returns an [IncomingTrackSubscription] for forwarding matching +// PUBLISHes and sending PUBLISH_BLOCKED follow-ups — the accept-side counterpart +// of [Session.SubscribeTracks]. r.First MUST be a *message.SubscribeTracks. +func (r *Request) AcceptSubscribeTracks() (*IncomingTrackSubscription, error) { + if _, ok := r.First.(*message.SubscribeTracks); !ok { + return nil, fmt.Errorf("moqt/session: AcceptSubscribeTracks on a %s request", r.First.Type()) + } + if err := message.Marshal(r.Stream, &message.RequestOK{}); err != nil { + return nil, fmt.Errorf("moqt/session: write SUBSCRIBE_TRACKS REQUEST_OK: %w", err) + } + return &IncomingTrackSubscription{Stream: r.Stream}, nil +} + // ReadPublishBlocked reads the next follow-up message on this SUBSCRIBE_TRACKS // response stream and returns it as a PUBLISH_BLOCKED. // diff --git a/pkg/moqt/session/publish.go b/pkg/moqt/session/publish.go index a6f329d..9520747 100644 --- a/pkg/moqt/session/publish.go +++ b/pkg/moqt/session/publish.go @@ -71,6 +71,39 @@ func (p *Publication) Done(code moqt.PublishDoneCode, reason string) error { return p.Stream.Close() } +// IncomingPublication is the receiving side of a publisher-initiated PUBLISH +// (§10.10) this endpoint accepted via [Request.AcceptPublish] — the accept-side +// counterpart of [Session.Subscribe]'s [Subscription]. The objects arrive on +// subgroup uni-streams (or datagrams) keyed by [IncomingPublication.TrackAlias] +// and are consumed via [Session.AcceptDataStream]; the embedded request stream +// stays open for follow-ups — PUBLISH_DONE from the publisher, or a +// REQUEST_UPDATE this side sends via [IncomingPublication.Update] to adjust +// forwarding (§10.9). Close it to end the reception. +type IncomingPublication struct { + // Stream is the PUBLISH request stream, still open for follow-up traffic + // (inbound PUBLISH_DONE, outbound REQUEST_UPDATE). Close it to end the + // reception. + Stream + + s *Session + alias uint64 + requestID uint64 +} + +// TrackAlias reports the §11.1 Track Alias the publisher assigned — the integer +// inbound subgroup and datagram streams carry to identify this track (resolve it +// via [Session.LookupInboundTrackAlias]). +func (p *IncomingPublication) TrackAlias() uint64 { return p.alias } + +// Update sends a REQUEST_UPDATE (§10.9) on the publication stream and awaits the +// single REQUEST_OK / REQUEST_ERROR the spec mandates. params carries only the +// fields to change; any parameter omitted keeps its prior value on the peer. It +// is [Session.UpdateRequest] with this publication's stream and Request ID +// filled in. +func (p *IncomingPublication) Update(ctx context.Context, params message.Parameters) (*message.RequestOK, error) { + return p.s.UpdateRequest(ctx, p.Stream, p.requestID, params) +} + // Publish opens a PUBLISH request stream (§10.10) and awaits the peer's // initial response. It is [Session.OpenPublish] plus the response wait: the // session assigns m.RequestID (after the stream opens, so a blocked open diff --git a/pkg/moqt/session/publish_test.go b/pkg/moqt/session/publish_test.go index f6ee78f..b080ee6 100644 --- a/pkg/moqt/session/publish_test.go +++ b/pkg/moqt/session/publish_test.go @@ -231,7 +231,7 @@ func TestRequestAcceptPublish(t *testing.T) { serverErr = err return } - if err := req.AcceptPublish(); err != nil { + if _, err := req.AcceptPublish(); err != nil { serverErr = err return } diff --git a/pkg/moqt/session/request.go b/pkg/moqt/session/request.go index f532756..ce0040e 100644 --- a/pkg/moqt/session/request.go +++ b/pkg/moqt/session/request.go @@ -398,20 +398,25 @@ func (r *Request) AcceptSubscribe(ok *message.SubscribeOK) (*Publication, error) // AcceptPublish accepts an inbound PUBLISH (§10.10): it registers the // publisher-assigned Track Alias (§11.1, so inbound subgroup/datagram streams -// resolve to this track and a reused alias is caught as DUPLICATE_TRACK_ALIAS) -// and replies REQUEST_OK. r.First MUST be a *message.Publish. The objects arrive -// on subgroup uni-streams via [Session.AcceptDataStream]. +// resolve to this track and a reused alias is caught as DUPLICATE_TRACK_ALIAS), +// replies REQUEST_OK, and returns an [IncomingPublication] for the receiving +// side — the accept-side counterpart of [Session.Publish]. r.First MUST be a +// *message.Publish. The objects arrive on subgroup uni-streams via +// [Session.AcceptDataStream]. // // If the alias collides with a different already-registered track, // *ErrDuplicateTrackAlias is returned WITHOUT replying OK; the caller MUST close // the session with [moqt.SessionDuplicateTrackAlias] (§11.1). -func (r *Request) AcceptPublish() error { +func (r *Request) AcceptPublish() (*IncomingPublication, error) { pub, isPub := r.First.(*message.Publish) if !isPub { - return fmt.Errorf("moqt/session: AcceptPublish on a %s request", r.First.Type()) + return nil, fmt.Errorf("moqt/session: AcceptPublish on a %s request", r.First.Type()) } if err := r.s.RegisterInboundTrackAlias(pub.TrackAlias, track.NewKey(pub.Namespace, pub.Name)); err != nil { - return err + return nil, err + } + if err := message.Marshal(r.Stream, &message.RequestOK{}); err != nil { + return nil, fmt.Errorf("moqt/session: write PUBLISH REQUEST_OK: %w", err) } - return message.Marshal(r.Stream, &message.RequestOK{}) + return &IncomingPublication{Stream: r.Stream, s: r.s, alias: pub.TrackAlias, requestID: pub.RequestID}, nil } diff --git a/pkg/moqt/session/subscribe_test.go b/pkg/moqt/session/subscribe_test.go index 29e26cf..4178da4 100644 --- a/pkg/moqt/session/subscribe_test.go +++ b/pkg/moqt/session/subscribe_test.go @@ -493,7 +493,7 @@ func TestAcceptWrongType(t *testing.T) { Name: []byte("t"), } _, gotResp := runRequestRoundTrip(t, client, server, sub, func(r *session.Request) error { - if err := r.AcceptPublish(); err == nil { + if _, err := r.AcceptPublish(); err == nil { t.Error("AcceptPublish on a SUBSCRIBE request: want error, got nil") } return r.RejectError(moqt.RequestDoesNotExist, "wrong type") diff --git a/pkg/moqt/session/track_status.go b/pkg/moqt/session/track_status.go index c595d75..ab16e92 100644 --- a/pkg/moqt/session/track_status.go +++ b/pkg/moqt/session/track_status.go @@ -2,6 +2,7 @@ package session import ( "context" + "fmt" "github.com/floatdrop/moq-go/pkg/moqt/message" ) @@ -33,6 +34,29 @@ func (t *TrackStatusRequest) Update(ctx context.Context, params message.Paramete return t.s.UpdateRequest(ctx, t.Stream, t.requestID, params) } +// AcceptTrackStatus accepts an inbound TRACK_STATUS (§10.14) and replies +// TRACK_STATUS_OK with the given status fields — the accept-side counterpart of +// [Session.TrackStatus]. r.First MUST be a *message.TrackStatus. +// +// ok carries the TRACK_STATUS_OK fields (status, largest location, Track +// Properties — [message.TrackStatusOK] is an alias of [message.RequestOK]); it +// may be nil for the all-default reply. Unlike the other Accept* helpers, +// TRACK_STATUS is a one-shot status query with no object-push or follow-up +// stream, so no handle is returned; use [Request.Stream] directly to service a +// later REQUEST_UPDATE. +func (r *Request) AcceptTrackStatus(ok *message.TrackStatusOK) error { + if _, isTS := r.First.(*message.TrackStatus); !isTS { + return fmt.Errorf("moqt/session: AcceptTrackStatus on a %s request", r.First.Type()) + } + if ok == nil { + ok = &message.TrackStatusOK{} + } + if err := message.Marshal(r.Stream, ok); err != nil { + return fmt.Errorf("moqt/session: write TRACK_STATUS_OK: %w", err) + } + return nil +} + // TrackStatus opens a TRACK_STATUS request stream (§10.14) and awaits // REQUEST_OK (TRACK_STATUS_OK) or REQUEST_ERROR. The session assigns // m.RequestID; the caller supplies Namespace, Name, and optional Parameters. From 0f01181c887d42a2f3bc3f0b7836f8c6cd1dcafe Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 09:45:52 +0500 Subject: [PATCH 2/2] docs(session): demonstrate all Accept* helpers in the example The README "Examples" table maps each topic to a real, compile-checked Example function; the accept-side row instead listed bare method names, which is neither an example nor verified. Extend ExampleSession_AcceptRequest to dispatch and answer every request type via its Accept* helper (adding the TrackStatus and namespace cases) and point the single "Accept requests" row at it, dropping the off-convention method list. Co-Authored-By: Claude Opus 4.8 --- README.md | 3 +-- pkg/moqt/session/example_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b68feee..0a8dcfe 100644 --- a/README.md +++ b/README.md @@ -137,8 +137,7 @@ grouped here by the file they live in: | End a publication | `Example_endingAPublication` | | Stream exhaustion (PUBLISH_BLOCKED) | `ExampleSession_OpenPublish`, `ExampleSession_ReadPublishBlocked` | | Announce / discover namespaces | `ExampleSession_PublishNamespace`, `ExampleSession_SubscribeNamespace` | -| Accept requests (server side) | `ExampleSession_AcceptRequest` | -| Accept-side helpers (`Accept*`) | `Request.AcceptSubscribe` / `AcceptPublish` / `AcceptFetch` / `AcceptTrackStatus` / `AcceptPublishNamespace` / `AcceptSubscribeNamespace` / `AcceptSubscribeTracks` | +| Accept requests + reply (`Accept*` helpers) | `ExampleSession_AcceptRequest` | | Graceful shutdown (GOAWAY) | `ExampleSession_SendGoaway`, `ExampleSession_OnGoaway` | **[`relay`](pkg/relay/example_test.go)** diff --git a/pkg/moqt/session/example_test.go b/pkg/moqt/session/example_test.go index a78c389..5884b83 100644 --- a/pkg/moqt/session/example_test.go +++ b/pkg/moqt/session/example_test.go @@ -455,6 +455,34 @@ func ExampleSession_AcceptRequest() { return } _ = resp // resp.OpenFetchStream() to stream the response objects. + case *message.TrackStatus: + // AcceptTrackStatus replies TRACK_STATUS_OK. It is a one-shot status + // query, so (unlike the others) it returns no handle. + _ = req.AcceptTrackStatus(nil) + case *message.PublishNamespace: + // AcceptPublishNamespace replies REQUEST_OK; NAMESPACE / NAMESPACE_DONE + // follow-ups arrive by reading the returned handle (e.g. message.Parse). + ann, err := req.AcceptPublishNamespace() + if err != nil { + return + } + _ = ann + case *message.SubscribeNamespace: + // AcceptSubscribeNamespace replies REQUEST_OK; announce matching + // namespaces by writing NAMESPACE / NAMESPACE_DONE to the handle. + sub, err := req.AcceptSubscribeNamespace() + if err != nil { + return + } + _ = sub + case *message.SubscribeTracks: + // AcceptSubscribeTracks replies REQUEST_OK; forward matching tracks as + // PUBLISHes and signal stream exhaustion with WritePublishBlocked. + tracks, err := req.AcceptSubscribeTracks() + if err != nil { + return + } + _ = tracks // tracks.WritePublishBlocked(...) on §6.1 stream exhaustion. } } }