diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index ecd03fb3bf..de8deecef2 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -574,10 +574,9 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error { } func (c *eventBroker) processTableTriggerDispatcher(ctx context.Context, dispatcherID common.DispatcherID, stat *dispatcherStat) { - if !c.checkAndSendReady(stat) { + if !c.activateDispatcherControlPlane(stat) { return } - c.sendHandshakeIfNeed(stat) startTs := stat.sentResolvedTs.Load() remoteID := node.ID(stat.info.GetServerID()) @@ -776,23 +775,31 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6 // Note: A true return value only indicates potential scanning need, // final determination occurs when the scanTask is actully processed. func (c *eventBroker) scanReady(task scanTask) bool { - if task.isRemoved.Load() { + if task.isTaskScanning.Load() { return false } - if task.isTaskScanning.Load() { + if !c.activateDispatcherControlPlane(task) { + return false + } + + ok, _ := c.getScanTaskDataRange(task) + return ok +} + +// activateDispatcherControlPlane advances the dispatcher through ready/handshake +// without coupling it to scan task generation. +func (c *eventBroker) activateDispatcherControlPlane(task scanTask) bool { + if task.isRemoved.Load() { return false } - // If the dispatcher is not ready, we don't need do the scan. if !c.checkAndSendReady(task) { return false } c.sendHandshakeIfNeed(task) - - ok, _ := c.getScanTaskDataRange(task) - return ok + return true } func (c *eventBroker) checkAndSendReady(task scanTask) bool { @@ -1416,6 +1423,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { } c.dispatchers.Store(id, dispatcherPtr) c.metricsCollector.metricDispatcherCount.Inc() + c.activateDispatcherControlPlane(dispatcher) log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID), zap.Stringer("changefeedID", changefeedID), @@ -1568,6 +1576,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()), zap.Uint64("newEpoch", newStat.epoch), zap.Duration("resetTime", time.Since(start))) + c.activateDispatcherControlPlane(newStat) return nil } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 637bee5ec2..79b65e5159 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -62,6 +62,18 @@ type notifyMsg struct { latestCommitTs uint64 } +func collectWrapEventTypes(ch chan *wrapEvent) []int { + eventTypes := make([]int, 0) + for { + select { + case e := <-ch: + eventTypes = append(eventTypes, e.msgType) + default: + return eventTypes + } + } +} + type floorRetryMockUint64 struct { value atomic.Uint64 failFirstCASWithValue uint64 @@ -148,6 +160,64 @@ func TestCheckNeedScan(t *testing.T) { log.Info("Pass case 3") } +func TestAddDispatcherSendsReadyImmediatelyAfterRegistration(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfoForTest(t) + err := broker.addDispatcher(info) + require.NoError(t, err) + + dispPtr := broker.getDispatcher(info.GetID()) + require.NotNil(t, dispPtr) + disp := dispPtr.Load() + require.NotNil(t, disp) + + eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + require.Equal(t, []int{event.TypeReadyEvent}, eventTypes) +} + +func TestResetDispatcherSendsHandshakeImmediatelyAfterEpochSwitch(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfoForTest(t) + err := broker.addDispatcher(info) + require.NoError(t, err) + + disp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, disp) + collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + + resetInfo := newMockDispatcherInfo(t, 500, info.GetID(), info.GetTableSpan().GetTableID(), eventpb.ActionType_ACTION_TYPE_RESET) + resetInfo.epoch = 1 + err = broker.resetDispatcher(resetInfo) + require.NoError(t, err) + + newDisp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, newDisp) + require.Equal(t, uint64(1), newDisp.epoch) + + eventTypes := collectWrapEventTypes(broker.messageCh[newDisp.messageWorkerIndex]) + require.Equal(t, []int{event.TypeHandshakeEvent}, eventTypes) +} + +func TestAddTableTriggerDispatcherDoesNotSendReadyImmediately(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfo(t, 300, common.NewDispatcherID(), 0, eventpb.ActionType_ACTION_TYPE_REGISTER) + info.span = common.KeyspaceDDLSpan(testTableTriggerKeyspaceID) + err := broker.addDispatcher(info) + require.NoError(t, err) + + disp := broker.getDispatcher(info.GetID()).Load() + require.NotNil(t, disp) + + eventTypes := collectWrapEventTypes(broker.messageCh[disp.messageWorkerIndex]) + require.Empty(t, eventTypes) +} + func TestOnNotify(t *testing.T) { broker, _, ss, _ := newEventBrokerForTest() // Close the broker, so we can catch all message in the test. @@ -1346,7 +1416,7 @@ func TestSendHandshakeUsesStartTs(t *testing.T) { func TestAddDispatcherFailure(t *testing.T) { broker, _, ss, _ := newEventBrokerForTest() - defer broker.close() + broker.close() // Simulate schema store failure ss.registerTableError = errors.New("mock error") @@ -1357,6 +1427,10 @@ func TestAddDispatcherFailure(t *testing.T) { _, ok := broker.changefeedMap.Load(dispInfo.GetChangefeedID()) require.False(t, ok, "changefeedStatus should be removed after failed registration") + require.Nil(t, broker.getDispatcher(dispInfo.GetID())) + for _, ch := range broker.messageCh { + require.Empty(t, collectWrapEventTypes(ch)) + } } type tableTriggerSchemaStore struct {