Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 37 additions & 63 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,6 @@ func (s *PostgresStore) ClaimNextTimedOutPeer(ctx context.Context, threshold tim
}
defer tx.Rollback(context.Background()) //nolint:errcheck

// DELETE FROM peers will lock the row for this peer in this transaction.
// This means we can safely remove the peer from lobbies without getting a
// race condition with DoLeaderElection.
// It is important that both ClaimNextTimedOutPeer and DoLeaderElection always
// lock peers first to avoid deadlocks.

var peerID string
var disconnected bool
err = tx.QueryRow(ctx, `
Expand Down Expand Up @@ -622,42 +616,6 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode
}
defer tx.Rollback(context.Background()) //nolint:errcheck

// We need to lock the whole table because SELECT FOR UPDATE does not lock rows that do not exist yet,
// and we cannot have timed-out peers being added during the election.
_, err = tx.Exec(ctx, `
LOCK TABLE peers IN EXCLUSIVE MODE
`)
if err != nil {
return nil, err
}

var timedOutPeers []string
rows, err := tx.Query(ctx, `
SELECT peer
FROM peers
WHERE disconnected = TRUE
`)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return nil, err
}
} else {
defer rows.Close() //nolint:errcheck

for rows.Next() {
var peer string
err = rows.Scan(&peer)
if err != nil {
return nil, err
}
timedOutPeers = append(timedOutPeers, peer)
}

if err = rows.Err(); err != nil {
return nil, err
}
}

var currentLeader string
var currentTerm int
var peers []string
Expand All @@ -675,43 +633,59 @@ func (s *PostgresStore) DoLeaderElection(ctx context.Context, gameID, lobbyCode
return nil, err
}

needNewLeader := currentLeader == ""
// Locking the lobby row serializes membership and leader updates. We only
// consider peers that still exist and are connected, so a peer deleted by
// timeout cleanup cannot remain leader even before it is removed from peers[].
connectedPeers := make([]string, 0, len(peers))
if len(peers) > 0 {
rows, err := tx.Query(ctx, `
SELECT peer
FROM peers
WHERE game = $1
AND peer = ANY($2)
AND disconnected = FALSE
`, gameID, peers)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return nil, err
}
} else {
defer rows.Close() //nolint:errcheck

for rows.Next() {
var peer string
err = rows.Scan(&peer)
if err != nil {
return nil, err
}
connectedPeers = append(connectedPeers, peer)
}

if !needNewLeader {
found := slices.Contains(peers, currentLeader)
if !found {
needNewLeader = true
if err = rows.Err(); err != nil {
return nil, err
}
}
}

if !needNewLeader {
found := slices.Contains(timedOutPeers, currentLeader)
if found {
needNewLeader = true
}
}
needNewLeader := currentLeader == "" || !slices.Contains(connectedPeers, currentLeader)

if !needNewLeader {
return nil, nil
}

if isTestEnv {
// In tests we want to have a deterministic leader.
sort.Strings(peers)
sort.Strings(connectedPeers)
} else {
// Randomize the order of the peers to avoid always picking the same leader.
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
// Randomize the order of connected peers to avoid always picking the same leader.
rand.Shuffle(len(connectedPeers), func(i, j int) {
connectedPeers[i], connectedPeers[j] = connectedPeers[j], connectedPeers[i]
})
}

newLeader := ""
for _, peer := range peers {
found := slices.Contains(timedOutPeers, peer)
if !found {
newLeader = peer
break
}
if len(connectedPeers) > 0 {
newLeader = connectedPeers[0]
}

newTerm := currentTerm + 1
Expand Down
Loading