From ba32e92bdb9849b4bf3700044dc7edb946f155a9 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 00:27:33 +0500 Subject: [PATCH 1/2] refactor(session): collapse six request openers into a generic helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribe, Fetch, TrackStatus, PublishNamespace, SubscribeNamespace, and SubscribeTracks all repeated the same §10.1 skeleton: openAllocRequest → readResponse → switch on OK / REQUEST_ERROR / default → wrap. Fetch and TrackStatus even carried //nolint:dupl waivers for it. Extract awaitRequestResponse[OK, R], a generic primitive that owns the open / await-OK / error-mapping skeleton and delegates only the success case (OK type + handle construction) to a per-opener closure. The REQUEST_ERROR → *RequestRejectedError mapping and the "unexpected response" and "read X response" error strings now live in one place. The operation name in those errors is derived from m.Type() rather than a passed-in label. Net change removes both dupl waivers and ~150 lines of boilerplate; no behavior change. Co-Authored-By: Claude Opus 4.8 --- pkg/moqt/session/fetch.go | 35 +++++----------- pkg/moqt/session/namespace.go | 69 ++++++-------------------------- pkg/moqt/session/request.go | 42 +++++++++++++++++++ pkg/moqt/session/subscribe.go | 48 ++++++++-------------- pkg/moqt/session/track_status.go | 38 +++++------------- 5 files changed, 89 insertions(+), 143 deletions(-) diff --git a/pkg/moqt/session/fetch.go b/pkg/moqt/session/fetch.go index 3f5cba3..943714c 100644 --- a/pkg/moqt/session/fetch.go +++ b/pkg/moqt/session/fetch.go @@ -48,33 +48,16 @@ func (f *FetchRequest) Update(ctx context.Context, params message.Parameters) (* // // On REQUEST_ERROR the stream is closed and a *RequestRejectedError is // returned. -// -//nolint:dupl // distinct §10.12 op; shares the mandated open/await-OK skeleton with TrackStatus (§10.14) but differs in message type and FETCH_OK handling. func (s *Session) Fetch(ctx context.Context, m *message.Fetch) (*FetchRequest, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read FETCH response: %w", err) - } - switch r := resp.(type) { - case *message.FetchOK: - // §2.5.1: reject tracks with unknown mandatory track properties. - if err := s.validateTrackProperties(r.TrackProperties, "FETCH_OK"); err != nil { - _ = stream.Close() - return nil, err - } - return &FetchRequest{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in FETCH response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.FetchOK) (*FetchRequest, error) { + // §2.5.1: reject tracks with unknown mandatory track properties. + if err := s.validateTrackProperties(ok.TrackProperties, "FETCH_OK"); err != nil { + _ = stream.Close() + return nil, err + } + return &FetchRequest{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil + }) } // OpenFetchStream opens an outbound FETCH_HEADER uni-stream (§11.5), diff --git a/pkg/moqt/session/namespace.go b/pkg/moqt/session/namespace.go index cca97b1..01f211f 100644 --- a/pkg/moqt/session/namespace.go +++ b/pkg/moqt/session/namespace.go @@ -57,25 +57,10 @@ func (s *Session) PublishNamespace( ctx context.Context, m *message.PublishNamespace, ) (*NamespacePublication, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read PUBLISH_NAMESPACE response: %w", err) - } - switch r := resp.(type) { - case *message.RequestOK: - return &NamespacePublication{Stream: stream, OK: r}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in PUBLISH_NAMESPACE response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.RequestOK) (*NamespacePublication, error) { + return &NamespacePublication{Stream: stream, OK: ok}, nil + }) } // SubscribeNamespace opens a SUBSCRIBE_NAMESPACE request stream (§10.18) and @@ -89,25 +74,10 @@ func (s *Session) SubscribeNamespace( ctx context.Context, m *message.SubscribeNamespace, ) (*NamespaceSubscription, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read SUBSCRIBE_NAMESPACE response: %w", err) - } - switch r := resp.(type) { - case *message.RequestOK: - return &NamespaceSubscription{Stream: stream, OK: r}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE_NAMESPACE response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.RequestOK) (*NamespaceSubscription, error) { + return &NamespaceSubscription{Stream: stream, OK: ok}, nil + }) } // SubscribeTracks opens a SUBSCRIBE_TRACKS request stream (§10.19) and awaits @@ -118,25 +88,10 @@ func (s *Session) SubscribeNamespace( // for PUBLISH_BLOCKED follow-ups (read via [TrackSubscription.ReadPublishBlocked]). // On REQUEST_ERROR the stream is closed and a *RequestRejectedError is returned. func (s *Session) SubscribeTracks(ctx context.Context, m *message.SubscribeTracks) (*TrackSubscription, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read SUBSCRIBE_TRACKS response: %w", err) - } - switch r := resp.(type) { - case *message.RequestOK: - return &TrackSubscription{Stream: stream, OK: r}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE_TRACKS response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.RequestOK) (*TrackSubscription, error) { + return &TrackSubscription{Stream: stream, OK: ok}, nil + }) } // ReadPublishBlocked reads the next follow-up message on this SUBSCRIBE_TRACKS diff --git a/pkg/moqt/session/request.go b/pkg/moqt/session/request.go index ab451f5..b51f299 100644 --- a/pkg/moqt/session/request.go +++ b/pkg/moqt/session/request.go @@ -264,6 +264,48 @@ func (s *Session) readResponse(ctx context.Context, stream Stream) (message.Mess return msg, err } +// awaitRequestResponse opens a request stream for m (allocating its Request ID +// only after the open succeeds — see [Session.openAllocRequest]), awaits the +// peer's initial response, and dispatches it: +// +// - the expected success type OK is handed to onOK, which owns the still-open +// stream from that point: it wraps the stream in the typed handle, or closes +// it and returns an error (e.g. on Track Property validation failure); +// - REQUEST_ERROR (§10.5) is surfaced as a *RequestRejectedError and the +// stream is closed; +// - any other message is an unexpected-response error and the stream is closed. +// +// Error messages name the operation via m.Type() (e.g. "SUBSCRIBE"). It is the +// single primitive beneath [Session.Subscribe], [Session.Fetch], +// [Session.TrackStatus], and the three namespace request openers, which share +// this §10.1 open / await-OK skeleton and differ only in OK type and success +// handling. +func awaitRequestResponse[OK message.Message, R any]( + ctx context.Context, + s *Session, + m message.WithRequestID, + onOK func(stream Stream, ok OK) (R, error), +) (R, error) { + var zero R + stream, err := s.openAllocRequest(m) + if err != nil { + return zero, err + } + resp, err := s.readResponse(ctx, stream) + if err != nil { + _ = stream.Close() + return zero, fmt.Errorf("moqt/session: read %s response: %w", m.Type(), err) + } + if ok, isOK := resp.(OK); isOK { + return onOK(stream, ok) + } + _ = stream.Close() + if rerr, isErr := resp.(*message.RequestError); isErr { + return zero, &RequestRejectedError{Code: rerr.ErrorCode, Reason: rerr.ErrorReason} + } + return zero, fmt.Errorf("moqt/session: unexpected %s in %s response", resp.Type(), m.Type()) +} + // UpdateRequest sends a REQUEST_UPDATE (§10.9) on an already-established // request stream and awaits the single REQUEST_OK / REQUEST_ERROR the spec // mandates in response. requestID MUST be the Request ID of the original diff --git a/pkg/moqt/session/subscribe.go b/pkg/moqt/session/subscribe.go index 4bf44e3..d15fc68 100644 --- a/pkg/moqt/session/subscribe.go +++ b/pkg/moqt/session/subscribe.go @@ -2,7 +2,6 @@ package session import ( "context" - "fmt" "github.com/floatdrop/moq-go/pkg/moqt/message" "github.com/floatdrop/moq-go/pkg/moqt/track" @@ -51,35 +50,20 @@ func (sub *Subscription) Update(ctx context.Context, params message.Parameters) // streams. REQUEST_ERROR is surfaced as a *RequestRejectedError and the stream // is closed. func (s *Session) Subscribe(ctx context.Context, m *message.Subscribe) (*Subscription, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read SUBSCRIBE response: %w", err) - } - switch r := resp.(type) { - case *message.SubscribeOK: - // §2.5.1: reject tracks with unknown mandatory track properties. - if err := s.validateTrackProperties(r.TrackProperties, "SUBSCRIBE_OK"); err != nil { - _ = stream.Close() - return nil, err - } - // §11.1: register the alias the publisher assigned so we can detect - // DUPLICATE_TRACK_ALIAS if the same alias is reused for a different track. - key := track.NewKey(m.Namespace, m.Name) - if err := s.RegisterInboundTrackAlias(r.TrackAlias, key); err != nil { - _ = stream.Close() - return nil, err - } - return &Subscription{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.SubscribeOK) (*Subscription, error) { + // §2.5.1: reject tracks with unknown mandatory track properties. + if err := s.validateTrackProperties(ok.TrackProperties, "SUBSCRIBE_OK"); err != nil { + _ = stream.Close() + return nil, err + } + // §11.1: register the alias the publisher assigned so we can detect + // DUPLICATE_TRACK_ALIAS if the same alias is reused for a different track. + key := track.NewKey(m.Namespace, m.Name) + if err := s.RegisterInboundTrackAlias(ok.TrackAlias, key); err != nil { + _ = stream.Close() + return nil, err + } + return &Subscription{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil + }) } diff --git a/pkg/moqt/session/track_status.go b/pkg/moqt/session/track_status.go index 5c244c5..c595d75 100644 --- a/pkg/moqt/session/track_status.go +++ b/pkg/moqt/session/track_status.go @@ -2,7 +2,6 @@ package session import ( "context" - "fmt" "github.com/floatdrop/moq-go/pkg/moqt/message" ) @@ -42,32 +41,15 @@ func (t *TrackStatusRequest) Update(ctx context.Context, params message.Paramete // open (the caller may send REQUEST_UPDATE via [TrackStatusRequest.Update]) and // whose OK holds the parsed TRACK_STATUS_OK. On REQUEST_ERROR the stream is // closed and a *RequestRejectedError is returned. -// -//nolint:dupl // distinct §10.14 op; shares the mandated open/await-OK skeleton with Fetch (§10.12) but differs in message type and TRACK_STATUS_OK property validation. func (s *Session) TrackStatus(ctx context.Context, m *message.TrackStatus) (*TrackStatusRequest, error) { - stream, err := s.openAllocRequest(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read TRACK_STATUS response: %w", err) - } - switch r := resp.(type) { - case *message.RequestOK: - // §2.5.1: reject tracks with unknown mandatory track properties. - // TRACK_STATUS_OK carries the same Track Properties as SUBSCRIBE_OK. - if err := s.validateTrackProperties(r.TrackProperties, "TRACK_STATUS_OK"); err != nil { - _ = stream.Close() - return nil, err - } - return &TrackStatusRequest{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in TRACK_STATUS response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, ok *message.RequestOK) (*TrackStatusRequest, error) { + // §2.5.1: reject tracks with unknown mandatory track properties. + // TRACK_STATUS_OK carries the same Track Properties as SUBSCRIBE_OK. + if err := s.validateTrackProperties(ok.TrackProperties, "TRACK_STATUS_OK"); err != nil { + _ = stream.Close() + return nil, err + } + return &TrackStatusRequest{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil + }) } From a58acd7fc5ba7d5a4c41b9baeec85cee4fde3797 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 00:31:08 +0500 Subject: [PATCH 2/2] refactor(session): fold Publish into the generic request helper Publish shares the same open / await-OK / error-mapping skeleton as the other six openers; it differed only by pre-allocating its Track Alias before opening (and OpenPublish is just openAllocRequest, which the helper already calls). Route it through awaitRequestResponse too, keeping the pre-open alias allocation in Publish itself. No behavior change. Co-Authored-By: Claude Opus 4.8 --- pkg/moqt/session/publish.go | 23 ++++------------------- pkg/moqt/session/request.go | 9 +++++---- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/pkg/moqt/session/publish.go b/pkg/moqt/session/publish.go index 9ffb77f..a6f329d 100644 --- a/pkg/moqt/session/publish.go +++ b/pkg/moqt/session/publish.go @@ -89,25 +89,10 @@ func (s *Session) Publish(ctx context.Context, m *message.Publish) (*Publication if m.TrackAlias == 0 { m.TrackAlias = s.AllocOutboundTrackAlias() } - stream, err := s.OpenPublish(m) - if err != nil { - return nil, err - } - resp, err := s.readResponse(ctx, stream) - if err != nil { - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: read PUBLISH response: %w", err) - } - switch r := resp.(type) { - case *message.RequestOK: - return &Publication{Stream: stream, s: s, alias: m.TrackAlias}, nil - case *message.RequestError: - _ = stream.Close() - return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason} - default: - _ = stream.Close() - return nil, fmt.Errorf("moqt/session: unexpected %s in PUBLISH response", resp.Type()) - } + return awaitRequestResponse(ctx, s, m, + func(stream Stream, _ *message.RequestOK) (*Publication, error) { + return &Publication{Stream: stream, s: s, alias: m.TrackAlias}, nil + }) } // OpenPublish opens a PUBLISH request stream (§10.10) without blocking on diff --git a/pkg/moqt/session/request.go b/pkg/moqt/session/request.go index b51f299..f532756 100644 --- a/pkg/moqt/session/request.go +++ b/pkg/moqt/session/request.go @@ -276,10 +276,11 @@ func (s *Session) readResponse(ctx context.Context, stream Stream) (message.Mess // - any other message is an unexpected-response error and the stream is closed. // // Error messages name the operation via m.Type() (e.g. "SUBSCRIBE"). It is the -// single primitive beneath [Session.Subscribe], [Session.Fetch], -// [Session.TrackStatus], and the three namespace request openers, which share -// this §10.1 open / await-OK skeleton and differ only in OK type and success -// handling. +// single primitive beneath [Session.Publish], [Session.Subscribe], +// [Session.Fetch], [Session.TrackStatus], and the three namespace request +// openers, which share this §10.1 open / await-OK skeleton and differ only in +// OK type and success handling (Publish additionally pre-allocates its Track +// Alias before the open). func awaitRequestResponse[OK message.Message, R any]( ctx context.Context, s *Session,