From 11be67e35504e1053f39105d2da0b4d0cabb8eec Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 30 Mar 2026 08:58:50 +0000 Subject: [PATCH] init Signed-off-by: wk989898 --- .../dispatcherorchestrator/dispatcher_orchestrator.go | 2 +- maintainer/maintainer_manager.go | 10 ++++------ maintainer/maintainer_test.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index f91e5222f2..7d2ca64ca0 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -362,7 +362,7 @@ func (m *DispatcherOrchestrator) handleCloseRequest( log.Info("try close dispatcher manager", zap.String("changefeed", cfId.String()), zap.Bool("success", response.Success)) - return m.sendResponse(from, messaging.MaintainerTopic, response) + return m.sendResponse(from, messaging.MaintainerManagerTopic, response) } func createBootstrapResponse( diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 469dc31fc1..d25352e8df 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -70,11 +70,6 @@ func NewMaintainerManager( } mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) - mc.RegisterHandler(messaging.MaintainerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { - req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) - }) return m } @@ -91,13 +86,16 @@ func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage case m.msgCh <- msg: } return nil - // receive bootstrap response message from the dispatcher manager + // receive bootstrap response message from the dispatcher manager case messaging.TypeMaintainerBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse) return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeMaintainerPostBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerPostBootstrapResponse) return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + case messaging.TypeMaintainerCloseResponse: + req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse) + return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) // receive heartbeat message from dispatchers case messaging.TypeHeartBeatRequest: req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index d9029f8b5a..733c818109 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -232,7 +232,7 @@ func (m *mockDispatcherManager) onDispatchRequest( func (m *mockDispatcherManager) onMaintainerCloseRequest(msg *messaging.TargetMessage) { _ = m.mc.SendCommand(messaging.NewSingleTargetMessage(msg.From, - messaging.MaintainerTopic, &heartbeatpb.MaintainerCloseResponse{ + messaging.MaintainerManagerTopic, &heartbeatpb.MaintainerCloseResponse{ ChangefeedID: msg.Message[0].(*heartbeatpb.MaintainerCloseRequest).ChangefeedID, Success: true, }))