Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ grouped here by the file they live in:
| End a publication | `Example_endingAPublication` |
| Stream exhaustion (PUBLISH_BLOCKED) | `ExampleSession_OpenPublish`, `ExampleSession_ReadPublishBlocked` |
| Announce / discover namespaces | `ExampleSession_PublishNamespace`, `ExampleSession_SubscribeNamespace` |
| Accept requests (server side) | `ExampleSession_AcceptRequest` |
| Accept requests + reply (`Accept*` helpers) | `ExampleSession_AcceptRequest` |
| Graceful shutdown (GOAWAY) | `ExampleSession_SendGoaway`, `ExampleSession_OnGoaway` |

**[`relay`](pkg/relay/example_test.go)**
Expand Down
230 changes: 230 additions & 0 deletions pkg/moqt/session/accept_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package session_test

import (
"testing"

"github.com/floatdrop/moq-go/pkg/moqt/message"
"github.com/floatdrop/moq-go/pkg/moqt/session"
"github.com/floatdrop/moq-go/pkg/moqt/wire"
)

// TestAcceptFetchRepliesFetchOK checks AcceptFetch writes FETCH_OK with the
// supplied fields and that the responder's OpenFetchStream binds this fetch's
// Request ID onto the FETCH_HEADER uni-stream automatically.
//
// The server side runs in a goroutine and the client reads on the main
// goroutine in send order (FETCH_OK, then the FETCH_HEADER stream): the
// sessiontest streams are synchronous pipes, so a writer needs a concurrent
// reader.
func TestAcceptFetchRepliesFetchOK(t *testing.T) {
t.Parallel()
client, server := openPair(t)
ctx := t.Context()

fetchMsg := &message.Fetch{
RequestID: client.AllocRequestID(),
FetchType: message.FetchTypeStandalone,
Standalone: &message.StandaloneFetch{
Namespace: wire.Namespace("ns"),
Name: []byte("clip"),
},
}

srvErr := make(chan error, 1)
go func() {
srvErr <- func() error {
req, err := server.AcceptRequest(ctx)
if err != nil {
return err
}
resp, err := req.AcceptFetch(&message.FetchOK{
EndOfTrack: true,
EndLocation: message.Location{Group: 5, Object: 9},
})
if err != nil {
return err
}
fs, err := resp.OpenFetchStream() // Request ID bound automatically
if err != nil {
return err
}
return fs.Close()
}()
}()

fr, err := client.Fetch(ctx, fetchMsg)
if err != nil {
t.Fatalf("client Fetch: %v", err)
}
if !fr.OK.EndOfTrack || fr.OK.EndLocation != (message.Location{Group: 5, Object: 9}) {
t.Errorf("FETCH_OK = %+v, want EndOfTrack and EndLocation {5,9}", fr.OK)
}

ds, err := client.AcceptDataStream(ctx)
if err != nil {
t.Fatalf("AcceptDataStream: %v", err)
}
fin, ok := ds.(*session.IncomingFetchStream)
if !ok {
t.Fatalf("got %T, want *IncomingFetchStream", ds)
}
if fin.Header.RequestID != fetchMsg.RequestID {
t.Errorf("FETCH_HEADER RequestID = %d, want %d (bound by the responder)",
fin.Header.RequestID, fetchMsg.RequestID)
}
if err := <-srvErr; err != nil {
t.Fatalf("server: %v", err)
}
}

// TestAcceptTrackStatusRepliesOK checks AcceptTrackStatus replies TRACK_STATUS_OK
// (a REQUEST_OK) carrying the supplied Track Properties.
func TestAcceptTrackStatusRepliesOK(t *testing.T) {
t.Parallel()
client, server := openPair(t)

ts := &message.TrackStatus{
RequestID: client.AllocRequestID(),
Namespace: wire.Namespace("ns"),
Name: []byte("t"),
}
_, resp := runRequestRoundTrip(t, client, server, ts, func(r *session.Request) error {
return r.AcceptTrackStatus(&message.TrackStatusOK{})
})
if _, ok := resp.(*message.RequestOK); !ok {
t.Fatalf("client got %s, want REQUEST_OK (TRACK_STATUS_OK)", resp.Type())
}
}

