From 80e2fc4199fd180fb1d523caf84081f83fa507c2 Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Fri, 27 Mar 2026 02:53:07 +0100 Subject: [PATCH] Improve leader election Don't do a full lock on `peers`. Instead of excluding timed-out peers, we only include peers that are in the lobby and have not timed out as potential leaders. --- internal/signaling/stores/postgres.go | 100 ++++++++++---------------- 1 file changed, 37 insertions(+), 63 deletions(-) diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index aa469d0..ce50b24 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -525,12 +525,6 @@ func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold tim } defer tx.Rollback(context.Background()) //nolint:errcheck - // DELETE FROM peers will lock the row for this peer in this transaction. - // This means we can safely remove the peer from lobbies without getting a - // race condition with DoLeaderElection. - // It is important that both ClaimNextTimedOutPeer and DoLeaderElection always - // lock peers first to avoid deadlocks. - var peerID string var disconnected bool err = tx.QueryRow(ctx, ` @@ -622,42 +616,6 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode } defer tx.Rollback(context.Background()) //nolint:errcheck - // We need to lock the whole table as SELECT FOR UPDATE does not lock rows that do not exist yet - // And we can't have timed out peers being added during the election. 
- _, err = tx.Exec(ctx, ` - LOCK TABLE peers IN EXCLUSIVE MODE - `) - if err != nil { - return nil, err - } - - var timedOutPeers []string - rows, err := tx.Query(ctx, ` - SELECT peer - FROM peers - WHERE disconnected = TRUE - `) - if err != nil { - if !errors.Is(err, pgx.ErrNoRows) { - return nil, err - } - } else { - defer rows.Close() //nolint:errcheck - - for rows.Next() { - var peer string - err = rows.Scan(&peer) - if err != nil { - return nil, err - } - timedOutPeers = append(timedOutPeers, peer) - } - - if err = rows.Err(); err != nil { - return nil, err - } - } - var currentLeader string var currentTerm int var peers []string @@ -675,21 +633,41 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode return nil, err } - needNewLeader := currentLeader == "" + // Locking the lobby row serializes membership and leader updates. We only + // consider peers that still exist and are connected, so a peer deleted by + // timeout cleanup cannot remain leader even before it is removed from peers[]. 
+ connectedPeers := make([]string, 0, len(peers)) + if len(peers) > 0 { + rows, err := tx.Query(ctx, ` + SELECT peer + FROM peers + WHERE game = $1 + AND peer = ANY($2) + AND disconnected = FALSE + `, gameID, peers) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return nil, err + } + } else { + defer rows.Close() //nolint:errcheck + + for rows.Next() { + var peer string + err = rows.Scan(&peer) + if err != nil { + return nil, err + } + connectedPeers = append(connectedPeers, peer) + } - if !needNewLeader { - found := slices.Contains(peers, currentLeader) - if !found { - needNewLeader = true + if err = rows.Err(); err != nil { + return nil, err + } } } - if !needNewLeader { - found := slices.Contains(timedOutPeers, currentLeader) - if found { - needNewLeader = true - } - } + needNewLeader := currentLeader == "" || !slices.Contains(connectedPeers, currentLeader) if !needNewLeader { return nil, nil @@ -697,21 +675,17 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode if isTestEnv { // In tests we want to have a deterministic leader. - sort.Strings(peers) + sort.Strings(connectedPeers) } else { - // Randomize the order of the peers to avoid always picking the same leader. - rand.Shuffle(len(peers), func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] + // Randomize the order of connected peers to avoid always picking the same leader. + rand.Shuffle(len(connectedPeers), func(i, j int) { + connectedPeers[i], connectedPeers[j] = connectedPeers[j], connectedPeers[i] }) } newLeader := "" - for _, peer := range peers { - found := slices.Contains(timedOutPeers, peer) - if !found { - newLeader = peer - break - } + if len(connectedPeers) > 0 { + newLeader = connectedPeers[0] } newTerm := currentTerm + 1