36 changes: 29 additions & 7 deletions go/logic/migrator.go
@@ -111,9 +111,10 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
 		ghostTableMigrated:         make(chan bool),
 		firstThrottlingCollected:   make(chan bool, 3),
 		rowCopyComplete:            make(chan error),
-		// Buffered to MaxRetries() to prevent a deadlock when waitForEventsUpToLock times out.
-		// (see https://github.com/github/gh-ost/pull/1637)
-		allEventsUpToLockProcessed: make(chan *lockProcessedStruct, context.MaxRetries()),
+		// Buffered with capacity 1; the send uses overwrite-oldest semantics
+		// to prevent both deadlock (see https://github.com/github/gh-ost/pull/1637)
+		// and OOM when MaxRetries() is extremely large.
+		allEventsUpToLockProcessed: make(chan *lockProcessedStruct, 1),
 
 		copyRowsQueue:    make(chan tableWriteFunc),
 		applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
@@ -276,11 +277,32 @@ func (this *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err error) {
 		// Use helper to prevent deadlock if migration aborts before receiver is ready
 		_ = base.SendWithContext(this.migrationContext.GetContext(), this.ghostTableMigrated, true)
 	case AllEventsUpToLockProcessed:
+		lps := &lockProcessedStruct{
+			state:  changelogStateString,
+			coords: dmlEntry.Coordinates.Clone(),
+		}
 		var applyEventFunc tableWriteFunc = func() error {
-			return base.SendWithContext(this.migrationContext.GetContext(), this.allEventsUpToLockProcessed, &lockProcessedStruct{
-				state:  changelogStateString,
-				coords: dmlEntry.Coordinates.Clone(),
-			})
+			// Non-blocking send with overwrite-oldest semantics: if the buffer is
+			// full (the receiver timed out on a previous attempt), drain the stale
+			// message first so the current sentinel is always delivered. This
+			// prevents both goroutine leaks (the original PR #1637 issue) and OOM
+			// when MaxRetries() is very large.
+			select {
+			case this.allEventsUpToLockProcessed <- lps:
+			default:
+				// Buffer full: drain the stale value, then send the current one.
+				select {
+				case <-this.allEventsUpToLockProcessed:
+				default:
+				}
+				select {
+				case this.allEventsUpToLockProcessed <- lps:
+				default:
+					// A concurrent sender refilled the buffer with a newer
+					// sentinel; dropping the current value is safe.
+				}
+			}
+			return nil
 		}
 		// at this point we know all events up to lock have been read from the streamer,
 		// because the streamer works sequentially. So those events are either already handled,
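For readers outside the gh-ost codebase, here is a minimal, self-contained sketch of the overwrite-oldest send pattern the new writeFunc relies on. The channel element type and all names below are illustrative, not gh-ost identifiers.

package main

import "fmt"

// sendOverwriteOldest delivers v on a capacity-1 channel without ever
// blocking: if a stale value sits in the buffer (the receiver gave up,
// e.g. timed out), it is evicted so the newest value wins.
func sendOverwriteOldest(ch chan string, v string) {
	select {
	case ch <- v: // fast path: buffer had room
	default:
		// Buffer full: evict the stale value, then retry the send once.
		select {
		case <-ch:
		default:
		}
		select {
		case ch <- v:
		default:
			// A concurrent sender refilled the buffer with a newer value; drop ours.
		}
	}
}

func main() {
	ch := make(chan string, 1)
	sendOverwriteOldest(ch, "attempt1") // receiver never shows up; value goes stale
	sendOverwriteOldest(ch, "attempt2") // evicts attempt1
	fmt.Println(<-ch)                   // prints "attempt2"
}

Compared with a capacity-MaxRetries() buffer, this keeps memory constant regardless of --default-retries, and the receiver only ever observes the most recent sentinel, which is the only one it cares about.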
60 changes: 60 additions & 0 deletions go/logic/migrator_test.go
@@ -81,6 +81,66 @@ func TestMigratorOnChangelogEvent(t *testing.T) {
 		wg.Wait()
 	})
 
+	t.Run("state-AllEventsUpToLockProcessed-overwrite-oldest", func(t *testing.T) {
+		// Simulate the scenario where the receiver (waitForEventsUpToLock) timed out
+		// and a stale message sits in the channel buffer. The next sentinel must
+		// overwrite the stale one so the current attempt's message is delivered.
+		m := NewMigrator(base.NewMigrationContext(), "test")
+		m.applier = NewApplier(m.migrationContext)
+
+		sendChangelogEvent := func(challenge string) {
+			columnValues := sql.ToColumnValues([]interface{}{
+				123,
+				time.Now().Unix(),
+				"state",
+				challenge,
+			})
+			require.NoError(t, m.onChangelogEvent(&binlog.BinlogEntry{
+				DmlEvent: &binlog.BinlogDMLEvent{
+					DatabaseName:    "test",
+					DML:             binlog.InsertDML,
+					NewColumnValues: columnValues},
+				Coordinates: mysql.NewFileBinlogCoordinates("mysql-bin.000004", int64(4)),
+			}))
+		}
+
+		executeWriteFunc := func() {
+			es := <-m.applyEventsQueue
+			require.NotNil(t, es.writeFunc)
+			require.NoError(t, (*es.writeFunc)())
+		}
+
+		// Attempt 1: send the sentinel and execute the writeFunc to deliver it
+		sendChangelogEvent("AllEventsUpToLockProcessed:attempt1")
+		executeWriteFunc()
+
+		// The message sits unconsumed in allEventsUpToLockProcessed (simulating a timeout)
+		require.Len(t, m.allEventsUpToLockProcessed, 1)
+
+		// Attempt 2: send a new sentinel; it must overwrite the stale one
+		sendChangelogEvent("AllEventsUpToLockProcessed:attempt2")
+		executeWriteFunc()
+
+		// The channel should contain exactly the latest message
+		require.Len(t, m.allEventsUpToLockProcessed, 1)
+		msg := <-m.allEventsUpToLockProcessed
+		require.Equal(t, "AllEventsUpToLockProcessed:attempt2", msg.state)
+	})
+
+	t.Run("NewMigrator-with-extreme-MaxRetries", func(t *testing.T) {
+		// Regression test: an extremely large --default-retries value must not
+		// cause an OOM when creating the migrator. Before the fix,
+		// allEventsUpToLockProcessed was buffered to MaxRetries(), which tried
+		// to allocate a ~10-trillion-element channel.
+		ctx := base.NewMigrationContext()
+		ctx.SetDefaultNumRetries(9999999999999)
+		require.Equal(t, int64(9999999999999), ctx.MaxRetries())
+
+		m := NewMigrator(ctx, "test")
+		require.NotNil(t, m)
+		require.Equal(t, 1, cap(m.allEventsUpToLockProcessed))
+	})
+
 	t.Run("state-GhostTableMigrated", func(t *testing.T) {
 		go func() {
 			require.True(t, <-migrator.ghostTableMigrated)
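As background for the extreme-MaxRetries regression test: Go allocates a channel's entire ring buffer when make runs, so the pre-fix capacity failed at construction time. A back-of-envelope sketch of the allocation, assuming the element is one 8-byte pointer on a 64-bit platform:

package main

import "fmt"

func main() {
	const retries = 9999999999999 // the test's --default-retries value
	const elemBytes = 8           // assumed: one *lockProcessedStruct pointer on 64-bit
	// make(chan *lockProcessedStruct, retries) would eagerly allocate:
	fmt.Printf("%.1f TiB\n", float64(retries)*elemBytes/(1<<40)) // prints about 72.8 TiB
}

With the fix, the same configuration allocates a one-element buffer.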