Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 9 additions & 26 deletions pkg/moqt/session/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,16 @@ func (f *FetchRequest) Update(ctx context.Context, params message.Parameters) (*
//
// On REQUEST_ERROR the stream is closed and a *RequestRejectedError is
// returned.
//
//nolint:dupl // distinct §10.12 op; shares the mandated open/await-OK skeleton with TrackStatus (§10.14) but differs in message type and FETCH_OK handling.
func (s *Session) Fetch(ctx context.Context, m *message.Fetch) (*FetchRequest, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read FETCH response: %w", err)
}
switch r := resp.(type) {
case *message.FetchOK:
// §2.5.1: reject tracks with unknown mandatory track properties.
if err := s.validateTrackProperties(r.TrackProperties, "FETCH_OK"); err != nil {
_ = stream.Close()
return nil, err
}
return &FetchRequest{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in FETCH response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.FetchOK) (*FetchRequest, error) {
// §2.5.1: reject tracks with unknown mandatory track properties.
if err := s.validateTrackProperties(ok.TrackProperties, "FETCH_OK"); err != nil {
_ = stream.Close()
return nil, err
}
return &FetchRequest{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil
})
}

// OpenFetchStream opens an outbound FETCH_HEADER uni-stream (§11.5),
Expand Down
69 changes: 12 additions & 57 deletions pkg/moqt/session/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,10 @@ func (s *Session) PublishNamespace(
ctx context.Context,
m *message.PublishNamespace,
) (*NamespacePublication, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read PUBLISH_NAMESPACE response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
return &NamespacePublication{Stream: stream, OK: r}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in PUBLISH_NAMESPACE response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.RequestOK) (*NamespacePublication, error) {
return &NamespacePublication{Stream: stream, OK: ok}, nil
})
}

// SubscribeNamespace opens a SUBSCRIBE_NAMESPACE request stream (§10.18) and
Expand All @@ -89,25 +74,10 @@ func (s *Session) SubscribeNamespace(
ctx context.Context,
m *message.SubscribeNamespace,
) (*NamespaceSubscription, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read SUBSCRIBE_NAMESPACE response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
return &NamespaceSubscription{Stream: stream, OK: r}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE_NAMESPACE response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.RequestOK) (*NamespaceSubscription, error) {
return &NamespaceSubscription{Stream: stream, OK: ok}, nil
})
}

// SubscribeTracks opens a SUBSCRIBE_TRACKS request stream (§10.19) and awaits
Expand All @@ -118,25 +88,10 @@ func (s *Session) SubscribeNamespace(
// for PUBLISH_BLOCKED follow-ups (read via [TrackSubscription.ReadPublishBlocked]).
// On REQUEST_ERROR the stream is closed and a *RequestRejectedError is returned.
func (s *Session) SubscribeTracks(ctx context.Context, m *message.SubscribeTracks) (*TrackSubscription, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read SUBSCRIBE_TRACKS response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
return &TrackSubscription{Stream: stream, OK: r}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE_TRACKS response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.RequestOK) (*TrackSubscription, error) {
return &TrackSubscription{Stream: stream, OK: ok}, nil
})
}

// ReadPublishBlocked reads the next follow-up message on this SUBSCRIBE_TRACKS
Expand Down
23 changes: 4 additions & 19 deletions pkg/moqt/session/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,10 @@ func (s *Session) Publish(ctx context.Context, m *message.Publish) (*Publication
if m.TrackAlias == 0 {
m.TrackAlias = s.AllocOutboundTrackAlias()
}
stream, err := s.OpenPublish(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read PUBLISH response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
return &Publication{Stream: stream, s: s, alias: m.TrackAlias}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in PUBLISH response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, _ *message.RequestOK) (*Publication, error) {
return &Publication{Stream: stream, s: s, alias: m.TrackAlias}, nil
})
}

// OpenPublish opens a PUBLISH request stream (§10.10) without blocking on
Expand Down
43 changes: 43 additions & 0 deletions pkg/moqt/session/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,49 @@ func (s *Session) readResponse(ctx context.Context, stream Stream) (message.Mess
return msg, err
}

