Skip to content
Draft

WIP #4840

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,9 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error {
}

func (c *eventBroker) processTableTriggerDispatcher(ctx context.Context, dispatcherID common.DispatcherID, stat *dispatcherStat) {
if !c.checkAndSendReady(stat) {
if !c.activateDispatcherControlPlane(stat) {
return
}
c.sendHandshakeIfNeed(stat)

startTs := stat.sentResolvedTs.Load()
remoteID := node.ID(stat.info.GetServerID())
Expand Down Expand Up @@ -776,23 +775,31 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6
// Note: A true return value only indicates potential scanning need,
// final determination occurs when the scanTask is actully processed.
func (c *eventBroker) scanReady(task scanTask) bool {
if task.isRemoved.Load() {
if task.isTaskScanning.Load() {
return false
}

if task.isTaskScanning.Load() {
if !c.activateDispatcherControlPlane(task) {
return false
}

ok, _ := c.getScanTaskDataRange(task)
return ok
}

// activateDispatcherControlPlane advances the dispatcher through ready/handshake
// without coupling it to scan task generation.
func (c *eventBroker) activateDispatcherControlPlane(task scanTask) bool {
if task.isRemoved.Load() {
return false
}

// If the dispatcher is not ready, we don't need do the scan.
if !c.checkAndSendReady(task) {
return false
}

c.sendHandshakeIfNeed(task)

ok, _ := c.getScanTaskDataRange(task)
return ok
return true
}

func (c *eventBroker) checkAndSendReady(task scanTask) bool {
Expand Down Expand Up @@ -1416,6 +1423,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
}
c.dispatchers.Store(id, dispatcherPtr)
c.metricsCollector.metricDispatcherCount.Inc()
c.activateDispatcherControlPlane(dispatcher)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling activateDispatcherControlPlane here for a dispatcher with epoch > 0 will trigger a HandshakeEvent. However, dispatcher was initialized with nil for startTableInfo at line 1349. This means the handshake event will be sent without table information, which might cause issues for the receiver if it expects metadata for a newly registered table. Consider if addDispatcher should fetch table info when epoch > 0, similar to how resetDispatcher does at line 1531.

log.Info("register dispatcher",
zap.Uint64("clusterID", c.tidbClusterID),
zap.Stringer("changefeedID", changefeedID),
Expand Down Expand Up @@ -1568,6 +1576,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()),
zap.Uint64("newEpoch", newStat.epoch),
zap.Duration("resetTime", time.Since(start)))
c.activateDispatcherControlPlane(newStat)

return nil
}
Expand Down
76 changes: 75 additions & 1 deletion pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ type notifyMsg struct {
latestCommitTs uint64
}

func collectWrapEventTypes(ch chan *wrapEvent) []int {
eventTypes := make([]int, 0)
for {
select {
case e := <-ch:
eventTypes = append(eventTypes, e.msgType)
default:
return eventTypes
}
}
}

type floorRetryMockUint64 struct {
value atomic.Uint64
failFirstCASWithValue uint64
Expand Down Expand Up @@ -148,6 +160,64 @@ func TestCheckNeedScan(t *testing.T) {
log.Info("Pass case 3")
}

func TestAddDispatcherSendsReadyImmediatelyAfterRegistration(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfoForTest(t)
err := broker.addDispatcher(info)
require.NoError(t, err)

dispPtr := broker.getDispatcher(info.GetID())
require.NotNil(t, dispPtr)
disp := dispPtr.Load()
require.NotNil(t, disp)

eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
require.Equal(t, []int{event.TypeReadyEvent}, eventTypes)
}

func TestResetDispatcherSendsHandshakeImmediatelyAfterEpochSwitch(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfoForTest(t)
err := broker.addDispatcher(info)
require.NoError(t, err)

disp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, disp)
collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])

resetInfo := newMockDispatcherInfo(t, 500, info.GetID(), info.GetTableSpan().GetTableID(), eventpb.ActionType_ACTION_TYPE_RESET)
resetInfo.epoch = 1
err = broker.resetDispatcher(resetInfo)
require.NoError(t, err)

newDisp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, newDisp)
require.Equal(t, uint64(1), newDisp.epoch)

eventTypes := collectWrapEventTypes(broker.messageCh[newDisp.messageWorkerIndex])
require.Equal(t, []int{event.TypeHandshakeEvent}, eventTypes)
}

func TestAddTableTriggerDispatcherDoesNotSendReadyImmediately(t *testing.T) {
broker, _, _, _ := newEventBrokerForTest()
broker.close()

info := newMockDispatcherInfo(t, 300, common.NewDispatcherID(), 0, eventpb.ActionType_ACTION_TYPE_REGISTER)
info.span = common.KeyspaceDDLSpan(testTableTriggerKeyspaceID)
err := broker.addDispatcher(info)
require.NoError(t, err)

disp := broker.getDispatcher(info.GetID()).Load()
require.NotNil(t, disp)

eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex])
require.Empty(t, eventTypes)
}

func TestOnNotify(t *testing.T) {
broker, _, ss, _ := newEventBrokerForTest()
// Close the broker, so we can catch all message in the test.
Expand Down Expand Up @@ -1346,7 +1416,7 @@ func TestSendHandshakeUsesStartTs(t *testing.T) {

func TestAddDispatcherFailure(t *testing.T) {
broker, _, ss, _ := newEventBrokerForTest()
defer broker.close()
broker.close()

// Simulate schema store failure
ss.registerTableError = errors.New("mock error")
Expand All @@ -1357,6 +1427,10 @@ func TestAddDispatcherFailure(t *testing.T) {

_, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID())
require.False(t, ok, "changefeedStatus should be removed after failed registration")
require.Nil(t, broker.getDispatcher(dispInfo.GetID()))
for _, ch := range broker.messageCh {
require.Empty(t, collectWrapEventTypes(ch))
}
}

type tableTriggerSchemaStore struct {
Expand Down