diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index 82291cc..8bf1d99 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -219,9 +219,13 @@ type Relay struct { // sessions tracks every Session that has completed SETUP and not yet // been torn down. Stop iterates it under sessionsMu to broadcast GOAWAY - // and to wait for drain. - sessionsMu sync.Mutex - sessions map[*session.Session]struct{} + // and to wait for drain. shuttingDown is set (under sessionsMu, by + // beginShutdown) when Stop snapshots the set; addSession reads it under the + // same lock to decide whether a newly-registered session is a straggler + // Stop's snapshot missed. + sessionsMu sync.Mutex + sessions map[*session.Session]struct{} + shuttingDown bool // stopOnce guards Stop so the second caller short-circuits. stopCh is // closed by Stop to signal the accept loop to exit and to release any @@ -447,44 +451,10 @@ func (r *Relay) serveSession(ctx context.Context, sess *session.Session) { r.names.RemoveSession(sess) }() - // Watch for Stop: if it fires while this handler is running, we send - // GOAWAY and then either wait for the session to drain or close it - // when GoawayTimeout elapses. This is BOTH the failsafe for the race - // where Stop snapshots the live-session set BEFORE addSession - // registers us AND the normal per-session drain path — Stop's - // broadcast loop above sends GOAWAY too, but SendGoaway is idempotent, - // so the second call from here is a silent no-op when we won the - // race. The wait-and-close logic in this goroutine is what gives the - // peer a real chance to migrate before we tear the conn down. - stopWatchDone := make(chan struct{}) - defer func() { - close(stopWatchDone) - }() - go func() { - select { - case <-r.stopCh: - case <-sess.Done(): - return - case <-stopWatchDone: - return - } - // Send GOAWAY (idempotent — Stop's broadcast may have done - // it already). Then wait for either the peer to drain or - // the configured timeout. - if r.cfg.GoawayTimeout > 0 { - _ = sess.SendGoaway(r.cfg.GoawayTimeout, "") - timer := time.NewTimer(r.cfg.GoawayTimeout) - defer timer.Stop() - select { - case <-timer.C: - case <-sess.Done(): - return // peer drained cleanly, nothing else to do - case <-stopWatchDone: - return - } - } - _ = sess.Close(moqt.SessionGoawayTimeout, "relay shutdown") - }() + // Shutdown drain is owned elsewhere: Stop runs the GOAWAY / grace / close + // lifecycle for every session in the snapshot it takes under sessionsMu, + // and addSession runs it for any straggler that registered after that + // snapshot (see addSession). serveSession itself does not watch for Stop. handler := newSessionHandler( sess, r.log, r.tracks, r.names, @@ -520,9 +490,13 @@ func (r *Relay) Stop(ctx context.Context) error { r.upstreams.close() } - // 2. Snapshot the session set so we can iterate without holding - // the lock while doing potentially-blocking session work. - sessions := r.snapshotSessions() + // 2. Mark shutdown in progress and snapshot the session set in one + // atomic step, so we can iterate without holding the lock while + // doing potentially-blocking session work. The atomicity partitions + // sessions cleanly: every session is either in this snapshot (its + // drain is owned by steps 3–5 below) or registered later (it observes + // shuttingDown in addSession and owns its own drain) — never both. + sessions := r.beginShutdown() // 3. Send GOAWAY to each session if a grace period is set. A // zero timeout means "don't bother with GOAWAY"; close @@ -584,22 +558,40 @@ func (r *Relay) Stop(ctx context.Context) error { func (r *Relay) addSession(s *session.Session) { r.sessionsMu.Lock() r.sessions[s] = struct{}{} + shuttingDown := r.shuttingDown r.sessionsMu.Unlock() r.cfg.Metrics.SessionOpened() - // Race-window cover: if Stop already closed stopCh BEFORE we got here, - // Stop's broadcast loop missed us (it snapshotted the session set - // before we registered). Send our own GOAWAY so the peer still sees - // notification before the per-handler failsafe forcibly closes us. - // SendGoaway is idempotent — calling it twice (once here, once from - // Stop) just returns "already sent" on the second call. - select { - case <-r.stopCh: - if r.cfg.GoawayTimeout > 0 { - _ = s.SendGoaway(r.cfg.GoawayTimeout, "") + // Straggler cover: if shutdown was already in progress when we registered, + // Stop's snapshot — taken under sessionsMu together with the shuttingDown + // flag (see beginShutdown) — does NOT include this session, so Stop will + // neither GOAWAY nor close it. Own that lifecycle here. When shutdown began + // after we registered, shuttingDown is false and Stop's snapshot covers us; + // exactly one owner either way. The drain runs under r.handlers so Stop's + // handlers.Wait joins it (safe: this runs inside serveSession, itself a + // tracked handler, so the WaitGroup counter is already non-zero). + if shuttingDown { + r.handlers.Go(func() { r.drainStraggler(s) }) + } +} + +// drainStraggler runs the GOAWAY grace + force-close lifecycle for a single +// session that registered after Stop snapshotted the live-session set, so +// Stop's bulk drain (Stop steps 3–5) does not cover it. It mirrors that bulk +// drain for one session: GOAWAY, wait for the peer to drain or the grace period +// to elapse, then force-close. Spawned by addSession only during shutdown. +func (r *Relay) drainStraggler(s *session.Session) { + if r.cfg.GoawayTimeout > 0 { + _ = s.SendGoaway(r.cfg.GoawayTimeout, "") + timer := time.NewTimer(r.cfg.GoawayTimeout) + defer timer.Stop() + select { + case <-timer.C: + case <-s.Done(): + return // peer drained within the grace period } - default: } + _ = s.Close(moqt.SessionGoawayTimeout, "relay shutdown") } func (r *Relay) removeSession(s *session.Session) { @@ -609,9 +601,15 @@ func (r *Relay) removeSession(s *session.Session) { r.cfg.Metrics.SessionClosed() } -func (r *Relay) snapshotSessions() []*session.Session { +// beginShutdown marks the relay as shutting down and returns a snapshot of the +// currently-registered sessions, atomically under sessionsMu. The atomicity is +// what lets addSession partition sessions into exactly two non-overlapping +// groups: those in the returned snapshot (drained by Stop) and those registered +// afterward (which see shuttingDown and drain themselves via drainStraggler). +func (r *Relay) beginShutdown() []*session.Session { r.sessionsMu.Lock() defer r.sessionsMu.Unlock() + r.shuttingDown = true out := make([]*session.Session, 0, len(r.sessions)) for s := range r.sessions { out = append(out, s) diff --git a/pkg/relay/shutdown_straggler_test.go b/pkg/relay/shutdown_straggler_test.go new file mode 100644 index 0000000..7a8c9f3 --- /dev/null +++ b/pkg/relay/shutdown_straggler_test.go @@ -0,0 +1,84 @@ +package relay + +import ( + "context" + "net" + "testing" + "time" + + "github.com/floatdrop/moq-go/pkg/moqt/session" + "github.com/floatdrop/moq-go/pkg/moqt/session/sessiontest" +) + +// stragglerListener is a do-nothing [Listener]: New requires a non-nil +// listener, but this test drives addSession directly and never calls Start. +type stragglerListener struct{} + +func (stragglerListener) Accept(ctx context.Context) (session.Conn, error) { + <-ctx.Done() + return nil, ctx.Err() +} +func (stragglerListener) Addr() net.Addr { return nil } +func (stragglerListener) Close() error { return nil } + +// TestRelay_addSessionDrainsStraggler pins the straggler partition that +// beginShutdown + addSession enforce. A session that registers AFTER Stop has +// snapshotted the live-session set (so the snapshot misses it) must still be +// driven through the full GOAWAY / grace / force-close lifecycle — by +// addSession's drainStraggler, since Stop's bulk drain never saw it. +// +// This is the path that the deleted per-session stopWatch goroutine used to +// cover; the test guards against a regression in the move to drainStraggler. +func TestRelay_addSessionDrainsStraggler(t *testing.T) { + t.Parallel() + const grace = 150 * time.Millisecond + + r := New(stragglerListener{}, Config{GoawayTimeout: grace}) + + // Establish a real session pair. The SETUP handshake is symmetric, so the + // client and server ends must run concurrently. + clientConn, serverConn := sessiontest.NewConnPair() + type result struct { + s *session.Session + err error + } + clientCh := make(chan result, 1) + serverCh := make(chan result, 1) + go func() { s, err := session.Client(t.Context(), clientConn); clientCh <- result{s, err} }() + go func() { s, err := session.Server(t.Context(), serverConn); serverCh <- result{s, err} }() + cl, sv := <-clientCh, <-serverCh + if cl.err != nil || sv.err != nil { + t.Fatalf("handshake failed: client=%v server=%v", cl.err, sv.err) + } + clientSess, serverSess := cl.s, sv.s + + // Simulate Stop having already begun: beginShutdown marks shuttingDown and + // snapshots the (still empty) session set. The straggler registers next. + if snap := r.beginShutdown(); len(snap) != 0 { + t.Fatalf("beginShutdown snapshot = %d sessions, want 0", len(snap)) + } + + // addSession must observe shuttingDown and take ownership of the drain. + r.addSession(serverSess) + + // drainStraggler must GOAWAY the peer... + select { + case <-clientSess.GoawayReceived(): + case <-time.After(2 * time.Second): + t.Fatal("client did not receive GOAWAY from straggler drain") + } + + // ...and, because this client ignores the GOAWAY, force-close at the grace + // boundary so the session terminates. + select { + case <-serverSess.Done(): + case <-time.After(2 * time.Second): + t.Fatal("straggler session was not closed after the grace period") + } + + // drainStraggler runs under r.handlers, so Stop's handlers.Wait joins it. + // Wait here too: it must return promptly now that the session is closed. + r.handlers.Wait() + + _ = clientSess.Close(0, "") +}