Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ func (d *BasicDispatcher) isFirstEvent(event commonEvent.Event) bool {
if event.GetCommitTs() > d.startTs {
return true
}
// the first syncpoint event can be same as startTs
case commonEvent.TypeResolvedEvent, commonEvent.TypeSyncPointEvent:
// the first syncpoint / resolved / handshake event can be same as startTs
case commonEvent.TypeResolvedEvent, commonEvent.TypeSyncPointEvent, commonEvent.TypeHandshakeEvent:
if event.GetCommitTs() >= d.startTs {
return true
}
Expand Down Expand Up @@ -709,9 +709,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
})
d.DealWithBlockEvent(syncPoint)
case commonEvent.TypeHandshakeEvent:
log.Warn("Receive handshake event unexpectedly",
// Handshake means the event collector has registered/reset to the requested startTs and can
// safely consume this span. Idle tables may not receive a later resolved/DML promptly, so we
// treat handshake as the first live event and let the earlier updateDispatcherStatusToWorking()
// unblock the maintainer add operator.
log.Debug("dispatcher receive handshake event",
zap.Stringer("dispatcher", d.id),
zap.Any("event", event))
zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
default:
log.Panic("Unexpected event type",
zap.Int("eventType", event.GetType()),
Expand Down
70 changes: 44 additions & 26 deletions downstreamadapter/dispatcher/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/codec"
"go.uber.org/zap"
Expand All @@ -44,7 +45,7 @@ type EventDispatcher struct {
// cacheEvents is used to store events with a commit-ts greater than redoGlobalTs
cacheEvents struct {
sync.Mutex
events chan cacheEvents
events []cacheEvents
}
}

Expand Down Expand Up @@ -85,22 +86,34 @@ func NewEventDispatcher(
redoEnable: redoEnable,
redoGlobalTs: redoGlobalTs,
}
dispatcher.cacheEvents.events = make(chan cacheEvents, 1)
dispatcher.cacheEvents.events = make([]cacheEvents, 0, 1)
return dispatcher
}

// HandleCacheEvents called when redoGlobalTs is updated
func (d *EventDispatcher) HandleCacheEvents() {
select {
case cacheEvents, ok := <-d.cacheEvents.events:
if !ok {
return
}
block := d.HandleEvents(cacheEvents.events, cacheEvents.wakeCallback)
if !block {
cacheEvents.wakeCallback()
d.cacheEvents.Lock()
if len(d.cacheEvents.events) == 0 {
d.cacheEvents.Unlock()
return
}
cached := d.cacheEvents.events[0]
d.cacheEvents.events[0] = cacheEvents{}
d.cacheEvents.events = d.cacheEvents.events[1:]
d.cacheEvents.Unlock()

wakeAndContinue := func() {
// Drain cached redo-gated batches before waking the live stream again so
// already-buffered older events keep their ordering priority.
d.HandleCacheEvents()
if cached.wakeCallback != nil {
cached.wakeCallback()
}
default:
}

block := d.HandleEvents(cached.events, wakeAndContinue)
if !block {
wakeAndContinue()
}
}

Expand All @@ -117,22 +130,27 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
events: append(make([]DispatcherEvent, 0, len(dispatcherEvents)), dispatcherEvents...),
wakeCallback: wakeCallback,
}
select {
case d.cacheEvents.events <- cacheEvents:
log.Debug("cache events",
zap.Stringer("dispatcher", d.id),
zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
zap.Int("length", len(dispatcherEvents)),
zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType()),
zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs()),
zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load()),
)
default:
log.Panic("dispatcher cache events is full", zap.Stringer("dispatcher", d.id), zap.Int("len", len(d.cacheEvents.events)))
}
d.cacheEvents.events = append(d.cacheEvents.events, cacheEvents)
log.Debug("cache events",
zap.Stringer("dispatcher", d.id),
zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
zap.Int("length", len(dispatcherEvents)),
zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType()),
zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs()),
zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load()),
zap.Int("cached", len(d.cacheEvents.events)),
)
}