// TestAcceptNamespaceRequestsReplyOK checks the three namespace-request accept
// helpers each reply REQUEST_OK and return a handle.
func TestAcceptNamespaceRequestsReplyOK(t *testing.T) {
t.Parallel()

t.Run("PublishNamespace", func(t *testing.T) {
t.Parallel()
client, server := openPair(t)
m := &message.PublishNamespace{RequestID: client.AllocRequestID(), Namespace: wire.Namespace("ns")}
_, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error {
_, err := r.AcceptPublishNamespace()
return err
})
if _, ok := resp.(*message.RequestOK); !ok {
t.Fatalf("got %s, want REQUEST_OK", resp.Type())
}
})

t.Run("SubscribeNamespace", func(t *testing.T) {
t.Parallel()
client, server := openPair(t)
m := &message.SubscribeNamespace{RequestID: client.AllocRequestID(), TrackNamespacePrefix: wire.Namespace("ns")}
_, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error {
_, err := r.AcceptSubscribeNamespace()
return err
})
if _, ok := resp.(*message.RequestOK); !ok {
t.Fatalf("got %s, want REQUEST_OK", resp.Type())
}
})

t.Run("SubscribeTracks", func(t *testing.T) {
t.Parallel()
client, server := openPair(t)
m := &message.SubscribeTracks{RequestID: client.AllocRequestID(), TrackNamespacePrefix: wire.Namespace("ns")}
_, resp := runRequestRoundTrip(t, client, server, m, func(r *session.Request) error {
_, err := r.AcceptSubscribeTracks()
return err
})
if _, ok := resp.(*message.RequestOK); !ok {
t.Fatalf("got %s, want REQUEST_OK", resp.Type())
}
})
}

// TestAcceptSubscribeTracksWritePublishBlocked checks the publisher-side
// WritePublishBlocked round-trips to the subscriber's ReadPublishBlocked. The
// server side runs in a goroutine so its REQUEST_OK and PUBLISH_BLOCKED writes
// have the client's concurrent reads (SubscribeTracks, ReadPublishBlocked) to
// drain the synchronous sessiontest pipes.
func TestAcceptSubscribeTracksWritePublishBlocked(t *testing.T) {
t.Parallel()
client, server := openPair(t)
ctx := t.Context()

srvErr := make(chan error, 1)
go func() {
srvErr <- func() error {
req, err := server.AcceptRequest(ctx)
if err != nil {
return err
}
pub, err := req.AcceptSubscribeTracks()
if err != nil {
return err
}
return pub.WritePublishBlocked(&message.PublishBlocked{
TrackNamespaceSuffix: wire.Namespace("ns"),
TrackName: []byte("blocked-track"),
})
}()
}()

ts, err := client.SubscribeTracks(ctx, &message.SubscribeTracks{
TrackNamespacePrefix: wire.Namespace("ns"),
})
if err != nil {
t.Fatalf("client SubscribeTracks: %v", err)
}
defer ts.Close()

pb, err := ts.ReadPublishBlocked()
if err != nil {
t.Fatalf("ReadPublishBlocked: %v", err)
}
if string(pb.TrackName) != "blocked-track" {
t.Errorf("PUBLISH_BLOCKED TrackName = %q, want %q", pb.TrackName, "blocked-track")
}
if err := <-srvErr; err != nil {
t.Fatalf("server: %v", err)
}
}

