From 11f45f3ca656ddaae0d352b760b0ef1475bf9335 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Mon, 29 Jun 2026 17:26:24 +0500 Subject: [PATCH] refactor(session): drop openStreamWith prepare callback openStreamWith existed only to slot Request-ID allocation between OpenStream and Marshal via a parameterless prepare func() hook. With just two callers (OpenRequest, openAllocRequest), the callback was pure indirection. Inline the open in each opener, allocate the ID as a plain statement after a successful open, and share the marshal/reset tail via writeFirst. Co-Authored-By: Claude Opus 4.8 --- pkg/moqt/session/request.go | 42 ++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/moqt/session/request.go b/pkg/moqt/session/request.go index ce0040e6..96f2a160 100644 --- a/pkg/moqt/session/request.go +++ b/pkg/moqt/session/request.go @@ -207,23 +207,33 @@ func rejectStreamWithError(stream Stream, code moqt.RequestErrorCode, reason str // On any error before the stream is established and the first message // written, the stream (if any) is reset and the error is returned. func (s *Session) OpenRequest(first message.Message) (Stream, error) { - return s.openStreamWith(first, nil) + stream, err := s.conn.OpenStream() + if err != nil { + return nil, err + } + return writeFirst(stream, first) } -// openStreamWith opens a new outbound bidirectional stream, runs prepare (if -// non-nil) now that the open has succeeded, then writes first as the stream's -// initial message. prepare is the hook where a fresh Request ID is assigned, so -// a failed open (e.g. ErrNoStreamCredit) consumes no ID — the §10.1 sequence -// stays untouched. On a write failure the stream is reset and the error -// returned. -func (s *Session) openStreamWith(first message.Message, prepare func()) (Stream, error) { +// openAllocRequest opens a request stream for m and assigns m a freshly +// allocated Request ID (§10.1) only after the open succeeds — so a failed open +// (e.g. ErrNoStreamCredit) consumes no ID and the §10.1 sequence stays +// untouched — then writes it as the stream's first message. It does NOT await +// the peer's response — the caller owns the read side. It is the single +// primitive beneath every typed request opener (Publish, Subscribe, Fetch, +// TrackStatus, the namespace requests) and the non-blocking +// [Session.OpenPublish] used for relay fan-out. +func (s *Session) openAllocRequest(m message.WithRequestID) (Stream, error) { stream, err := s.conn.OpenStream() if err != nil { return nil, err } - if prepare != nil { - prepare() - } + m.SetRequestID(s.AllocRequestID()) + return writeFirst(stream, m) +} + +// writeFirst marshals first as the initial message of a freshly opened request +// stream. On a write failure the stream is reset and the error is returned. +func writeFirst(stream Stream, first message.Message) (Stream, error) { if err := message.Marshal(stream, first); err != nil { resetStream(stream) return nil, fmt.Errorf("moqt/session: write request first message: %w", err) @@ -231,16 +241,6 @@ func (s *Session) openStreamWith(first message.Message, prepare func()) (Stream, return stream, nil } -// openAllocRequest opens a request stream for m and assigns m a freshly -// allocated Request ID (§10.1) only after the open succeeds, then writes it as -// the stream's first message. It does NOT await the peer's response — the -// caller owns the read side. It is the single primitive beneath every typed -// request opener (Publish, Subscribe, Fetch, TrackStatus, the namespace -// requests) and the non-blocking [Session.OpenPublish] used for relay fan-out. -func (s *Session) openAllocRequest(m message.WithRequestID) (Stream, error) { - return s.openStreamWith(m, func() { m.SetRequestID(s.AllocRequestID()) }) -} - // readResponse parses one message from stream, honoring ctx. message.Parse // reads from a context-free io.Reader, so cancellation is bridged by resetting // the stream's read side with StreamResetCancelled (§3.3.3), which unblocks the