func (d *EventDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
// Handshake has no downstream side effect. If we cache it behind redoGlobalTs,
// a recreated normal dispatcher can stay Initializing forever and keep the add
// operator from finishing, which in turn pins the global checkpoint.
if len(dispatcherEvents) == 1 &&
dispatcherEvents[0].Event.GetType() == commonEvent.TypeHandshakeEvent {
return d.handleEvents(dispatcherEvents, wakeCallback)
}

// If the commit-ts of the last event in dispatcherEvents is greater than redoGlobalTs,
// the dispatcherEvents will be cached until the redoGlobalTs is updated.
if d.redoEnable && len(dispatcherEvents) > 0 && d.redoGlobalTs.Load() < dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs() {
Expand All @@ -148,7 +166,7 @@ func (d *EventDispatcher) Remove() {
if d.isRemoving.CompareAndSwap(false, true) {
d.cacheEvents.Lock()
defer d.cacheEvents.Unlock()
close(d.cacheEvents.events)
d.cacheEvents.events = nil
}
d.removeDispatcher()
}
Expand Down Expand Up @@ -226,7 +244,7 @@ func (d *EventDispatcher) EmitBootstrap() bool {
return true
}

// cacheEvents cache the events which commit-ts is less than or equal the redoGlobalTs
// cacheEvents caches events whose commit-ts is greater than redoGlobalTs.
// it will be used when redoEnable is true
type cacheEvents struct {
events []DispatcherEvent
Expand Down
202 changes: 202 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,69 @@ func TestDispatcherHandleEvents(t *testing.T) {
t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue)
}

// TestDispatcherHandshakePromotesToWorking checks that a single handshake event
// stamped with the dispatcher's startTs moves the dispatcher from Initializing
// to Working without blocking, and that the Working status is published on the
// shared statuses channel.
func TestDispatcherHandshakePromotesToWorking(t *testing.T) {
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)

	sink := newDispatcherTestSink(t, common.MysqlSinkType).Sink()
	dispatcher := newDispatcherForTest(sink, span)
	require.Equal(t, heartbeatpb.ComponentState_Initializing, dispatcher.GetComponentStatus())

	// Deliver exactly one handshake whose commit-ts equals the dispatcher startTs.
	from := node.NewID()
	hs := commonEvent.NewHandshakeEvent(dispatcher.GetId(), dispatcher.GetStartTs(), 1, &common.TableInfo{})
	batch := []DispatcherEvent{NewDispatcherEvent(&from, &hs)}
	blocked := dispatcher.HandleEvents(batch, func() {})
	require.False(t, blocked)
	require.Equal(t, heartbeatpb.ComponentState_Working, dispatcher.GetComponentStatus())

	// The status must already be queued, so a non-blocking receive is enough.
	select {
	case reported := <-dispatcher.sharedInfo.statusesChan:
		require.Equal(t, dispatcher.GetId().ToPB(), reported.ID)
		require.Equal(t, heartbeatpb.ComponentState_Working, reported.ComponentStatus)
		require.Equal(t, dispatcher.GetCheckpointTs(), reported.CheckpointTs)
	default:
		t.Fatal("expected dispatcher working status after handshake")
	}
}

// TestDispatcherHandshakeBypassesRedoCache checks that, on a redo-enabled
// dispatcher whose redoGlobalTs (0) is below the handshake commit-ts (100), a
// lone handshake is still handled immediately: nothing is placed in the redo
// cache, the dispatcher becomes Working, and the status is published.
func TestDispatcherHandshakeBypassesRedoCache(t *testing.T) {
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)

	// Keep redoGlobalTs at 0 so any event with commit-ts 100 would normally be cached.
	var redoTs atomic.Uint64
	redoTs.Store(0)
	info := newTestSharedInfo(false, false, newTestSyncPointConfig())
	dispatcher := NewEventDispatcher(
		common.NewDispatcherID(),
		span,
		common.Ts(100),
		1,
		NewSchemaIDToDispatchers(),
		false,
		false,
		common.Ts(100),
		newDispatcherTestSink(t, common.MysqlSinkType).Sink(),
		info,
		true,
		&redoTs,
	)
	require.Equal(t, heartbeatpb.ComponentState_Initializing, dispatcher.GetComponentStatus())

	from := node.NewID()
	hs := commonEvent.NewHandshakeEvent(dispatcher.GetId(), dispatcher.GetStartTs(), 1, &common.TableInfo{})
	batch := []DispatcherEvent{NewDispatcherEvent(&from, &hs)}
	blocked := dispatcher.HandleEvents(batch, func() {})
	require.False(t, blocked)
	require.Equal(t, heartbeatpb.ComponentState_Working, dispatcher.GetComponentStatus())
	require.Empty(t, dispatcher.cacheEvents.events)

	// The status must already be queued, so a non-blocking receive is enough.
	select {
	case reported := <-dispatcher.sharedInfo.statusesChan:
		require.Equal(t, dispatcher.GetId().ToPB(), reported.ID)
		require.Equal(t, heartbeatpb.ComponentState_Working, reported.ComponentStatus)
		require.Equal(t, dispatcher.GetCheckpointTs(), reported.CheckpointTs)
	default:
		t.Fatal("expected dispatcher working status after handshake")
	}
}