// TestAcceptPublishReturnsHandle checks AcceptPublish replies REQUEST_OK and
// returns a handle reporting the publisher-assigned Track Alias.
func TestAcceptPublishReturnsHandle(t *testing.T) {
t.Parallel()
client, server := openPair(t)
ctx := t.Context()

type result struct {
pub *session.Publication
err error
}
done := make(chan result, 1)
go func() {
pub, err := client.Publish(ctx, &message.Publish{
Namespace: wire.Namespace("ns"),
Name: []byte("track"),
TrackAlias: 11,
})
done <- result{pub, err}
}()

req, err := server.AcceptRequest(ctx)
if err != nil {
t.Fatalf("AcceptRequest: %v", err)
}
recv, err := req.AcceptPublish()
if err != nil {
t.Fatalf("AcceptPublish: %v", err)
}
if recv.TrackAlias() != 11 {
t.Errorf("IncomingPublication.TrackAlias = %d, want 11", recv.TrackAlias())
}

got := <-done
if got.err != nil {
t.Fatalf("client Publish: %v", got.err)
}
_ = got.pub
}
42 changes: 40 additions & 2 deletions pkg/moqt/session/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,47 @@ func ExampleSession_AcceptRequest() {
case *message.Publish:
// AcceptPublish registers the Track Alias (§11.1) and replies
// REQUEST_OK; objects arrive via Session.AcceptDataStream.
_ = req.AcceptPublish()
recv, err := req.AcceptPublish()
if err != nil {
return
}
_ = recv // recv.TrackAlias() / recv.Update(...) for follow-ups.
case *message.Fetch:
_ = req.RejectError(moqt.RequestDoesNotExist, "no such range")
// AcceptFetch replies FETCH_OK and returns a responder whose
// OpenFetchStream is pre-bound to this fetch's Request ID.
resp, err := req.AcceptFetch(nil)
if err != nil {
return
}
_ = resp // resp.OpenFetchStream() to stream the response objects.
case *message.TrackStatus:
// AcceptTrackStatus replies TRACK_STATUS_OK. It is a one-shot status
// query, so (unlike the others) it returns no handle.
_ = req.AcceptTrackStatus(nil)
case *message.PublishNamespace:
// AcceptPublishNamespace replies REQUEST_OK; NAMESPACE / NAMESPACE_DONE
// follow-ups arrive by reading the returned handle (e.g. message.Parse).
ann, err := req.AcceptPublishNamespace()
if err != nil {
return
}
_ = ann
case *message.SubscribeNamespace:
// AcceptSubscribeNamespace replies REQUEST_OK; announce matching
// namespaces by writing NAMESPACE / NAMESPACE_DONE to the handle.
sub, err := req.AcceptSubscribeNamespace()
if err != nil {
return
}
_ = sub
case *message.SubscribeTracks:
// AcceptSubscribeTracks replies REQUEST_OK; forward matching tracks as
// PUBLISHes and signal stream exhaustion with WritePublishBlocked.
tracks, err := req.AcceptSubscribeTracks()
if err != nil {
return
}
_ = tracks // tracks.WritePublishBlocked(...) on §6.1 stream exhaustion.
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/moqt/session/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,52 @@ func (s *Session) Fetch(ctx context.Context, m *message.Fetch) (*FetchRequest, e
})
}

// FetchResponder is the publisher side of a FETCH (§10.12) this endpoint
// accepted via [Request.AcceptFetch] — the accept-side counterpart of
// [Session.Fetch]. FETCH_OK has already been written on the embedded request
// stream; the response objects are streamed on a separate FETCH_HEADER
// uni-stream (§11.5) opened via [FetchResponder.OpenFetchStream], which binds
// this fetch's Request ID automatically. The embedded request stream stays open
// for REQUEST_UPDATE follow-ups.
type FetchResponder struct {
// Stream is the FETCH request stream, still open for REQUEST_UPDATE
// follow-ups. Close it to end the fetch.
Stream

s *Session
requestID uint64
}

// OpenFetchStream opens the outbound FETCH_HEADER uni-stream (§11.5) carrying
// this fetch's response objects, with the Request ID bound automatically. The
// caller MUST Close the returned stream to FIN it once all objects are written,
// or Cancel to reset. It is [Session.OpenFetchStream] pre-bound to this fetch.
func (f *FetchResponder) OpenFetchStream() (*OutgoingFetchStream, error) {
return f.s.OpenFetchStream(message.FetchHeader{RequestID: f.requestID})
}

// AcceptFetch accepts an inbound FETCH (§10.12) and returns a [FetchResponder]
// for streaming the response objects — the accept-side counterpart of
// [Session.Fetch]. r.First MUST be a *message.Fetch.
//
// ok carries the FETCH_OK fields the caller wants to set (EndOfTrack,
// EndLocation, negotiated Parameters, TrackProperties); it may be nil for the
// all-default reply. AcceptFetch writes FETCH_OK and returns a responder whose
// [FetchResponder.OpenFetchStream] is pre-bound to this fetch's Request ID.
func (r *Request) AcceptFetch(ok *message.FetchOK) (*FetchResponder, error) {
f, isFetch := r.First.(*message.Fetch)
if !isFetch {
return nil, fmt.Errorf("moqt/session: AcceptFetch on a %s request", r.First.Type())
}
if ok == nil {
ok = &message.FetchOK{}
}
if err := message.Marshal(r.Stream, ok); err != nil {
return nil, fmt.Errorf("moqt/session: write FETCH_OK: %w", err)
}
return &FetchResponder{Stream: r.Stream, s: r.s, requestID: f.RequestID}, nil
}

// OpenFetchStream opens an outbound FETCH_HEADER uni-stream (§11.5),
// writes the header (Type + Request ID), and returns the body writer. The
// caller MUST Close to FIN the stream once all fetch objects have been
Expand Down
Loading