Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 0 additions & 115 deletions pkg/moqt/session/pubsub.go → pkg/moqt/session/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/floatdrop/moq-go/pkg/moqt"
"github.com/floatdrop/moq-go/pkg/moqt/message"
"github.com/floatdrop/moq-go/pkg/moqt/track"
)

// Publication is a live track this side publishes objects on. It owns the
Expand Down Expand Up @@ -128,117 +127,3 @@ func (s *Session) Publish(ctx context.Context, m *message.Publish) (*Publication
func (s *Session) OpenPublish(m *message.Publish) (Stream, error) {
return s.openAllocRequest(m)
}

// Subscription is a live subscriber-initiated track subscription. It owns the
// request stream (embedded, so Close / reads / message.Marshal work directly
// on it) plus the identifiers follow-up traffic needs — the Request ID and the
// publisher-assigned Track Alias — so the caller can send REQUEST_UPDATE via
// [Subscription.Update] without holding them separately. It is returned by
// [Session.Subscribe].
type Subscription struct {
// Stream is the SUBSCRIBE request stream, still open for follow-up
// traffic: REQUEST_UPDATE and inbound PUBLISH_DONE. Close it to end the
// subscription.
Stream

// OK is the parsed SUBSCRIBE_OK response — the publisher-assigned Track
// Alias, negotiated Parameters, and TrackProperties.
OK *message.SubscribeOK

s *Session
requestID uint64
}

// TrackAlias reports the §11.1 Track Alias the publisher assigned to this
// subscription — the integer inbound subgroup and datagram streams carry to
// identify the track (see [Session.AcceptDataStream]). It is shorthand for
// sub.OK.TrackAlias.
func (sub *Subscription) TrackAlias() uint64 { return sub.OK.TrackAlias }

// Update sends a REQUEST_UPDATE (§10.9) on the subscription stream and awaits
// the single REQUEST_OK / REQUEST_ERROR the spec mandates. params carries only
// the fields to change; any parameter omitted keeps its prior value on the
// peer. It is [Session.UpdateRequest] with this subscription's stream and
// Request ID filled in.
func (sub *Subscription) Update(ctx context.Context, params message.Parameters) (*message.RequestOK, error) {
return sub.s.UpdateRequest(ctx, sub.Stream, sub.requestID, params)
}

// Subscribe opens a SUBSCRIBE request stream (§10.7) and awaits SUBSCRIBE_OK.
// The session assigns m.RequestID; the caller supplies the rest. On success a
// [Subscription] is returned whose embedded stream stays open for follow-up
// traffic (REQUEST_UPDATE via [Subscription.Update], inbound PUBLISH_DONE) and
// whose [Subscription.TrackAlias] matches the alias on inbound subgroup
// streams. REQUEST_ERROR is surfaced as a *RequestRejectedError and the stream
// is closed.
func (s *Session) Subscribe(ctx context.Context, m *message.Subscribe) (*Subscription, error) {
stream, err := s.openAllocRequest(m)
if err != nil {
return nil, err
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: read SUBSCRIBE response: %w", err)
}
switch r := resp.(type) {
case *message.SubscribeOK:
// §2.5.1: reject tracks with unknown mandatory track properties.
if err := s.validateTrackProperties(r.TrackProperties, "SUBSCRIBE_OK"); err != nil {
_ = stream.Close()
return nil, err
}
// §11.1: register the alias the publisher assigned so we can detect
// DUPLICATE_TRACK_ALIAS if the same alias is reused for a different track.
key := track.NewKey(m.Namespace, m.Name)
if err := s.RegisterInboundTrackAlias(r.TrackAlias, key); err != nil {
_ = stream.Close()
return nil, err
}
return &Subscription{Stream: stream, OK: r, s: s, requestID: m.RequestID}, nil
case *message.RequestError:
_ = stream.Close()
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
_ = stream.Close()
return nil, fmt.Errorf("moqt/session: unexpected %s in SUBSCRIBE response", resp.Type())
}
}

// UpdateRequest sends a REQUEST_UPDATE (§10.9) on an already-established
// request stream and awaits the single REQUEST_OK / REQUEST_ERROR the spec
// mandates in response. requestID MUST be the Request ID of the original
// request the stream carries — REQUEST_UPDATE rides the original bidi stream
// and does NOT consume a new Request ID. params carries only the fields the
// caller wants to change; any parameter omitted keeps its prior value on the
// peer (§10.9).
//
// On REQUEST_OK the parsed message is returned and the stream is left open
// for further traffic. REQUEST_ERROR is surfaced as a *RequestRejectedError;
// the stream is left open so the caller can decide how to tear down (a failed
// subscription update is followed by PUBLISH_DONE from the publisher, §10.9).
func (s *Session) UpdateRequest(
ctx context.Context,
stream Stream,
requestID uint64,
params message.Parameters,
) (*message.RequestOK, error) {
if err := message.Marshal(stream, &message.RequestUpdate{
RequestID: requestID,
Parameters: params,
}); err != nil {
return nil, fmt.Errorf("moqt/session: write REQUEST_UPDATE: %w", err)
}
resp, err := s.readResponse(ctx, stream)
if err != nil {
return nil, fmt.Errorf("moqt/session: read REQUEST_UPDATE response: %w", err)
}
switch r := resp.(type) {
case *message.RequestOK:
return r, nil
case *message.RequestError:
return nil, &RequestRejectedError{Code: r.ErrorCode, Reason: r.ErrorReason}
default:
return nil, fmt.Errorf("moqt/session: unexpected %s in REQUEST_UPDATE response", resp.Type())
}
}
Loading