From 7fd3640a94b665d7610d97a3303090eb2bbb7fd0 Mon Sep 17 00:00:00 2001
From: dnovitski <54758025+dnovitski@users.noreply.github.com>
Date: Thu, 30 Apr 2026 13:28:09 +0200
Subject: [PATCH] Fix OOM when allEventsUpToLockProcessed buffer equals
 MaxRetries()

PR #1637 buffered allEventsUpToLockProcessed to MaxRetries() to prevent a
goroutine deadlock when waitForEventsUpToLock times out during cutover.
However, when --default-retries is set to a very large value (e.g.
9999999999999), Go tries to allocate a channel with trillions of buffer
slots, causing an immediate OOM crash before the migration even starts.

Replace the MaxRetries()-sized buffer with a buffer of capacity 1 and
overwrite-oldest (latest-wins) send semantics. When the buffer is full
(because the receiver timed out on a previous attempt), the stale message
is drained before the current sentinel is sent. This guarantees:

- The current sentinel is always delivered (no message loss)
- The executeWriteFuncs worker is never blocked (no deadlock)
- Memory usage is constant regardless of MaxRetries() (no OOM)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
 go/logic/migrator.go      | 36 ++++++++++++++++++-----
 go/logic/migrator_test.go | 60 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 7 deletions(-)

diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index e11a2266c..4e5f0cc42 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -111,9 +111,10 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
 		ghostTableMigrated:         make(chan bool),
 		firstThrottlingCollected:   make(chan bool, 3),
 		rowCopyComplete:            make(chan error),
-		// Buffered to MaxRetries() to prevent a deadlock when waitForEventsUpToLock times out.
-		// (see https://github.com/github/gh-ost/pull/1637)
-		allEventsUpToLockProcessed: make(chan *lockProcessedStruct, context.MaxRetries()),
+		// Buffered with capacity 1; the send uses overwrite-oldest semantics
+		// to prevent both deadlock (see https://github.com/github/gh-ost/pull/1637)
+		// and OOM when MaxRetries() is extremely large.
+		allEventsUpToLockProcessed: make(chan *lockProcessedStruct, 1),
 		copyRowsQueue:              make(chan tableWriteFunc),
 		applyEventsQueue:           make(chan *applyEventStruct, base.MaxEventsBatchSize),
 
@@ -276,11 +277,32 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e
 		// Use helper to prevent deadlock if migration aborts before receiver is ready
 		_ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true)
 	case AllEventsUpToLockProcessed:
+		lps := &lockProcessedStruct{
+			state:  changelogStateString,
+			coords: dmlEntry.Coordinates.Clone(),
+		}
 		var applyEventFunc tableWriteFunc = func() error {
-			return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{
-				state:  changelogStateString,
-				coords: dmlEntry.Coordinates.Clone(),
-			})
+			// Non-blocking send with overwrite-oldest semantics: if the buffer is
+			// full (receiver timed out on a previous attempt), drain the stale
+			// message first so the current sentinel is always delivered. This
+			// prevents both the goroutine deadlock (the original PR #1637 issue)
+			// and OOM when MaxRetries() is very large.
+			select {
+			case this.allEventsUpToLockProcessed <- lps:
+			default:
+				// Buffer full: drain the stale value, then send the current one.
+				select {
+				case <-this.allEventsUpToLockProcessed:
+				default:
+				}
+				select {
+				case this.allEventsUpToLockProcessed <- lps:
+				default:
+					// Concurrent drain by another goroutine or receiver; the current
+					// value is no longer needed since a newer sentinel will follow.
+				}
+			}
+			return nil
 		}
 		// at this point we know all events up to lock have been read from the streamer,
 		// because the streamer works sequentially. So those events are either already handled,
diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go
index f731035e1..df0d21733 100644
--- a/go/logic/migrator_test.go
+++ b/go/logic/migrator_test.go
@@ -81,6 +81,66 @@ func TestMigratorOnChangelogEvent(t *testing.T) {
 		wg.Wait()
 	})
 
+	t.Run("state-AllEventsUpToLockProcessed-overwrite-oldest", func(t *testing.T) {
+		// Simulate the scenario where the receiver (waitForEventsUpToLock) timed out
+		// and a stale message sits in the channel buffer. The next sentinel must
+		// overwrite the stale one so the current attempt's message is delivered.
+		m := NewMigrator(base.NewMigrationContext(), "test")
+		m.applier = NewApplier(m.migrationContext)
+
+		sendChangelogEvent := func(challenge string) {
+			columnValues := sql.ToColumnValues([]interface{}{
+				123,
+				time.Now().Unix(),
+				"state",
+				challenge,
+			})
+			require.NoError(t, m.onChangelogEvent(&binlog.BinlogEntry{
+				DmlEvent: &binlog.BinlogDMLEvent{
+					DatabaseName:    "test",
+					DML:             binlog.InsertDML,
+					NewColumnValues: columnValues},
+				Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)),
+			}))
+		}
+
+		executeWriteFunc := func() {
+			es := <-m.applyEventsQueue
+			require.NotNil(t, es.writeFunc)
+			require.NoError(t, (*es.writeFunc)())
+		}
+
+		// Attempt 1: send the sentinel and execute the writeFunc to deliver it
+		sendChangelogEvent("AllEventsUpToLockProcessed:attempt1")
+		executeWriteFunc()
+
+		// The message sits unconsumed in allEventsUpToLockProcessed (simulating a timeout)
+		require.Len(t, m.allEventsUpToLockProcessed, 1)
+
+		// Attempt 2: send a new sentinel, which must overwrite the stale one
+		sendChangelogEvent("AllEventsUpToLockProcessed:attempt2")
+		executeWriteFunc()
+
+		// The channel should contain exactly the latest message
+		require.Len(t, m.allEventsUpToLockProcessed, 1)
+		msg := <-m.allEventsUpToLockProcessed
+		require.Equal(t, "AllEventsUpToLockProcessed:attempt2", msg.state)
+	})
+
+	t.Run("NewMigrator-with-extreme-MaxRetries", func(t *testing.T) {
+		// Regression test: an extremely large --default-retries value must not
+		// cause an OOM when creating the migrator. Before the fix,
+		// allEventsUpToLockProcessed was buffered to MaxRetries(), which tried
+		// to allocate a ~10 trillion element channel.
+		ctx := base.NewMigrationContext()
+		ctx.SetDefaultNumRetries(9999999999999)
+		require.Equal(t, int64(9999999999999), ctx.MaxRetries())
+
+		m := NewMigrator(ctx, "test")
+		require.NotNil(t, m)
+		require.Equal(t, 1, cap(m.allEventsUpToLockProcessed))
+	})
+
 	t.Run("state-GhostTableMigrated", func(t *testing.T) {
 		go func() {
 			require.True(t, <-migrator.ghostTableMigrated)
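
As a companion to the patch, here is a minimal, self-contained sketch of the
overwrite-oldest (latest-wins) send pattern it adopts, runnable on its own.
This is an illustration only: the helper name sendLatest and the int-valued
channel are invented for the sketch and do not appear in gh-ost.

package main

import "fmt"

// sendLatest delivers v on a capacity-1 channel without ever blocking.
// If a stale value is buffered, it is drained so the latest value wins.
func sendLatest(ch chan int, v int) {
	select {
	case ch <- v: // fast path: buffer was empty
	default:
		// Buffer full: drain the stale value...
		select {
		case <-ch:
		default: // ...unless a receiver consumed it concurrently
		}
		// ...then retry the send exactly once.
		select {
		case ch <- v:
		default:
			// A concurrent sender refilled the buffer; dropping v keeps the
			// sender non-blocking, mirroring the patch's reasoning that a
			// fresh sentinel already occupies the buffer.
		}
	}
}

func main() {
	ch := make(chan int, 1) // constant memory, independent of any retry count

	sendLatest(ch, 1) // attempt 1: receiver never consumes this value
	sendLatest(ch, 2) // attempt 2: overwrites the stale value

	fmt.Println(<-ch) // prints 2: only the latest value is kept
}

The capacity-1 buffer is the key design choice: it decouples memory usage
from the retry count entirely, while the drain-then-retry sequence preserves
the delivery guarantee that the MaxRetries()-sized buffer previously provided.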