// awaitRequestResponse opens a request stream for m (allocating its Request ID
// only after the open succeeds — see [Session.openAllocRequest]), awaits the
// peer's initial response, and dispatches it:
//
// - the expected success type OK is handed to onOK, which owns the still-open
// stream from that point: it wraps the stream in the typed handle, or closes
// it and returns an error (e.g. on Track Property validation failure);
// - REQUEST_ERROR (§10.5) is surfaced as a *RequestRejectedError and the
// stream is closed;
// - any other message is an unexpected-response error and the stream is closed.
//
// Error messages name the operation via m.Type() (e.g. "SUBSCRIBE"). It is the
// single primitive beneath [Session.Publish], [Session.Subscribe],
// [Session.Fetch], [Session.TrackStatus], and the three namespace request
// openers, which share this §10.1 open / await-OK skeleton and differ only in
// OK type and success handling (Publish additionally pre-allocates its Track
// Alias before the open).
func awaitRequestResponse[OK message.Message, R any](
ctx context.Context,
s *Session,
m message.WithRequestID,
onOK func(stream Stream, ok OK) (R, error),
) (R, error) {
var zero R
stream, err := s.openAllocRequest(m)
if err != nil {
return zero, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return zero, fmt.Errorf("moqt/session: read %s response: %w", m.Type(), err)
}
if ok, isOK := resp.(OK); isOK {
return onOK(stream, ok)
}
_ = stream.Close()
if rerr, isErr := resp.(*message.RequestError); isErr {
return zero, &RequestRejectedError{Code: rerr.ErrorCode, Reason: rerr.ErrorReason}
}
return zero, fmt.Errorf("moqt/session: unexpected %s in %s response", resp.Type(), m.Type())
}

// UpdateRequest sends a REQUEST_UPDATE (§10.9) on an already-established
// request stream and awaits the single REQUEST_OK / REQUEST_ERROR the spec
// mandates in response. requestID MUST be the Request ID of the original
Expand Down
48 changes: 16 additions & 32 deletions pkg/moqt/session/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package session

import (
"context"
"fmt"

"github.com/floatdrop/moq-go/pkg/moqt/message"
"github.com/floatdrop/moq-go/pkg/moqt/track"
Expand Down Expand Up @@ -51,35 +50,20 @@ func (sub *Subscription) Update(ctx context.Context, params message.Parameters)
// streams. REQUEST_ERROR is surfaced as a *RequestRejectedError and the stream
// is closed.
func (s *Session) Subscribe(ctx context.Context, m *message.Subscribe) (*Subscription, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read SUBSCRIBE response: %w", err)
}
switch r := resp.(type) {
case *message.SubscribeOK:
// §2.5.1: reject tracks with unknown mandatory track properties.
if err := s.validateTrackProperties(r.TrackProperties, "SUBSCRIBE_OK"); err != nil {
_ = stream.Close()
return nil, err
}
// §11.1: register the alias the publisher assigned so we can detect
// DUPLICATE_TRACK_ALIAS if the same alias is reused for a different track.
key := track.NewKey(m.Namespace, m.Name)
if err := s.RegisterInboundTrackAlias(r.TrackAlias, key); err != nil {
_ = stream.Close()
return nil, err
}
return &Subscription{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.SubscribeOK) (*Subscription, error) {
// §2.5.1: reject tracks with unknown mandatory track properties.
if err := s.validateTrackProperties(ok.TrackProperties, "SUBSCRIBE_OK"); err != nil {
_ = stream.Close()
return nil, err
}
// §11.1: register the alias the publisher assigned so we can detect
// DUPLICATE_TRACK_ALIAS if the same alias is reused for a different track.
key := track.NewKey(m.Namespace, m.Name)
if err := s.RegisterInboundTrackAlias(ok.TrackAlias, key); err != nil {
_ = stream.Close()
return nil, err
}
return &Subscription{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil
})
}
38 changes: 10 additions & 28 deletions pkg/moqt/session/track_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package session

import (
"context"
"fmt"

"github.com/floatdrop/moq-go/pkg/moqt/message"
)
Expand Down Expand Up @@ -42,32 +41,15 @@ func (t *TrackStatusRequest) Update(ctx context.Context, params message.Paramete
// open (the caller may send REQUEST_UPDATE via [TrackStatusRequest.Update]) and
// whose OK holds the parsed TRACK_STATUS_OK. On REQUEST_ERROR the stream is
// closed and a *RequestRejectedError is returned.
//
//nolint:dupl // distinct §10.14 op; shares the mandated open/await-OK skeleton with Fetch (§10.12) but differs in message type and TRACK_STATUS_OK property validation.
func (s *Session) TrackStatus(ctx context.Context, m *message.TrackStatus) (*TrackStatusRequest, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read TRACK_STATUS response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
// §2.5.1: reject tracks with unknown mandatory track properties.
// TRACK_STATUS_OK carries the same Track Properties as SUBSCRIBE_OK.
if err := s.validateTrackProperties(r.TrackProperties, "TRACK_STATUS_OK"); err != nil {
_ = stream.Close()
return nil, err
}
return &TrackStatusRequest{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in TRACK_STATUS response", resp.Type())
}
return awaitRequestResponse(ctx, s, m,
func(stream Stream, ok *message.RequestOK) (*TrackStatusRequest, error) {
// §2.5.1: reject tracks with unknown mandatory track properties.
// TRACK_STATUS_OK carries the same Track Properties as SUBSCRIBE_OK.
if err := s.validateTrackProperties(ok.TrackProperties, "TRACK_STATUS_OK"); err != nil {
_ = stream.Close()
return nil, err
}
return &TrackStatusRequest{Stream: stream, OK: ok, s: s, requestID: m.RequestID}, nil
})
}