func TestDispatcherIgnoresStaleIgnoredBlockStatus(t *testing.T) {
tableSpan := getUncompleteTableSpan()
tableSpan.KeyspaceID = getTestingKeyspaceID()
Expand Down Expand Up @@ -1236,6 +1299,145 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) {
require.Equal(t, 1, len(mockSink.GetDMLs()), "DML at commitTs=100 should be processed when skipDMLAsStartTs=false")
}

// TestEventDispatcherRedoCachesMultipleBlockedDDLEvents checks that a
// redo-enabled dispatcher can hold more than one redo-gated batch at a time:
// two DDL batches with commit-ts above redoGlobalTs are both cached (the second
// without panicking), no wake callback fires while they are cached, and after
// redoGlobalTs is raised two HandleCacheEvents calls empty the cache and both
// wake callbacks eventually run.
func TestEventDispatcherRedoCachesMultipleBlockedDDLEvents(t *testing.T) {
	helper := commonEvent.NewEventTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test")
	helper.DDL2Job("create table t(id int primary key, v int)")
	dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)")
	require.NotNil(t, dmlEvent)

	testSink := newDispatcherTestSink(t, common.MysqlSinkType)
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)

	// redoGlobalTs starts at 1, below both DDL commit-ts values used below.
	var redoTs atomic.Uint64
	redoTs.Store(1)
	info := newTestSharedInfo(false, false, newTestSyncPointConfig())
	dispatcher := NewEventDispatcher(
		common.NewDispatcherID(),
		span,
		common.Ts(0),
		1,
		NewSchemaIDToDispatchers(),
		false,
		false,
		common.Ts(0),
		testSink.Sink(),
		info,
		true,
		&redoTs,
	)

	// newBlockedDDL builds a DDL event that blocks table 0 and finishes at finishedTs.
	newBlockedDDL := func(finishedTs uint64, query string) *commonEvent.DDLEvent {
		return &commonEvent.DDLEvent{
			FinishedTs: finishedTs,
			BlockedTables: &commonEvent.InfluencedTables{
				InfluenceType: commonEvent.InfluenceTypeNormal,
				TableIDs:      []int64{0},
			},
			TableInfo: dmlEvent.TableInfo,
			Query:     query,
		}
	}
	firstDDL := newBlockedDDL(5, "alter table t add column c1 int")
	secondDDL := newBlockedDDL(6, "alter table t add column c2 int")

	from := node.NewID()
	var wakeups atomic.Int32
	onWake := func() {
		wakeups.Add(1)
	}

	blocked := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, firstDDL)}, onWake)
	require.True(t, blocked)
	require.Equal(t, 1, len(dispatcher.cacheEvents.events))

	// A second gated batch must be queued behind the first one rather than panic.
	require.NotPanics(t, func() {
		blocked = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, secondDDL)}, onWake)
	})
	require.True(t, blocked)
	require.Equal(t, 2, len(dispatcher.cacheEvents.events))
	require.Equal(t, int32(0), wakeups.Load())

	// Lift the redo gate and release the cached batches.
	redoTs.Store(math.MaxUint64)
	dispatcher.HandleCacheEvents()
	dispatcher.HandleCacheEvents()

	require.Eventually(t, func() bool {
		return wakeups.Load() == 2
	}, 5*time.Second, 50*time.Millisecond)
	require.Equal(t, 0, len(dispatcher.cacheEvents.events))
}

