Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 54 additions & 56 deletions pkg/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ type Relay struct {

// sessions tracks every Session that has completed SETUP and not yet
// been torn down. Stop iterates it under sessionsMu to broadcast GOAWAY
// and to wait for drain.
sessionsMu sync.Mutex
sessions map[*session.Session]struct{}
// and to wait for drain. shuttingDown is set (under sessionsMu, by
// beginShutdown) when Stop snapshots the set; addSession reads it under the
// same lock to decide whether a newly-registered session is a straggler
// Stop's snapshot missed.
sessionsMu sync.Mutex
sessions map[*session.Session]struct{}
shuttingDown bool

// stopOnce guards Stop so the second caller short-circuits. stopCh is
// closed by Stop to signal the accept loop to exit and to release any
Expand Down Expand Up @@ -447,44 +451,10 @@ func (r *Relay) serveSession(ctx context.Context, sess *session.Session) {
r.names.RemoveSession(sess)
}()

// Watch for Stop: if it fires while this handler is running, we send
// GOAWAY and then either wait for the session to drain or close it
// when GoawayTimeout elapses. This is BOTH the failsafe for the race
// where Stop snapshots the live-session set BEFORE addSession
// registers us AND the normal per-session drain path — Stop's
// broadcast loop above sends GOAWAY too, but SendGoaway is idempotent,
// so the second call from here is a silent no-op when we won the
// race. The wait-and-close logic in this goroutine is what gives the
// peer a real chance to migrate before we tear the conn down.
stopWatchDone := make(chan struct{})
defer func() {
close(stopWatchDone)
}()
go func() {
select {
case <-r.stopCh:
case <-sess.Done():
return
case <-stopWatchDone:
return
}
// Send GOAWAY (idempotent — Stop's broadcast may have done
// it already). Then wait for either the peer to drain or
// the configured timeout.
if r.cfg.GoawayTimeout > 0 {
_ = sess.SendGoaway(r.cfg.GoawayTimeout, "")
timer := time.NewTimer(r.cfg.GoawayTimeout)
defer timer.Stop()
select {
case <-timer.C:
case <-sess.Done():
return // peer drained cleanly, nothing else to do
case <-stopWatchDone:
return
}
}
_ = sess.Close(moqt.SessionGoawayTimeout, "relay shutdown")
}()
// Shutdown drain is owned elsewhere: Stop runs the GOAWAY / grace / close
// lifecycle for every session in the snapshot it takes under sessionsMu,
// and addSession runs it for any straggler that registered after that
// snapshot (see addSession). serveSession itself does not watch for Stop.

handler := newSessionHandler(
sess, r.log, r.tracks, r.names,
Expand Down Expand Up @@ -520,9 +490,13 @@ func (r *Relay) Stop(ctx context.Context) error {
r.upstreams.close()
}

// 2. Snapshot the session set so we can iterate without holding
// the lock while doing potentially-blocking session work.
sessions := r.snapshotSessions()
// 2. Mark shutdown in progress and snapshot the session set in one
// atomic step, so we can iterate without holding the lock while
// doing potentially-blocking session work. The atomicity partitions
// sessions cleanly: every session is either in this snapshot (its
// drain is owned by steps 3–5 below) or registered later (it observes
// shuttingDown in addSession and owns its own drain) — never both.
sessions := r.beginShutdown()

// 3. Send GOAWAY to each session if a grace period is set. A
// zero timeout means "don't bother with GOAWAY"; close
Expand Down Expand Up @@ -584,22 +558,40 @@ func (r *Relay) Stop(ctx context.Context) error {
func (r *Relay) addSession(s *session.Session) {
r.sessionsMu.Lock()
r.sessions[s] = struct{}{}
shuttingDown := r.shuttingDown
r.sessionsMu.Unlock()
r.cfg.Metrics.SessionOpened()

// Race-window cover: if Stop already closed stopCh BEFORE we got here,
// Stop's broadcast loop missed us (it snapshotted the session set
// before we registered). Send our own GOAWAY so the peer still sees
// notification before the per-handler failsafe forcibly closes us.
// SendGoaway is idempotent — calling it twice (once here, once from
// Stop) just returns "already sent" on the second call.
select {
case <-r.stopCh:
if r.cfg.GoawayTimeout > 0 {
_ = s.SendGoaway(r.cfg.GoawayTimeout, "")
// Straggler cover: if shutdown was already in progress when we registered,
// Stop's snapshot — taken under sessionsMu together with the shuttingDown
// flag (see beginShutdown) — does NOT include this session, so Stop will
// neither GOAWAY nor close it. Own that lifecycle here. When shutdown began
// after we registered, shuttingDown is false and Stop's snapshot covers us;
// exactly one owner either way. The drain runs under r.handlers so Stop's
// handlers.Wait joins it (safe: this runs inside serveSession, itself a
// tracked handler, so the WaitGroup counter is already non-zero).
if shuttingDown {
r.handlers.Go(func() { r.drainStraggler(s) })
}
}

// drainStraggler runs the GOAWAY grace + force-close lifecycle for a single
// session that registered after Stop snapshotted the live-session set, so
// Stop's bulk drain (Stop steps 3–5) does not cover it. It mirrors that bulk
// drain for one session: GOAWAY, wait for the peer to drain or the grace period
// to elapse, then force-close. Spawned by addSession only during shutdown.
func (r *Relay) drainStraggler(s *session.Session) {
if r.cfg.GoawayTimeout > 0 {
_ = s.SendGoaway(r.cfg.GoawayTimeout, "")
timer := time.NewTimer(r.cfg.GoawayTimeout)
defer timer.Stop()
select {
case <-timer.C:
case <-s.Done():
return // peer drained within the grace period
}
default:
}
_ = s.Close(moqt.SessionGoawayTimeout, "relay shutdown")
}

func (r *Relay) removeSession(s *session.Session) {
Expand All @@ -609,9 +601,15 @@ func (r *Relay) removeSession(s *session.Session) {
r.cfg.Metrics.SessionClosed()
}

func (r *Relay) snapshotSessions() []*session.Session {
// beginShutdown marks the relay as shutting down and returns a snapshot of the
// currently-registered sessions, atomically under sessionsMu. The atomicity is
// what lets addSession partition sessions into exactly two non-overlapping
// groups: those in the returned snapshot (drained by Stop) and those registered
// afterward (which see shuttingDown and drain themselves via drainStraggler).
func (r *Relay) beginShutdown() []*session.Session {
r.sessionsMu.Lock()
defer r.sessionsMu.Unlock()
r.shuttingDown = true
out := make([]*session.Session, 0, len(r.sessions))
for s := range r.sessions {
out = append(out, s)
Expand Down
84 changes: 84 additions & 0 deletions pkg/relay/shutdown_straggler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package relay

import (
"context"
"net"
"testing"
"time"

"github.com/floatdrop/moq-go/pkg/moqt/session"
"github.com/floatdrop/moq-go/pkg/moqt/session/sessiontest"
)

// stragglerListener is a do-nothing [Listener]: New requires a non-nil
// listener, but this test drives addSession directly and never calls Start.
type stragglerListener struct{}

func (stragglerListener) Accept(ctx context.Context) (session.Conn, error) {
<-ctx.Done()
return nil, ctx.Err()
}
func (stragglerListener) Addr() net.Addr { return nil }
func (stragglerListener) Close() error { return nil }

// TestRelay_addSessionDrainsStraggler pins the straggler partition that
// beginShutdown + addSession enforce. A session that registers AFTER Stop has
// snapshotted the live-session set (so the snapshot misses it) must still be
// driven through the full GOAWAY / grace / force-close lifecycle — by
// addSession's drainStraggler, since Stop's bulk drain never saw it.
//
// This is the path that the deleted per-session stopWatch goroutine used to
// cover; the test guards against a regression in the move to drainStraggler.
func TestRelay_addSessionDrainsStraggler(t *testing.T) {
t.Parallel()
const grace = 150 * time.Millisecond

r := New(stragglerListener{}, Config{GoawayTimeout: grace})

// Establish a real session pair. The SETUP handshake is symmetric, so the
// client and server ends must run concurrently.
clientConn, serverConn := sessiontest.NewConnPair()
type result struct {
s *session.Session
err error
}
clientCh := make(chan result, 1)
serverCh := make(chan result, 1)
go func() { s, err := session.Client(t.Context(), clientConn); clientCh <- result{s, err} }()
go func() { s, err := session.Server(t.Context(), serverConn); serverCh <- result{s, err} }()
cl, sv := <-clientCh, <-serverCh
if cl.err != nil || sv.err != nil {
t.Fatalf("handshake failed: client=%v server=%v", cl.err, sv.err)
}
clientSess, serverSess := cl.s, sv.s

// Simulate Stop having already begun: beginShutdown marks shuttingDown and
// snapshots the (still empty) session set. The straggler registers next.
if snap := r.beginShutdown(); len(snap) != 0 {
t.Fatalf("beginShutdown snapshot = %d sessions, want 0", len(snap))
}

// addSession must observe shuttingDown and take ownership of the drain.
r.addSession(serverSess)

// drainStraggler must GOAWAY the peer...
select {
case <-clientSess.GoawayReceived():
case <-time.After(2 * time.Second):
t.Fatal("client did not receive GOAWAY from straggler drain")
}

// ...and, because this client ignores the GOAWAY, force-close at the grace
// boundary so the session terminates.
select {
case <-serverSess.Done():
case <-time.After(2 * time.Second):
t.Fatal("straggler session was not closed after the grace period")
}

// drainStraggler runs under r.handlers, so Stop's handlers.Wait joins it.
// Wait here too: it must return promptly now that the session is closed.
r.handlers.Wait()

_ = clientSess.Close(0, "")
}