// TestEventDispatcherRedoDrainsAllReleasableCachedEvents checks that a single
// HandleCacheEvents call, made after redoGlobalTs has been raised, is enough to
// release every cached batch: two gated DDL batches are cached, one
// HandleCacheEvents call is issued, and both wake callbacks eventually run with
// the cache left empty.
func TestEventDispatcherRedoDrainsAllReleasableCachedEvents(t *testing.T) {
	helper := commonEvent.NewEventTestHelper(t)
	defer helper.Close()

	helper.Tk().MustExec("use test")
	helper.DDL2Job("create table t(id int primary key, v int)")
	dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)")
	require.NotNil(t, dmlEvent)

	testSink := newDispatcherTestSink(t, common.MysqlSinkType)
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)

	// redoGlobalTs starts at 1, below both DDL commit-ts values used below.
	var redoTs atomic.Uint64
	redoTs.Store(1)
	info := newTestSharedInfo(false, false, newTestSyncPointConfig())
	dispatcher := NewEventDispatcher(
		common.NewDispatcherID(),
		span,
		common.Ts(0),
		1,
		NewSchemaIDToDispatchers(),
		false,
		false,
		common.Ts(0),
		testSink.Sink(),
		info,
		true,
		&redoTs,
	)

	from := node.NewID()
	var wakeups atomic.Int32
	onWake := func() {
		wakeups.Add(1)
	}

	// Enqueue the gated DDL batches in commit-ts order; each one must be held back.
	gated := []struct {
		commitTs uint64
		query    string
	}{
		{commitTs: 5, query: "alter table t add column c1 int"},
		{commitTs: 6, query: "alter table t add column c2 int"},
	}
	for _, g := range gated {
		ddl := &commonEvent.DDLEvent{
			FinishedTs: g.commitTs,
			BlockedTables: &commonEvent.InfluencedTables{
				InfluenceType: commonEvent.InfluenceTypeNormal,
				TableIDs:      []int64{0},
			},
			TableInfo: dmlEvent.TableInfo,
			Query:     g.query,
		}
		blocked := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, ddl)}, onWake)
		require.True(t, blocked)
	}
	require.Equal(t, 2, len(dispatcher.cacheEvents.events))

	// Lift the redo gate; one call is expected to drain the whole cache.
	redoTs.Store(math.MaxUint64)
	dispatcher.HandleCacheEvents()

	require.Eventually(t, func() bool {
		return wakeups.Load() == 2
	}, 5*time.Second, 50*time.Millisecond)
	require.Equal(t, 0, len(dispatcher.cacheEvents.events))
}

func TestHoldBlockEventUntilNoResendTasks(t *testing.T) {
keyspaceID := getTestingKeyspaceID()
ddlTableSpan := common.KeyspaceDDLSpan(keyspaceID)
Expand Down
6 changes: 6 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,12 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent)
}
state.lastEventSeq.Store(handshakeEvent.Seq)
d.observeCurrentEpochMaxEventTs(state, handshakeEvent.GetCommitTs())

// Forward the handshake to the dispatcher so an idle table can still move from
// Initializing to Working after register/reset/handshake, even before any later
// resolved or DML event arrives. This unblocks maintainer add operators without
// advancing sink-side checkpoint beyond the collector-observed handshake ts.
_ = d.target.HandleEvents([]dispatcher.DispatcherEvent{event}, func() { d.wake() })
}

func (d *dispatcherStat) getHeartbeatProgressForEventService() (uint64, uint64) {
Expand Down
Loading
Loading