diff --git a/api/v2/model.go b/api/v2/model.go index 3c222c7166..e7fb540e89 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -342,6 +342,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( IndexName: rule.IndexName, Columns: rule.Columns, TopicRule: rule.TopicRule, + TargetSchema: rule.TargetSchema, + TargetTable: rule.TargetTable, }) } var columnSelectors []*config.ColumnSelector @@ -700,6 +702,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { IndexName: rule.IndexName, Columns: rule.Columns, TopicRule: rule.TopicRule, + TargetSchema: rule.TargetSchema, + TargetTable: rule.TargetTable, }) } var columnSelectors []*ColumnSelector @@ -1190,6 +1194,21 @@ type DispatchRule struct { IndexName string `json:"index,omitempty"` Columns []string `json:"columns,omitempty"` TopicRule string `json:"topic,omitempty"` + + // TargetSchema sets the routed downstream schema name. + // Leave it empty to keep the source schema name. + // For example, if the source table is `sales`.`orders`, `target-schema = "sales_bak"` + // writes to `sales_bak`.`orders`. + // You can also use placeholders. For example, with `target-schema = "{schema}_bak"`, + // the target schema becomes `sales_bak`. + TargetSchema string `json:"target-schema,omitempty"` + // TargetTable sets the routed downstream table name. + // Leave it empty to keep the source table name. + // For example, if the source table is `sales`.`orders`, `target-table = "orders_bak"` + // writes to `sales`.`orders_bak`. + // You can also use placeholders. For example, `target-table = "{schema}_{table}"` + // becomes `sales_orders`. + TargetTable string `json:"target-table,omitempty"` } // ColumnSelector represents a column selector for a table. 
diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 2880ae14b2..a42b83ada9 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/heartbeatpb" @@ -43,6 +44,7 @@ type DispatcherService interface { GetBDRMode() bool GetChangefeedID() common.ChangeFeedID GetTableSpan() *heartbeatpb.TableSpan + GetRouter() *routing.Router GetTimezone() string GetIntegrityConfig() *eventpb.IntegrityConfig GetFilterConfig() *eventpb.FilterConfig diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 8f95a93b5b..1b499338bd 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -142,6 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive nil, nil, false, + nil, // router statuses, blockStatuses, errCh, diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index 19a0980461..2432b4a060 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/downstreamadapter/syncpoint" "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/heartbeatpb" @@ -54,6 +55,11 @@ type SharedInfo struct { // will break the splittability of this table. 
enableSplittableCheck bool + // router is used to route source schema/table names to target schema/table names. + // It is used to apply routing to TableInfo before storing it. + // May be nil if no routing rules are configured. + router *routing.Router + // Shared resources // statusesChan is used to store the status of dispatchers when status changed // and push to heartbeatRequestQueue @@ -87,6 +93,7 @@ func NewSharedInfo( syncPointConfig *syncpoint.SyncPointConfig, txnAtomicity *config.AtomicityLevel, enableSplittableCheck bool, + router *routing.Router, statusesChan chan TableSpanStatusWithSeq, blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus, errCh chan error, @@ -101,6 +108,7 @@ func NewSharedInfo( filterConfig: filterConfig, syncPointConfig: syncPointConfig, enableSplittableCheck: enableSplittableCheck, + router: router, statusesChan: statusesChan, blockStatusesChan: blockStatusesChan, blockExecutor: newBlockEventExecutor(), @@ -176,6 +184,13 @@ func (d *BasicDispatcher) IsOutputRawChangeEvent() bool { return d.sharedInfo.outputRawChangeEvent } +func (d *BasicDispatcher) GetRouter() *routing.Router { + if d.sharedInfo == nil { + return nil + } + return d.sharedInfo.GetRouter() +} + func (d *BasicDispatcher) GetFilterConfig() *eventpb.FilterConfig { return d.sharedInfo.filterConfig } @@ -266,6 +281,12 @@ func (s *SharedInfo) GetBlockEventExecutor() *blockEventExecutor { return s.blockExecutor } +// GetRouter returns the router for schema/table name routing. +// May return nil if no routing rules are configured. 
+func (s *SharedInfo) GetRouter() *routing.Router { + return s.router +} + func (s *SharedInfo) Close() { if s.blockExecutor != nil { s.blockExecutor.Close() diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 29b8ace3e2..7244508014 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -82,6 +82,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve }, // syncPointConfig &defaultAtomicity, false, // enableSplittableCheck + nil, // router make(chan TableSpanStatusWithSeq, 128), make(chan *heartbeatpb.TableSpanBlockStatus, 128), make(chan error, 1), @@ -1003,6 +1004,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { }, &defaultAtomicity, true, // enableSplittableCheck = true + nil, // router make(chan TableSpanStatusWithSeq, 128), make(chan *heartbeatpb.TableSpanBlockStatus, 128), make(chan error, 1), @@ -1113,6 +1115,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { }, &defaultAtomicity, false, + nil, // router make(chan TableSpanStatusWithSeq, 128), make(chan *heartbeatpb.TableSpanBlockStatus, 128), make(chan error, 1), @@ -1193,6 +1196,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { }, &defaultAtomicity, false, + nil, // router make(chan TableSpanStatusWithSeq, 128), make(chan *heartbeatpb.TableSpanBlockStatus, 128), make(chan error, 1), diff --git a/downstreamadapter/dispatcher/redo_dispatcher_test.go b/downstreamadapter/dispatcher/redo_dispatcher_test.go index 68f97016b7..cb8610b074 100644 --- a/downstreamadapter/dispatcher/redo_dispatcher_test.go +++ b/downstreamadapter/dispatcher/redo_dispatcher_test.go @@ -47,6 +47,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) nil, // redo dispatcher doesn't need syncPointConfig &defaultAtomicity, false, // enableSplittableCheck + nil, // router make(chan 
TableSpanStatusWithSeq, 128), make(chan *heartbeatpb.TableSpanBlockStatus, 128), make(chan error, 1), diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 863bd5310b..f5491f3dbc 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" "github.com/pingcap/ticdc/downstreamadapter/eventcollector" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/downstreamadapter/sink" "github.com/pingcap/ticdc/downstreamadapter/sink/mysql" "github.com/pingcap/ticdc/downstreamadapter/sink/redo" @@ -232,6 +233,14 @@ func NewDispatcherManager( } var err error + var router *routing.Router + if manager.config.SinkConfig != nil { + router, err = routing.NewRouter(manager.config.CaseSensitive, manager.config.SinkConfig.DispatchRules) + if err != nil { + return nil, errors.Trace(err) + } + } + manager.sink, err = sink.New(ctx, manager.config, manager.changefeedID) if err != nil { return nil, errors.Trace(err) @@ -258,6 +267,7 @@ func NewDispatcherManager( syncPointConfig, manager.config.SinkConfig.TxnAtomicity, manager.config.EnableSplittableCheck, + router, make(chan dispatcher.TableSpanStatusWithSeq, 8192), make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), make(chan error, 1), @@ -365,7 +375,10 @@ func (e *DispatcherManager) InitalizeTableTriggerEventDispatcher(schemaInfo []*h } // table trigger event dispatcher can register to event collector to receive events after finish the initial table schema store from the maintainer. 
- appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(e.GetTableTriggerEventDispatcher(), e.sinkQuota) + appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher( + e.GetTableTriggerEventDispatcher(), + e.sinkQuota, + ) // The table trigger event dispatcher needs changefeed-level checkpoint updates only // when downstream components must maintain table names (for non-MySQL sinks), or @@ -480,7 +493,10 @@ func (e *DispatcherManager) newEventDispatchers(infos map[common.DispatcherID]di // we don't register table trigger event dispatcher in event collector, when created. // Table trigger event dispatcher is a special dispatcher, // it need to wait get the initial table schema store from the maintainer, then will register to event collector to receive events. - appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(d, e.sinkQuota) + appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher( + d, + e.sinkQuota, + ) } seq := e.dispatcherMap.Set(id, d) diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go b/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go index 8412716ac7..92452847af 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go @@ -170,7 +170,10 @@ func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dis e.SetTableTriggerRedoDispatcher(rd) } else { e.redoSchemaIDToDispatchers.Set(schemaIds[idx], id) - appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(rd, e.redoQuota) + appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher( + rd, + e.redoQuota, + ) } redoSeq := e.redoDispatcherMap.Set(rd.GetId(), rd) @@ -280,7 +283,10 @@ func (e *DispatcherManager) 
InitalizeTableTriggerRedoDispatcher(schemaInfo []*he if !needAddDispatcher { return nil } - appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(e.GetTableTriggerRedoDispatcher(), e.redoQuota) + appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher( + e.GetTableTriggerRedoDispatcher(), + e.redoQuota, + ) return nil } diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index d26a6193d8..f106a2d10f 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -79,6 +79,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di nil, &defaultAtomicity, false, + nil, // router make(chan dispatcher.TableSpanStatusWithSeq, 1), make(chan *heartbeatpb.TableSpanBlockStatus, 1), make(chan error, 1), @@ -140,6 +141,7 @@ func createTestManager(t *testing.T) *DispatcherManager { nil, // syncPointConfig &defaultAtomicity, false, + nil, // router make(chan dispatcher.TableSpanStatusWithSeq, 8192), make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), make(chan error, 1), diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 50025b36cc..dba91b9fe0 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -479,10 +479,34 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv if !d.filterAndUpdateEventByCommitTs(events[0]) { return false } - ddl := events[0].Event.(*commonEvent.DDLEvent) + originalDDL := events[0].Event.(*commonEvent.DDLEvent) + ddl, err := d.applyRoutingToDDLEvent(originalDDL) + if err != nil { + log.Error("failed to apply routing to DDL event", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + 
zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Error(err)) + return false + } + events[0].Event = ddl d.tableInfoVersion.Store(ddl.FinishedTs) if ddl.TableInfo != nil { - d.tableInfo.Store(ddl.TableInfo) + // Check if this DDL's TableInfo is for a different table. + // This can happen with CREATE TABLE LIKE, where the DDL is added to the + // reference table's history for blocking purposes, but the TableInfo is + // for the new table. In this case, we should NOT update the stored tableInfo. + dispatcherTableID := d.target.GetTableSpan().TableID + ddlTableID := ddl.TableInfo.TableName.TableID + if ddlTableID != dispatcherTableID { + log.Debug("DDL TableInfo is for a different table, skipping tableInfo update", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Int64("dispatcherTableID", dispatcherTableID), + zap.Int64("ddlTableID", ddlTableID), + zap.String("ddlTableName", ddl.TableInfo.TableName.Table)) + } else { + d.tableInfo.Store(ddl.TableInfo) + } } return d.target.HandleEvents(events, func() { d.wake() }) } else { @@ -637,7 +661,7 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent) } tableInfo := handshakeEvent.TableInfo if tableInfo != nil { - d.tableInfo.Store(tableInfo) + d.tableInfo.Store(d.applyRoutingToTableInfo(tableInfo)) } d.lastEventSeq.Store(handshakeEvent.Seq) } @@ -730,3 +754,15 @@ func (d *dispatcherStat) newDispatcherRemoveRequest(serverId string) *messaging. }, } } + +// applyRoutingToTableInfo applies routing rules to the TableInfo and returns a new TableInfo +// with TargetSchema/TargetTable set. If no routing is needed (no router configured, or routing +// results in same schema/table), returns the original tableInfo unchanged. +// This avoids mutating shared TableInfo objects that may be used by multiple changefeeds. 
+func (d *dispatcherStat) applyRoutingToTableInfo(tableInfo *common.TableInfo) *common.TableInfo { + return d.target.GetRouter().ApplyToTableInfo(tableInfo) +} + +func (d *dispatcherStat) applyRoutingToDDLEvent(ddl *commonEvent.DDLEvent) (*commonEvent.DDLEvent, error) { + return d.target.GetRouter().ApplyToDDLEvent(ddl, d.target.GetChangefeedID()) +} diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 203fea8874..acb5106552 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" @@ -45,6 +46,7 @@ type mockDispatcher struct { checkPointTs uint64 skipSyncpointAtStartTs bool + router *routing.Router } func newMockDispatcher(id common.DispatcherID, startTs uint64) *mockDispatcher { @@ -131,6 +133,10 @@ func (m *mockDispatcher) IsOutputRawChangeEvent() bool { return false } +func (m *mockDispatcher) GetRouter() *routing.Router { + return m.router +} + // mockEvent implements the Event interface for testing type mockEvent struct { eventType int @@ -1340,3 +1346,373 @@ func TestRegisterTo(t *testing.T) { } }) } + +func TestApplyRoutingToTableInfo(t *testing.T) { + t.Parallel() + + localServerID := node.ID("local") + remoteServerID := node.ID("remote") + + // Create a router that routes source_db.* -> target_db.* + router, err := routing.NewRouter(true, []*config.DispatchRule{ + {Matcher: []string{"source_db.*"}, TargetSchema: "target_db", TargetTable: routing.TablePlaceholder}, + }) + require.NoError(t, err) + + t.Run("DDL with TableInfo gets routing applied and stored", func(t *testing.T) { + // Capture the event passed to HandleEvents to verify routing was 
applied + var capturedEvent *commonEvent.DDLEvent + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.router = router + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + if len(events) > 0 { + capturedEvent = events[0].Event.(*commonEvent.DDLEvent) + } + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + + // Create original TableInfo - should NOT be mutated + // Use TableID: 1 to match the mockDispatcher's TableSpan.TableID + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "users", + TableID: 1, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: originalTableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) 
+ + // Verify the stored tableInfo has routing applied + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Equal(t, "target_db", storedTableInfo.TableName.TargetSchema) + require.Equal(t, "users", storedTableInfo.TableName.TargetTable) + + // Verify original TableInfo was NOT mutated + require.Equal(t, "", originalTableInfo.TableName.TargetSchema) + require.Equal(t, "", originalTableInfo.TableName.TargetTable) + + // Verify original ddlEvent was NOT mutated (due to CloneForRouting) + require.Equal(t, "", ddlEvent.TableInfo.TableName.TargetSchema) + + // Verify the cloned event passed to HandleEvents has routing applied + require.NotNil(t, capturedEvent) + require.Equal(t, "target_db", capturedEvent.TableInfo.TableName.TargetSchema) + }) + + t.Run("DDL with MultipleTableInfos gets routing applied but only TableInfo stored", func(t *testing.T) { + // Capture the event passed to HandleEvents to verify routing was applied + var capturedEvent *commonEvent.DDLEvent + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.router = router + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + if len(events) > 0 { + capturedEvent = events[0].Event.(*commonEvent.DDLEvent) + } + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + + // This dispatcher's table - use TableID: 1 to match mockDispatcher's TableSpan.TableID + primaryTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "orders", + TableID: 1, + }, + } + + // Other tables in a multi-table DDL (e.g., RENAME TABLE) + otherTableInfo1 := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "old_name", + TableID: 200, + }, + } + otherTableInfo2 := 
&common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "new_name", + TableID: 201, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: primaryTableInfo, + MultipleTableInfos: []*common.TableInfo{otherTableInfo1, otherTableInfo2}, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) + + // Verify only the primary TableInfo is stored + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Equal(t, "orders", storedTableInfo.TableName.Table) + require.Equal(t, "target_db", storedTableInfo.TableName.TargetSchema) + + // Verify original ddlEvent was NOT mutated (due to CloneForRouting) + require.Equal(t, "", ddlEvent.MultipleTableInfos[0].TableName.TargetSchema) + require.Equal(t, "", ddlEvent.MultipleTableInfos[1].TableName.TargetSchema) + + // Verify the cloned event passed to HandleEvents has routing applied + require.NotNil(t, capturedEvent) + require.Equal(t, "target_db", capturedEvent.MultipleTableInfos[0].TableName.TargetSchema) + require.Equal(t, "target_db", capturedEvent.MultipleTableInfos[1].TableName.TargetSchema) + + // Verify originals were NOT mutated + require.Equal(t, "", otherTableInfo1.TableName.TargetSchema) + require.Equal(t, "", otherTableInfo2.TableName.TargetSchema) + }) + + t.Run("DDL without routing configured passes through unchanged", func(t *testing.T) { + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + // No router configured + mockDisp.router = nil + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + 
+ tableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "source_db", + Table: "users", + TableID: 1, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: tableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) + + // Verify tableInfo is stored (same object since no routing) + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + // No routing applied, so TargetSchema/TargetTable should be empty + require.Equal(t, "", storedTableInfo.TableName.TargetSchema) + require.Equal(t, "", storedTableInfo.TableName.TargetTable) + }) + + t.Run("DDL with table-only routing (schema unchanged)", func(t *testing.T) { + // Create a router that only renames the table, keeping schema the same + tableOnlyRouter, err := routing.NewRouter(true, []*config.DispatchRule{ + {Matcher: []string{"mydb.old_users"}, TargetSchema: "{schema}", TargetTable: "new_users"}, + }) + require.NoError(t, err) + + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.router = tableOnlyRouter + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "mydb", + Table: "old_users", + TableID: 1, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: originalTableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) 
+ + // Verify schema is unchanged but table is renamed + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Equal(t, "mydb", storedTableInfo.TableName.TargetSchema) + require.Equal(t, "new_users", storedTableInfo.TableName.TargetTable) + + // Verify original was NOT mutated + require.Equal(t, "", originalTableInfo.TableName.TargetSchema) + require.Equal(t, "", originalTableInfo.TableName.TargetTable) + }) + + t.Run("DDL with both schema and table routing", func(t *testing.T) { + // Create a router that renames both schema and table + bothRouter, err := routing.NewRouter(true, []*config.DispatchRule{ + {Matcher: []string{"staging.*"}, TargetSchema: "prod", TargetTable: "{schema}_{table}"}, + }) + require.NoError(t, err) + + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.router = bothRouter + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "staging", + Table: "events", + TableID: 1, + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: originalTableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) 
+ + // Verify both schema and table are renamed + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + require.Equal(t, "prod", storedTableInfo.TableName.TargetSchema) + require.Equal(t, "staging_events", storedTableInfo.TableName.TargetTable) + + // Verify original was NOT mutated + require.Equal(t, "", originalTableInfo.TableName.TargetSchema) + require.Equal(t, "", originalTableInfo.TableName.TargetTable) + }) + + t.Run("CREATE TABLE LIKE DDL does not overwrite original table's tableInfo", func(t *testing.T) { + // This test verifies the fix for a bug where CREATE TABLE t_like LIKE t + // would cause the dispatcher for table 't' to incorrectly store t_like's + // tableInfo, leading to DMLs being written to the wrong table. + // + // The bug scenario: + // 1. CREATE TABLE t_like LIKE t adds the DDL to t's DDL history (for blocking) + // 2. When t's dispatcher processes this DDL, it would store t_like's TableInfo + // 3. Subsequent DMLs on t would use wrong tableInfo and go to t_like + // + // The fix: Check if DDL's tableInfo.TableID matches dispatcher's TableID + // before storing. If they don't match, skip the tableInfo update. 
+ + mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) + mockDisp.router = router + mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool { + return false + } + + stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil) + stat.connState.setEventServiceID(remoteServerID) + stat.epoch.Store(10) + stat.lastEventSeq.Store(1) + stat.lastEventCommitTs.Store(50) + + // Set up initial tableInfo for the original table 't' (tableID=1, which matches mockDispatcher) + originalTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "test", + Table: "t", + TableID: 1, // This is the dispatcher's table + }, + } + stat.tableInfo.Store(originalTableInfo) + + // CREATE TABLE t_like LIKE t generates a DDL event where: + // - TableInfo is for the new table (t_like, tableID=999) + // - This DDL is added to t's DDL history for blocking purposes + newTableInfo := &common.TableInfo{ + TableName: common.TableName{ + Schema: "test", + Table: "t_like", + TableID: 999, // Different from dispatcher's tableID! + }, + } + + ddlEvent := &commonEvent.DDLEvent{ + Version: commonEvent.DDLEventVersion1, + FinishedTs: 100, + Epoch: 10, + Seq: 2, + TableInfo: newTableInfo, + } + + events := []dispatcher.DispatcherEvent{ + {From: &remoteServerID, Event: ddlEvent}, + } + + stat.handleDataEvents(events...) 
+ + // Verify the stored tableInfo was NOT overwritten with t_like's tableInfo + storedTableInfo := stat.tableInfo.Load().(*common.TableInfo) + require.NotNil(t, storedTableInfo) + // Should still be the original table 't', not 't_like' + require.Equal(t, "t", storedTableInfo.TableName.Table) + require.Equal(t, int64(1), storedTableInfo.TableName.TableID) + // Should NOT be t_like + require.NotEqual(t, "t_like", storedTableInfo.TableName.Table) + require.NotEqual(t, int64(999), storedTableInfo.TableName.TableID) + }) +} diff --git a/downstreamadapter/eventcollector/event_collector_test.go b/downstreamadapter/eventcollector/event_collector_test.go index 8256236b53..f7f4bedfd3 100644 --- a/downstreamadapter/eventcollector/event_collector_test.go +++ b/downstreamadapter/eventcollector/event_collector_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" + "github.com/pingcap/ticdc/downstreamadapter/routing" "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" @@ -116,6 +117,10 @@ func (m *mockEventDispatcher) IsOutputRawChangeEvent() bool { return false } +func (m *mockEventDispatcher) GetRouter() *routing.Router { + return nil +} + func newMessage(id node.ID, msg messaging.IOTypeT) *messaging.TargetMessage { targetMessage := messaging.NewSingleTargetMessage(id, messaging.EventCollectorTopic, msg) targetMessage.From = id @@ -163,6 +168,12 @@ func TestProcessMessage(t *testing.T) { dml.Seq = seq.Add(1) dml.Epoch = 1 dml.CommitTs = ddl.FinishedTs + uint64(i) + // TableInfoVersion is set during event processing to ddl.FinishedTs + // (from tableInfoVersion = max(d.tableInfoVersion.Load(), d.target.GetStartTs()) + // where d.tableInfoVersion is set when processing the DDL event). + // Since BatchDMLEvent is cloned during processing to avoid race conditions, + // the processed events are different objects, so we set this here on the expected events. 
+ dml.TableInfoVersion = ddl.FinishedTs events[dml.Seq] = dml } diff --git a/downstreamadapter/routing/ddl_routing.go b/downstreamadapter/routing/ddl_routing.go new file mode 100644 index 0000000000..5c166fe5d6 --- /dev/null +++ b/downstreamadapter/routing/ddl_routing.go @@ -0,0 +1,110 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/util/filter" + "go.uber.org/zap" +) + +// rewriteDDLQueryWithRouting rewrites a DDL query by applying routing rules +// to transform source table names to target table names. +// +// It only returns the query string and whether the query text changed. +// Canonical DDL schema/table fields are rewritten separately by ApplyToDDLEvent. +func rewriteDDLQueryWithRouting( + router *Router, ddl *commonEvent.DDLEvent, changefeedID common.ChangeFeedID, +) (string, bool, error) { + if router == nil || ddl.Query == "" { + return ddl.Query, false, nil + } + + // Get the default schema for parsing. If TableInfo is nil (e.g., for + // database-level DDLs like CREATE DATABASE), FetchDDLTables will extract + // the schema name directly from the DDL statement itself. 
+ var originSchema string + if ddl.TableInfo != nil { + originSchema = ddl.TableInfo.GetSchemaName() + } + + // Parse the DDL query using TiDB parser + p := parser.New() + stmt, err := p.ParseOneStmt(ddl.Query, "", "") + if err != nil { + log.Error("rewrite ddl failed due to parse ddl query", + zap.String("keyspace", changefeedID.Keyspace()), + zap.String("changefeed", changefeedID.Name()), + zap.String("query", ddl.Query), zap.Error(err)) + return "", false, errors.WrapError(errors.ErrTableRoutingFailed, err) + } + + // Fetch source tables from the DDL + sourceTables, err := fetchDDLTables(originSchema, stmt) + if err != nil { + log.Error("rewrite ddl failed due to fetch ddl tables", + zap.String("keyspace", changefeedID.Keyspace()), + zap.String("changefeed", changefeedID.Name()), + zap.String("query", ddl.Query), zap.Error(err)) + return "", false, errors.WrapError(errors.ErrTableRoutingFailed, err) + } + + if len(sourceTables) == 0 { + return ddl.Query, false, nil + } + + // Build target tables by applying routing rules + var ( + routed bool + targetTables = make([]*filter.Table, 0, len(sourceTables)) + ) + for _, srcTable := range sourceTables { + targetSchema, targetTable := router.Route(srcTable.Schema, srcTable.Name) + if targetSchema != srcTable.Schema || targetTable != srcTable.Name { + routed = true + } + targetTables = append(targetTables, &filter.Table{ + Schema: targetSchema, + Name: targetTable, + }) + } + + if !routed { + return ddl.Query, false, nil + } + + // Rewrite the DDL with target tables + newQuery, err := rewriteDDLQuery(stmt, targetTables) + if err != nil { + log.Error("rewrite ddl failed due to rewrite ddl query", + zap.String("keyspace", changefeedID.Keyspace()), + zap.String("changefeed", changefeedID.Name()), + zap.String("query", ddl.Query), zap.Any("targetTables", targetTables), zap.Error(err)) + return "", false, errors.WrapError(errors.ErrTableRoutingFailed, err) + } + + if newQuery != ddl.Query { + log.Info("DDL query rewritten 
with routing", + zap.String("keyspace", changefeedID.Keyspace()), + zap.String("changefeed", changefeedID.Name()), + zap.String("originalQuery", ddl.Query), + zap.String("newQuery", newQuery)) + } + + return newQuery, newQuery != ddl.Query, nil +} diff --git a/downstreamadapter/routing/ddl_routing_test.go b/downstreamadapter/routing/ddl_routing_test.go new file mode 100644 index 0000000000..814eeea4a6 --- /dev/null +++ b/downstreamadapter/routing/ddl_routing_test.go @@ -0,0 +1,171 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package routing + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestRewriteDDLQueryWithRouting(t *testing.T) { + t.Parallel() + + changefeedID := common.NewChangefeedID4Test(common.DefaultKeyspaceName, "test-changefeed") + tests := []struct { + name string + router *Router + ddl *event.DDLEvent + expectedChanged bool + expectedQuery string + requiredFragments []string + forbiddenFragment string + }{ + { + name: "no router keeps original query", + ddl: &event.DDLEvent{Query: "CREATE TABLE `source_db`.`test_table` (id INT PRIMARY KEY)", TableInfo: &common.TableInfo{TableName: common.TableName{Schema: "source_db", Table: "test_table"}}}, + expectedChanged: false, + expectedQuery: "CREATE TABLE `source_db`.`test_table` (id INT PRIMARY KEY)", + }, + { + name: "empty query stays empty", + ddl: &event.DDLEvent{}, + expectedChanged: false, + expectedQuery: "", + }, + { + name: "no matched rule keeps original query", + router: mustNewRouter(t, false, []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: TablePlaceholder, + }}), + ddl: &event.DDLEvent{ + Query: "CREATE TABLE `other_db`.`test_table` (id INT PRIMARY KEY)", + TableInfo: &common.TableInfo{ + TableName: common.TableName{Schema: "other_db", Table: "test_table"}, + }, + }, + expectedChanged: false, + expectedQuery: "CREATE TABLE `other_db`.`test_table` (id INT PRIMARY KEY)", + }, + { + name: "matched table ddl rewrites target table", + router: mustNewRouter(t, false, []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: "{table}_routed", + }}), + ddl: &event.DDLEvent{ + Query: "ALTER TABLE `source_db`.`test_table` ADD COLUMN c INT", + TableInfo: &common.TableInfo{ + TableName: common.TableName{Schema: "source_db", Table: 
"test_table"}, + }, + }, + expectedChanged: true, + requiredFragments: []string{"`target_db`.`test_table_routed`"}, + forbiddenFragment: "`source_db`.`test_table`", + }, + { + name: "rename ddl rewrites both tables", + router: mustNewRouter(t, false, []*config.DispatchRule{ + { + Matcher: []string{"db1.*"}, + TargetSchema: "target1", + TargetTable: TablePlaceholder, + }, + { + Matcher: []string{"db2.*"}, + TargetSchema: "target2", + TargetTable: TablePlaceholder, + }, + }), + ddl: &event.DDLEvent{ + Query: "RENAME TABLE `db1`.`t1` TO `db2`.`t2`", + TableInfo: &common.TableInfo{ + TableName: common.TableName{Schema: "db2", Table: "t2"}, + }, + MultipleTableInfos: []*common.TableInfo{ + {TableName: common.TableName{Schema: "db2", Table: "t2"}}, + {TableName: common.TableName{Schema: "db1", Table: "t1"}}, + }, + }, + expectedChanged: true, + requiredFragments: []string{"`target1`.`t1`", "`target2`.`t2`"}, + }, + { + name: "database ddl rewrites schema", + router: mustNewRouter(t, false, []*config.DispatchRule{{ + Matcher: []string{"source_db.*"}, + TargetSchema: "target_db", + }}), + ddl: &event.DDLEvent{ + Query: "CREATE DATABASE `source_db`", + }, + expectedChanged: true, + requiredFragments: []string{"`target_db`"}, + forbiddenFragment: "`source_db`", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + newQuery, changed, err := rewriteDDLQueryWithRouting(tc.router, tc.ddl, changefeedID) + require.NoError(t, err) + require.Equal(t, tc.expectedChanged, changed) + if tc.expectedQuery != "" || tc.ddl.Query == "" { + require.Equal(t, tc.expectedQuery, newQuery) + } + for _, fragment := range tc.requiredFragments { + require.Contains(t, newQuery, fragment) + } + if tc.forbiddenFragment != "" { + require.NotContains(t, newQuery, tc.forbiddenFragment) + } + }) + } +} + +func TestRewriteDDLQueryWithRoutingReturnsTypedParseError(t *testing.T) { + t.Parallel() + + router := mustNewRouter(t, false, []*config.DispatchRule{{ + Matcher: 
[]string{"source_db.*"}, + TargetSchema: "target_db", + TargetTable: TablePlaceholder, + }}) + + ddl := &event.DDLEvent{Query: "INVALID DDL"} + + _, _, err := rewriteDDLQueryWithRouting( + router, ddl, common.NewChangefeedID4Test(common.DefaultKeyspaceName, "test-changefeed"), + ) + require.Error(t, err) + code, ok := errors.RFCCode(err) + require.True(t, ok) + require.Equal(t, errors.ErrTableRoutingFailed.RFCCode(), code) +} + +func mustNewRouter(t *testing.T, caseSensitive bool, rules []*config.DispatchRule) *Router { + t.Helper() + + router, err := NewRouter(caseSensitive, rules) + require.NoError(t, err) + return router +} diff --git a/downstreamadapter/routing/ddl_table_utils.go b/downstreamadapter/routing/ddl_table_utils.go new file mode 100644 index 0000000000..41bd835ba4 --- /dev/null +++ b/downstreamadapter/routing/ddl_table_utils.go @@ -0,0 +1,177 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "bytes" + "fmt" + + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/format" + "github.com/pingcap/tidb/pkg/util/filter" +) + +func genTableName(schema string, table string) *filter.Table { + return &filter.Table{Schema: schema, Name: table} +} + +// tableNameExtractor extracts table names from DDL AST nodes. 
+// ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46 +type tableNameExtractor struct { + curDB string + names []*filter.Table +} + +func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) { + if _, ok := in.(*ast.ReferenceDef); ok { + return in, true + } + if t, ok := in.(*ast.TableName); ok { + tb := &filter.Table{Schema: t.Schema.O, Name: t.Name.O} + if tb.Schema == "" { + tb.Schema = tne.curDB + } + tne.names = append(tne.names, tb) + return in, true + } + return in, false +} + +func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) { + return in, true +} + +// fetchDDLTables returns tables in DDL statement. +// Because we use visitor pattern, first tableName is always upper-most table in AST. +// Specifically: +// - for `CREATE TABLE ... LIKE` DDL, result contains [sourceTable, sourceRefTable] +// - for RENAME TABLE DDL, result contains [old1, new1, old2, new2, old3, new3, ...] because of TiDB parser +// - for other DDL, order of tableName is the node visit order. +func fetchDDLTables(schema string, stmt ast.StmtNode) ([]*filter.Table, error) { + switch stmt.(type) { + case ast.DDLNode: + default: + return nil, fmt.Errorf("unknown DDL type: %T", stmt) + } + + // Special cases: schema related SQLs don't have tableName + switch v := stmt.(type) { + case *ast.AlterDatabaseStmt: + dbName := v.Name.O + if dbName == "" { + dbName = schema + } + return []*filter.Table{{Schema: dbName, Name: ""}}, nil + case *ast.CreateDatabaseStmt: + return []*filter.Table{{Schema: v.Name.O, Name: ""}}, nil + case *ast.DropDatabaseStmt: + return []*filter.Table{{Schema: v.Name.O, Name: ""}}, nil + } + + e := &tableNameExtractor{ + curDB: schema, + names: make([]*filter.Table, 0), + } + stmt.Accept(e) + + return e.names, nil +} + +// tableRenameVisitor renames tables in DDL AST nodes. 
+type tableRenameVisitor struct { + targetNames []*filter.Table + i int + hasErr bool +} + +func (v *tableRenameVisitor) Enter(in ast.Node) (ast.Node, bool) { + if v.hasErr { + return in, true + } + if _, ok := in.(*ast.ReferenceDef); ok { + return in, true + } + if t, ok := in.(*ast.TableName); ok { + if v.i >= len(v.targetNames) { + v.hasErr = true + return in, true + } + t.Schema = ast.NewCIStr(v.targetNames[v.i].Schema) + t.Name = ast.NewCIStr(v.targetNames[v.i].Name) + v.i++ + return in, true + } + return in, false +} + +func (v *tableRenameVisitor) Leave(in ast.Node) (ast.Node, bool) { + if v.hasErr { + return in, false + } + return in, true +} + +// rewriteDDLQuery renames tables in DDL by given `targetTables`. +// Argument `targetTables` should have the same structure as the return value of fetchDDLTables. +// Returned DDL is formatted like StringSingleQuotes, KeyWordUppercase and NameBackQuotes. +func rewriteDDLQuery(stmt ast.StmtNode, targetTables []*filter.Table) (string, error) { + switch stmt.(type) { + case ast.DDLNode: + default: + return "", fmt.Errorf("unknown DDL type: %T", stmt) + } + + switch v := stmt.(type) { + case *ast.AlterDatabaseStmt: + if len(targetTables) != 1 { + return "", fmt.Errorf("failed to rewrite DDL: expected 1 target table, got %d", len(targetTables)) + } + v.Name = ast.NewCIStr(targetTables[0].Schema) + case *ast.CreateDatabaseStmt: + if len(targetTables) != 1 { + return "", fmt.Errorf("failed to rewrite DDL: expected 1 target table, got %d", len(targetTables)) + } + v.Name = ast.NewCIStr(targetTables[0].Schema) + case *ast.DropDatabaseStmt: + if len(targetTables) != 1 { + return "", fmt.Errorf("failed to rewrite DDL: expected 1 target table, got %d", len(targetTables)) + } + v.Name = ast.NewCIStr(targetTables[0].Schema) + default: + visitor := &tableRenameVisitor{ + targetNames: targetTables, + } + stmt.Accept(visitor) + if visitor.hasErr { + return "", fmt.Errorf("failed to rewrite DDL: not enough target tables for 
statement, got %d tables", len(targetTables)) + } + // Check if all target tables were consumed - extra targets indicate a configuration mismatch + if visitor.i < len(targetTables) { + return "", fmt.Errorf("failed to rewrite DDL: %d target tables provided but only %d were used in statement", len(targetTables), visitor.i) + } + } + + var b []byte + bf := bytes.NewBuffer(b) + err := stmt.Restore(&format.RestoreCtx{ + Flags: format.DefaultRestoreFlags | format.RestoreTiDBSpecialComment | format.RestoreStringWithoutDefaultCharset, + In: bf, + }) + if err != nil { + return "", errors.Trace(err) + } + + return bf.String(), nil +} diff --git a/downstreamadapter/routing/ddl_table_utils_test.go b/downstreamadapter/routing/ddl_table_utils_test.go new file mode 100644 index 0000000000..98b593d556 --- /dev/null +++ b/downstreamadapter/routing/ddl_table_utils_test.go @@ -0,0 +1,428 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package routing + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/util/filter" + "github.com/stretchr/testify/require" +) + +type testCase struct { + sql string + expectedSQLs []string + expectedTableNames [][]*filter.Table + targetTableNames [][]*filter.Table + targetSQLs []string +} + +var testCases = []testCase{ + // Test case with foreign key - foreign key references should NOT be renamed + { + "create table `t1` (`id` int, `student_id` int, primary key (`id`), foreign key (`student_id`) references `t2`(`id`))", + []string{"CREATE TABLE `t1` (`id` INT,`student_id` INT,PRIMARY KEY(`id`),CONSTRAINT FOREIGN KEY (`student_id`) REFERENCES `t2`(`id`))"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "t1")}}, + []string{"CREATE TABLE `xtest`.`t1` (`id` INT,`student_id` INT,PRIMARY KEY(`id`),CONSTRAINT FOREIGN KEY (`student_id`) REFERENCES `t2`(`id`))"}, + }, + // CREATE SCHEMA/DATABASE + { + "create schema `s1`", + []string{"CREATE DATABASE `s1`"}, + [][]*filter.Table{{genTableName("s1", "")}}, + [][]*filter.Table{{genTableName("xs1", "")}}, + []string{"CREATE DATABASE `xs1`"}, + }, + { + "create schema if not exists `s1`", + []string{"CREATE DATABASE IF NOT EXISTS `s1`"}, + [][]*filter.Table{{genTableName("s1", "")}}, + [][]*filter.Table{{genTableName("xs1", "")}}, + []string{"CREATE DATABASE IF NOT EXISTS `xs1`"}, + }, + // DROP SCHEMA/DATABASE + { + "drop schema `s1`", + []string{"DROP DATABASE `s1`"}, + [][]*filter.Table{{genTableName("s1", "")}}, + [][]*filter.Table{{genTableName("xs1", "")}}, + []string{"DROP DATABASE `xs1`"}, + }, + { + "drop schema if exists `s1`", + []string{"DROP DATABASE IF EXISTS `s1`"}, + [][]*filter.Table{{genTableName("s1", "")}}, + [][]*filter.Table{{genTableName("xs1", "")}}, + []string{"DROP DATABASE IF EXISTS `xs1`"}, + }, + // ALTER DATABASE without explicit name - cannot rename since AST doesn't store database name + { + "alter 
database collate utf8mb4_general_ci", + []string{"ALTER DATABASE COLLATE = utf8mb4_general_ci"}, + [][]*filter.Table{{genTableName("test", "")}}, + [][]*filter.Table{{genTableName("xtest", "")}}, + // Note: When no database name is in original SQL, RenameDDLTable cannot add it + // because the AST parser tracks whether name was present + []string{"ALTER DATABASE COLLATE = utf8mb4_general_ci"}, + }, + // DROP TABLE - single table + { + "drop table `Ss1`.`tT1`", + []string{"DROP TABLE `Ss1`.`tT1`"}, + [][]*filter.Table{{genTableName("Ss1", "tT1")}}, + [][]*filter.Table{{genTableName("xSs1", "xtT1")}}, + []string{"DROP TABLE `xSs1`.`xtT1`"}, + }, + // DROP TABLE - multiple tables (requires SplitDDL to split, so we test without splitting) + { + "drop table `s1`.`t1`, `s2`.`t2`", + []string{"DROP TABLE `s1`.`t1`, `s2`.`t2`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("s2", "t2")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xs2", "xt2")}}, + []string{"DROP TABLE `xs1`.`xt1`, `xs2`.`xt2`"}, + }, + { + "drop table `s1`.`t1`, `s2`.`t2`, `xx`", + []string{"DROP TABLE `s1`.`t1`, `s2`.`t2`, `xx`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("s2", "t2"), genTableName("test", "xx")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xs2", "xt2"), genTableName("xtest", "xxx")}}, + []string{"DROP TABLE `xs1`.`xt1`, `xs2`.`xt2`, `xtest`.`xxx`"}, + }, + // CREATE TABLE + { + "create table `s1`.`t1` (id int)", + []string{"CREATE TABLE `s1`.`t1` (`id` INT)"}, + [][]*filter.Table{{genTableName("s1", "t1")}}, + [][]*filter.Table{{genTableName("xs1", "xt1")}}, + []string{"CREATE TABLE `xs1`.`xt1` (`id` INT)"}, + }, + { + "create table `t1` (id int)", + []string{"CREATE TABLE `t1` (`id` INT)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"CREATE TABLE `xtest`.`xt1` (`id` INT)"}, + }, + { + "create table `s1` (c int default '0')", + []string{"CREATE 
TABLE `s1` (`c` INT DEFAULT '0')"}, + [][]*filter.Table{{genTableName("test", "s1")}}, + [][]*filter.Table{{genTableName("xtest", "xs1")}}, + []string{"CREATE TABLE `xtest`.`xs1` (`c` INT DEFAULT '0')"}, + }, + // CREATE TABLE LIKE + { + "create table `t1` like `t2`", + []string{"CREATE TABLE `t1` LIKE `t2`"}, + [][]*filter.Table{{genTableName("test", "t1"), genTableName("test", "t2")}}, + [][]*filter.Table{{genTableName("xtest", "xt1"), genTableName("xtest", "xt2")}}, + []string{"CREATE TABLE `xtest`.`xt1` LIKE `xtest`.`xt2`"}, + }, + { + "create table `s1`.`t1` like `t2`", + []string{"CREATE TABLE `s1`.`t1` LIKE `t2`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("test", "t2")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xtest", "xt2")}}, + []string{"CREATE TABLE `xs1`.`xt1` LIKE `xtest`.`xt2`"}, + }, + { + "create table `t1` like `xx`.`t2`", + []string{"CREATE TABLE `t1` LIKE `xx`.`t2`"}, + [][]*filter.Table{{genTableName("test", "t1"), genTableName("xx", "t2")}}, + [][]*filter.Table{{genTableName("xtest", "xt1"), genTableName("xxx", "xt2")}}, + []string{"CREATE TABLE `xtest`.`xt1` LIKE `xxx`.`xt2`"}, + }, + // TRUNCATE TABLE + { + "truncate table `t1`", + []string{"TRUNCATE TABLE `t1`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"TRUNCATE TABLE `xtest`.`xt1`"}, + }, + { + "truncate table `s1`.`t1`", + []string{"TRUNCATE TABLE `s1`.`t1`"}, + [][]*filter.Table{{genTableName("s1", "t1")}}, + [][]*filter.Table{{genTableName("xs1", "xt1")}}, + []string{"TRUNCATE TABLE `xs1`.`xt1`"}, + }, + // RENAME TABLE - single + { + "rename table `s1`.`t1` to `s2`.`t2`", + []string{"RENAME TABLE `s1`.`t1` TO `s2`.`t2`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("s2", "t2")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xs2", "xt2")}}, + []string{"RENAME TABLE `xs1`.`xt1` TO `xs2`.`xt2`"}, + }, + // RENAME TABLE - multiple (no 
splitting) + { + "rename table `t1` to `t2`, `s1`.`t1` to `t2`", + []string{"RENAME TABLE `t1` TO `t2`, `s1`.`t1` TO `t2`"}, + [][]*filter.Table{{genTableName("test", "t1"), genTableName("test", "t2"), genTableName("s1", "t1"), genTableName("test", "t2")}}, + [][]*filter.Table{{genTableName("xtest", "xt1"), genTableName("xtest", "xt2"), genTableName("xs1", "xt1"), genTableName("xtest", "xt2")}}, + []string{"RENAME TABLE `xtest`.`xt1` TO `xtest`.`xt2`, `xs1`.`xt1` TO `xtest`.`xt2`"}, + }, + // DROP INDEX + { + "drop index i1 on `s1`.`t1`", + []string{"DROP INDEX `i1` ON `s1`.`t1`"}, + [][]*filter.Table{{genTableName("s1", "t1")}}, + [][]*filter.Table{{genTableName("xs1", "xt1")}}, + []string{"DROP INDEX `i1` ON `xs1`.`xt1`"}, + }, + { + "drop index i1 on `t1`", + []string{"DROP INDEX `i1` ON `t1`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"DROP INDEX `i1` ON `xtest`.`xt1`"}, + }, + // CREATE INDEX + { + "create index i1 on `t1`(`c1`)", + []string{"CREATE INDEX `i1` ON `t1` (`c1`)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"CREATE INDEX `i1` ON `xtest`.`xt1` (`c1`)"}, + }, + { + "create index i1 on `s1`.`t1`(`c1`)", + []string{"CREATE INDEX `i1` ON `s1`.`t1` (`c1`)"}, + [][]*filter.Table{{genTableName("s1", "t1")}}, + [][]*filter.Table{{genTableName("xs1", "xt1")}}, + []string{"CREATE INDEX `i1` ON `xs1`.`xt1` (`c1`)"}, + }, + // ALTER TABLE - multiple specs (no splitting, test as single statement) + { + "alter table `t1` add column c1 int, drop column c2", + []string{"ALTER TABLE `t1` ADD COLUMN `c1` INT, DROP COLUMN `c2`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` ADD COLUMN `c1` INT, DROP COLUMN `c2`"}, + }, + { + "alter table `s1`.`t1` add column c1 int, rename to `t2`, drop column c2", + []string{"ALTER TABLE 
`s1`.`t1` ADD COLUMN `c1` INT, RENAME AS `t2`, DROP COLUMN `c2`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("test", "t2")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xtest", "xt2")}}, + []string{"ALTER TABLE `xs1`.`xt1` ADD COLUMN `c1` INT, RENAME AS `xtest`.`xt2`, DROP COLUMN `c2`"}, + }, + { + "alter table `s1`.`t1` add column c1 int, rename to `xx`.`t2`, drop column c2", + []string{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT, RENAME AS `xx`.`t2`, DROP COLUMN `c2`"}, + [][]*filter.Table{{genTableName("s1", "t1"), genTableName("xx", "t2")}}, + [][]*filter.Table{{genTableName("xs1", "xt1"), genTableName("xxx", "xt2")}}, + []string{"ALTER TABLE `xs1`.`xt1` ADD COLUMN `c1` INT, RENAME AS `xxx`.`xt2`, DROP COLUMN `c2`"}, + }, + // ALTER TABLE with IF NOT EXISTS / IF EXISTS + // Note: TiDB parser converts these to TiDB-specific comment syntax (/*T! ... */) + { + "alter table `t1` add column if not exists c1 int", + []string{"ALTER TABLE `t1` ADD COLUMN IF NOT EXISTS `c1` INT"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` ADD COLUMN /*T! IF NOT EXISTS */`c1` INT"}, + }, + { + "alter table `t1` add index if not exists (a) using btree comment 'a'", + []string{"ALTER TABLE `t1` ADD INDEX IF NOT EXISTS(`a`) USING BTREE COMMENT 'a'"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` ADD INDEX/*T! IF NOT EXISTS */(`a`) USING BTREE COMMENT 'a'"}, + }, + { + "alter table `t1` add constraint fk_t2_id foreign key if not exists (t2_id) references t2(id)", + []string{"ALTER TABLE `t1` ADD CONSTRAINT `fk_t2_id` FOREIGN KEY IF NOT EXISTS (`t2_id`) REFERENCES `t2`(`id`)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` ADD CONSTRAINT `fk_t2_id` FOREIGN KEY /*T! 
IF NOT EXISTS */(`t2_id`) REFERENCES `t2`(`id`)"}, + }, + { + "create index if not exists i1 on `t1`(`c1`)", + []string{"CREATE INDEX IF NOT EXISTS `i1` ON `t1` (`c1`)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"CREATE INDEX /*T! IF NOT EXISTS */`i1` ON `xtest`.`xt1` (`c1`)"}, + }, + { + "alter table `t1` add partition if not exists ( partition p2 values less than maxvalue)", + []string{"ALTER TABLE `t1` ADD PARTITION IF NOT EXISTS (PARTITION `p2` VALUES LESS THAN (MAXVALUE))"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` ADD PARTITION/*T! IF NOT EXISTS */ (PARTITION `p2` VALUES LESS THAN (MAXVALUE))"}, + }, + { + "alter table `t1` drop column if exists c2", + []string{"ALTER TABLE `t1` DROP COLUMN IF EXISTS `c2`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` DROP COLUMN /*T! IF EXISTS */`c2`"}, + }, + { + "alter table `t1` change column if exists a b varchar(255)", + []string{"ALTER TABLE `t1` CHANGE COLUMN IF EXISTS `a` `b` VARCHAR(255)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` CHANGE COLUMN /*T! IF EXISTS */`a` `b` VARCHAR(255)"}, + }, + { + "alter table `t1` modify column if exists a varchar(255)", + []string{"ALTER TABLE `t1` MODIFY COLUMN IF EXISTS `a` VARCHAR(255)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` MODIFY COLUMN /*T! 
IF EXISTS */`a` VARCHAR(255)"}, + }, + { + "alter table `t1` drop index if exists i1", + []string{"ALTER TABLE `t1` DROP INDEX IF EXISTS `i1`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` DROP INDEX /*T! IF EXISTS */`i1`"}, + }, + { + "alter table `t1` drop foreign key fk_t2_id", + []string{"ALTER TABLE `t1` DROP FOREIGN KEY `fk_t2_id`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` DROP FOREIGN KEY `fk_t2_id`"}, + }, + { + "alter table `t1` drop partition if exists p2", + []string{"ALTER TABLE `t1` DROP PARTITION IF EXISTS `p2`"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` DROP PARTITION /*T! IF EXISTS */`p2`"}, + }, + // ALTER TABLE PARTITION BY + { + "alter table `t1` partition by hash(a)", + []string{"ALTER TABLE `t1` PARTITION BY HASH (`a`) PARTITIONS 1"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY HASH (`a`) PARTITIONS 1"}, + }, + { + "alter table `t1` partition by key(a)", + []string{"ALTER TABLE `t1` PARTITION BY KEY (`a`) PARTITIONS 1"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY KEY (`a`) PARTITIONS 1"}, + }, + { + "alter table `t1` partition by range(a) (partition x values less than (75))", + []string{"ALTER TABLE `t1` PARTITION BY RANGE (`a`) (PARTITION `x` VALUES LESS THAN (75))"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY RANGE (`a`) (PARTITION `x` VALUES LESS THAN (75))"}, + }, + { + "alter table `t1` partition by list columns (a, 
b) (partition x values in ((10, 20)))", + []string{"ALTER TABLE `t1` PARTITION BY LIST COLUMNS (`a`,`b`) (PARTITION `x` VALUES IN ((10, 20)))"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY LIST COLUMNS (`a`,`b`) (PARTITION `x` VALUES IN ((10, 20)))"}, + }, + { + "alter table `t1` partition by list (a) (partition x default)", + []string{"ALTER TABLE `t1` PARTITION BY LIST (`a`) (PARTITION `x` DEFAULT)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY LIST (`a`) (PARTITION `x` DEFAULT)"}, + }, + { + "alter table `t1` partition by system_time (partition x history, partition y current)", + []string{"ALTER TABLE `t1` PARTITION BY SYSTEM_TIME (PARTITION `x` HISTORY,PARTITION `y` CURRENT)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "xt1")}}, + []string{"ALTER TABLE `xtest`.`xt1` PARTITION BY SYSTEM_TIME (PARTITION `x` HISTORY,PARTITION `y` CURRENT)"}, + }, + // ALTER DATABASE with explicit name + { + "alter database `test` charset utf8mb4", + []string{"ALTER DATABASE `test` CHARACTER SET = utf8mb4"}, + [][]*filter.Table{{genTableName("test", "")}}, + [][]*filter.Table{{genTableName("xtest", "")}}, + []string{"ALTER DATABASE `xtest` CHARACTER SET = utf8mb4"}, + }, + // ALTER TABLE ADD COLUMN with multiple columns (no splitting) + { + "alter table `t1` add column (c1 int, c2 int)", + []string{"ALTER TABLE `t1` ADD COLUMN (`c1` INT, `c2` INT)"}, + [][]*filter.Table{{genTableName("test", "t1")}}, + [][]*filter.Table{{genTableName("xtest", "t1")}}, + []string{"ALTER TABLE `xtest`.`t1` ADD COLUMN (`c1` INT, `c2` INT)"}, + }, +} + +var nonDDLs = []string{ + "GRANT CREATE TABLESPACE ON *.* TO `root`@`%` WITH GRANT OPTION", +} + +func TestError(t *testing.T) { + t.Parallel() + p := parser.New() + + // DML will 
report ErrUnknownTypeDDL + dml := "INSERT INTO `t1` VALUES (1)" + + stmts, _, err := p.Parse(dml, "", "") + require.NoError(t, err) + _, err = fetchDDLTables("test", stmts[0]) + require.Error(t, err) + require.Contains(t, err.Error(), "unknown DDL type") + + _, err = rewriteDDLQuery(stmts[0], nil) + require.Error(t, err) + require.Contains(t, err.Error(), "unknown DDL type") + + // tableRenameVisitor with less `targetNames` won't panic + ddl := "create table `s1`.`t1` (id int)" + stmts, _, err = p.Parse(ddl, "", "") + require.NoError(t, err) + _, err = rewriteDDLQuery(stmts[0], nil) + require.Error(t, err) + require.Contains(t, err.Error(), "not enough target tables") + + _, _, err = p.Parse("alter table bar ADD SPATIAL INDEX (`g`)", "", "") + require.Error(t, err) +} + +// TestResolveDDL tests FetchDDLTables and RenameDDLTable +func TestResolveDDL(t *testing.T) { + t.Parallel() + p := parser.New() + + for _, ca := range testCases { + stmts, _, err := p.Parse(ca.sql, "", "") + require.NoError(t, err) + require.Len(t, stmts, 1) + + // Test FetchDDLTables + tableNames, err := fetchDDLTables("test", stmts[0]) + require.NoError(t, err) + require.Equal(t, ca.expectedTableNames[0], tableNames, "FetchDDLTables failed for: %s", ca.sql) + + // Re-parse for RenameDDLTable since it modifies AST in place + stmts, _, err = p.Parse(ca.sql, "", "") + require.NoError(t, err) + + // Test RenameDDLTable + targetSQL, err := rewriteDDLQuery(stmts[0], ca.targetTableNames[0]) + require.NoError(t, err) + require.Equal(t, ca.targetSQLs[0], targetSQL, "RenameDDLTable failed for: %s", ca.sql) + } +} diff --git a/downstreamadapter/routing/router.go b/downstreamadapter/routing/router.go new file mode 100644 index 0000000000..b3b815ff3a --- /dev/null +++ b/downstreamadapter/routing/router.go @@ -0,0 +1,273 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "strings" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/errors" + tfilter "github.com/pingcap/tidb/pkg/util/table-filter" + "go.uber.org/zap" +) + +// Routing expression placeholders that can be used in TargetSchema and TargetTable. +const ( + // SchemaPlaceholder is replaced with the source schema name in routing expressions. + SchemaPlaceholder = "{schema}" + // TablePlaceholder is replaced with the source table name in routing expressions. + TablePlaceholder = "{table}" +) + +// Router routes source schema/table names to target schema/table names. +type Router struct { + rules []*rule +} + +// rule represents a single routing rule. +type rule struct { + filter tfilter.Filter + schemaExpr string + tableExpr string +} + +// NewRouter creates a new Router from dispatch rules. +// Returns nil if no routing rules are configured. 
+func NewRouter(caseSensitive bool, rules []*config.DispatchRule) (*Router, error) { + if len(rules) == 0 { + return nil, nil + } + + routingRules := make([]*rule, 0, len(rules)) + for _, r := range rules { + if r.TargetSchema == "" && r.TargetTable == "" { + continue + } + + f, err := tfilter.Parse(r.Matcher) + if err != nil { + log.Warn("router failed to initialize", zap.Strings("matcher", r.Matcher), zap.Error(err)) + return nil, errors.WrapError(errors.ErrInvalidTableRoutingRule, err) + } + if !caseSensitive { + f = tfilter.CaseInsensitive(f) + } + + routingRules = append(routingRules, &rule{ + filter: f, + schemaExpr: r.TargetSchema, + tableExpr: r.TargetTable, + }) + } + + if len(routingRules) == 0 { + return nil, nil + } + + return &Router{rules: routingRules}, nil +} + +// Route returns the target schema and table names for the given source schema/table. +func (r *Router) Route(sourceSchema, sourceTable string) (targetSchema, targetTable string) { + if r == nil || len(r.rules) == 0 { + return sourceSchema, sourceTable + } + + rule := r.matchRule(sourceSchema, sourceTable) + if rule == nil { + return sourceSchema, sourceTable + } + + targetSchema = substituteExpression(rule.schemaExpr, sourceSchema, sourceTable, sourceSchema) + targetTable = substituteExpression(rule.tableExpr, sourceSchema, sourceTable, sourceTable) + return targetSchema, targetTable +} + +// ApplyToTableInfo returns the original TableInfo unless routing changes the target name. +// When routing changes the target, it clones the TableInfo so the caller can safely reuse +// routed metadata without mutating the shared source TableInfo. 
+func (r *Router) ApplyToTableInfo(tableInfo *common.TableInfo) *common.TableInfo { + if tableInfo == nil { + return nil + } + + sourceSchema := tableInfo.TableName.Schema + sourceTable := tableInfo.TableName.Table + targetSchema, targetTable := r.Route(sourceSchema, sourceTable) + if targetSchema == sourceSchema && targetTable == sourceTable { + return tableInfo + } + + return tableInfo.CloneWithRouting(targetSchema, targetTable) +} + +// ApplyToDDLEvent returns the original DDL event unless routing changes the query or related +// table metadata. When routing applies, it clones the DDL event once and rewrites all relevant +// routing-aware fields on the clone. +func (r *Router) ApplyToDDLEvent(ddl *commonEvent.DDLEvent, changefeedID common.ChangeFeedID) (*commonEvent.DDLEvent, error) { + if ddl == nil { + return nil, nil + } + + newQuery, queryChanged, err := rewriteDDLQueryWithRouting(r, ddl, changefeedID) + if err != nil { + return nil, err + } + + routedSchemaName, routedTableName, schemaTableChanged := routeSchemaTable( + r, ddl.GetSourceSchemaName(), ddl.GetSourceTableName(), + ) + routedExtraSchemaName, routedExtraTableName, extraSchemaTableChanged := routeSchemaTable( + r, ddl.GetSourceExtraSchemaName(), ddl.GetSourceExtraTableName(), + ) + routedTableInfo := r.ApplyToTableInfo(ddl.TableInfo) + tableInfoChanged := routedTableInfo != ddl.TableInfo + routedMultipleTableInfos, multipleTableInfosChanged := applyToMultipleTableInfos(r, ddl.MultipleTableInfos) + routedBlockedTableNames, blockedTableNamesChanged := applyToBlockedTableNames(r, ddl.BlockedTableNames) + + if !queryChanged && + !schemaTableChanged && + !extraSchemaTableChanged && + !tableInfoChanged && + !multipleTableInfosChanged && + !blockedTableNamesChanged { + return ddl, nil + } + + cloned := ddl.CloneForRouting() + if queryChanged { + cloned.Query = newQuery + } + if schemaTableChanged { + cloned.TargetSchemaName = routedSchemaName + cloned.TargetTableName = routedTableName + } + if 
extraSchemaTableChanged { + cloned.TargetExtraSchemaName = routedExtraSchemaName + cloned.TargetExtraTableName = routedExtraTableName + } + if tableInfoChanged { + cloned.TableInfo = routedTableInfo + } + if multipleTableInfosChanged { + cloned.MultipleTableInfos = routedMultipleTableInfos + } + if blockedTableNamesChanged { + cloned.BlockedTableNames = routedBlockedTableNames + } + + return cloned, nil +} + +// matchRule finds the first rule that matches the given schema/table. +func (r *Router) matchRule(schema, table string) *rule { + for _, rule := range r.rules { + if rule.filter.MatchTable(schema, table) { + return rule + } + } + return nil +} + +func applyToMultipleTableInfos(r *Router, tableInfos []*common.TableInfo) ([]*common.TableInfo, bool) { + if len(tableInfos) == 0 { + return tableInfos, false + } + + var ( + changed bool + routedTableInfos []*common.TableInfo + ) + for i, tableInfo := range tableInfos { + routedTableInfo := r.ApplyToTableInfo(tableInfo) + if routedTableInfo != tableInfo { + if !changed { + routedTableInfos = append([]*common.TableInfo(nil), tableInfos...) + changed = true + } + routedTableInfos[i] = routedTableInfo + } + } + + if !changed { + return tableInfos, false + } + return routedTableInfos, true +} + +func applyToBlockedTableNames(r *Router, tableNames []commonEvent.SchemaTableName) ([]commonEvent.SchemaTableName, bool) { + if len(tableNames) == 0 { + return tableNames, false + } + + var ( + changed bool + routedTableNames []commonEvent.SchemaTableName + ) + for i, tableName := range tableNames { + targetSchema, targetTable := r.Route(tableName.SchemaName, tableName.TableName) + if targetSchema != tableName.SchemaName || targetTable != tableName.TableName { + if !changed { + routedTableNames = append([]commonEvent.SchemaTableName(nil), tableNames...) 
+ changed = true + } + routedTableNames[i] = commonEvent.SchemaTableName{ + SchemaName: targetSchema, + TableName: targetTable, + } + } + } + + if !changed { + return tableNames, false + } + return routedTableNames, true +} + +func routeSchemaOnly(r *Router, schema string) (string, bool) { + if schema == "" { + return "", false + } + targetSchema, _ := r.Route(schema, "") + return targetSchema, targetSchema != schema +} + +func routeSchemaTable(r *Router, schema, table string) (string, string, bool) { + if schema == "" && table == "" { + return "", "", false + } + if table == "" { + targetSchema, changed := routeSchemaOnly(r, schema) + return targetSchema, "", changed + } + targetSchema, targetTable := r.Route(schema, table) + return targetSchema, targetTable, targetSchema != schema || targetTable != table +} + +// substituteExpression replaces {schema} and {table} placeholders with actual values. +// If expr is empty, returns defaultValue (typically sourceSchema for schema expressions, +// sourceTable for table expressions). +func substituteExpression(expr, sourceSchema, sourceTable, defaultValue string) string { + if expr == "" { + return defaultValue + } + + result := expr + result = strings.ReplaceAll(result, SchemaPlaceholder, sourceSchema) + result = strings.ReplaceAll(result, TablePlaceholder, sourceTable) + return result +} diff --git a/downstreamadapter/routing/router_apply_test.go b/downstreamadapter/routing/router_apply_test.go new file mode 100644 index 0000000000..e186c7099e --- /dev/null +++ b/downstreamadapter/routing/router_apply_test.go @@ -0,0 +1,236 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
	"testing"

	"github.com/pingcap/ticdc/pkg/common"
	"github.com/pingcap/ticdc/pkg/common/event"
	"github.com/pingcap/ticdc/pkg/config"
	"github.com/stretchr/testify/require"
)

// TestApplyToTableInfo verifies that ApplyToTableInfo returns the original
// TableInfo (same pointer) for a nil router or a non-matching rule, and a
// routed clone — with the source names untouched — when a rule matches.
func TestApplyToTableInfo(t *testing.T) {
	t.Parallel()

	tableInfo := &common.TableInfo{
		TableName: common.TableName{
			Schema:  "source_db",
			Table:   "source_table",
			TableID: 1,
		},
	}

	// A nil *Router must be a safe no-op.
	var nilRouter *Router
	require.Same(t, tableInfo, nilRouter.ApplyToTableInfo(tableInfo))

	// A router whose only rule does not match must also return the same pointer.
	noOpRouter, err := NewRouter(false, []*config.DispatchRule{
		{
			Matcher:      []string{"other_db.*"},
			TargetSchema: "target_db",
			TargetTable:  TablePlaceholder,
		},
	})
	require.NoError(t, err)
	require.Same(t, tableInfo, noOpRouter.ApplyToTableInfo(tableInfo))

	router, err := NewRouter(false, []*config.DispatchRule{
		{
			Matcher:      []string{"source_db.source_table"},
			TargetSchema: "target_db",
			TargetTable:  "target_table",
		},
	})
	require.NoError(t, err)

	// A matching rule clones: source names are preserved, target names are set.
	routed := router.ApplyToTableInfo(tableInfo)
	require.NotSame(t, tableInfo, routed)
	require.Equal(t, "source_db", routed.GetSchemaName())
	require.Equal(t, "source_table", routed.GetTableName())
	require.Equal(t, "target_db", routed.GetTargetSchemaName())
	require.Equal(t, "target_table", routed.GetTargetTableName())
	require.Equal(t, "source_db", routed.TableName.Schema)
	require.Equal(t, "source_table", routed.TableName.Table)
	require.Equal(t, "target_db", routed.TableName.TargetSchema)
	require.Equal(t, "target_table", routed.TableName.TargetTable)

	// The shared source TableInfo must not have been mutated.
	require.Empty(t, tableInfo.TableName.TargetSchema)
	require.Empty(t, tableInfo.TableName.TargetTable)
}

// TestApplyToDDLEvent exercises ApplyToDDLEvent for a single-table DDL, a
// RENAME TABLE across two routed databases, and a database-level DDL. Each
// case checks both the routed clone and that the original event is untouched.
func TestApplyToDDLEvent(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name   string
		router *Router
		ddl    *event.DDLEvent
		check  func(t *testing.T, original, routed *event.DDLEvent)
	}{
		{
			name: "single table ddl",
			router: func() *Router {
				router, err := NewRouter(false, []*config.DispatchRule{{
					Matcher:      []string{"source_db.source_table"},
					TargetSchema: "target_db",
					TargetTable:  "target_table",
				}})
				require.NoError(t, err)
				return router
			}(),
			ddl: func() *event.DDLEvent {
				originalTableInfo := &common.TableInfo{
					TableName: common.TableName{
						Schema:  "source_db",
						Table:   "source_table",
						TableID: 1,
					},
				}
				return &event.DDLEvent{
					Query:      "ALTER TABLE `source_db`.`source_table` ADD INDEX idx_id(id)",
					SchemaName: "source_db",
					TableName:  "source_table",
					TableInfo:  originalTableInfo,
					MultipleTableInfos: []*common.TableInfo{
						originalTableInfo,
					},
					BlockedTableNames: []event.SchemaTableName{
						{SchemaName: "source_db", TableName: "source_table"},
					},
				}
			}(),
			check: func(t *testing.T, original, routed *event.DDLEvent) {
				// Routed clone: query rewritten, target names set, source names kept.
				require.Contains(t, routed.Query, "`target_db`.`target_table`")
				require.Equal(t, "target_db", routed.GetDDLSchemaName())
				require.Equal(t, "source_db", routed.SchemaName)
				require.Equal(t, "source_table", routed.TableName)
				require.Equal(t, "target_db", routed.TargetSchemaName)
				require.Equal(t, "target_table", routed.TargetTableName)
				require.NotSame(t, original.TableInfo, routed.TableInfo)
				require.Equal(t, "source_db", routed.TableInfo.GetSchemaName())
				require.Equal(t, "source_table", routed.TableInfo.GetTableName())
				require.Equal(t, "target_db", routed.TableInfo.GetTargetSchemaName())
				require.Equal(t, "target_table", routed.TableInfo.GetTargetTableName())
				require.Equal(t, "target_db", routed.TableInfo.TableName.TargetSchema)
				require.Equal(t, "target_table", routed.TableInfo.TableName.TargetTable)
				require.NotSame(t, original.MultipleTableInfos[0], routed.MultipleTableInfos[0])
				require.Equal(t, "source_db", routed.MultipleTableInfos[0].GetSchemaName())
				require.Equal(t, "source_table", routed.MultipleTableInfos[0].GetTableName())
				require.Equal(t, "target_db", routed.MultipleTableInfos[0].GetTargetSchemaName())
				require.Equal(t, "target_table", routed.MultipleTableInfos[0].GetTargetTableName())
				require.Equal(t, event.SchemaTableName{
					SchemaName: "target_db",
					TableName:  "target_table",
				}, routed.BlockedTableNames[0])

				// The original event must be byte-for-byte what it was.
				require.Equal(t, "ALTER TABLE `source_db`.`source_table` ADD INDEX idx_id(id)", original.Query)
				require.Equal(t, "source_db", original.GetDDLSchemaName())
				require.Empty(t, original.TableInfo.TableName.TargetSchema)
				require.Empty(t, original.TableInfo.TableName.TargetTable)
			},
		},
		{
			name: "rename ddl",
			router: func() *Router {
				router, err := NewRouter(false, []*config.DispatchRule{
					{
						Matcher:      []string{"old_db.*"},
						TargetSchema: "old_target_db",
						TargetTable:  "{table}_old",
					},
					{
						Matcher:      []string{"new_db.*"},
						TargetSchema: "new_target_db",
						TargetTable:  "{table}_new",
					},
				})
				require.NoError(t, err)
				return router
			}(),
			ddl: &event.DDLEvent{
				Query:           "RENAME TABLE `old_db`.`orders` TO `new_db`.`orders_archive`",
				SchemaName:      "new_db",
				TableName:       "orders_archive",
				ExtraSchemaName: "old_db",
				ExtraTableName:  "orders",
				TableNameChange: &event.TableNameChange{
					AddName: []event.SchemaTableName{{
						SchemaName: "new_db",
						TableName:  "orders_archive",
					}},
					DropName: []event.SchemaTableName{{
						SchemaName: "old_db",
						TableName:  "orders",
					}},
				},
			},
			check: func(t *testing.T, original, routed *event.DDLEvent) {
				// Both sides of the rename are routed independently; the
				// Extra* fields carry the old (pre-rename) names.
				require.Equal(t, "new_db", routed.SchemaName)
				require.Equal(t, "orders_archive", routed.TableName)
				require.Equal(t, "old_db", routed.ExtraSchemaName)
				require.Equal(t, "orders", routed.ExtraTableName)
				require.Equal(t, "new_target_db", routed.TargetSchemaName)
				require.Equal(t, "orders_archive_new", routed.TargetTableName)
				require.Equal(t, "old_target_db", routed.TargetExtraSchemaName)
				require.Equal(t, "orders_old", routed.TargetExtraTableName)
				// TableNameChange keeps source names (not routed).
				require.Equal(t, event.SchemaTableName{
					SchemaName: "new_db",
					TableName:  "orders_archive",
				}, routed.TableNameChange.AddName[0])
				require.Equal(t, event.SchemaTableName{
					SchemaName: "old_db",
					TableName:  "orders",
				}, routed.TableNameChange.DropName[0])
				require.Contains(t, routed.Query, "`old_target_db`.`orders_old`")
				require.Contains(t, routed.Query, "`new_target_db`.`orders_archive_new`")
				require.Equal(t, "new_db", original.SchemaName)
				require.Equal(t, "orders_archive", original.TableName)
				require.Equal(t, "old_db", original.ExtraSchemaName)
				require.Equal(t, "orders", original.ExtraTableName)
			},
		},
		{
			name: "database ddl",
			router: func() *Router {
				router, err := NewRouter(false, []*config.DispatchRule{{
					Matcher:      []string{"source_db.*"},
					TargetSchema: "target_db",
				}})
				require.NoError(t, err)
				return router
			}(),
			ddl: &event.DDLEvent{
				Query:      "CREATE DATABASE `source_db`",
				SchemaName: "source_db",
			},
			check: func(t *testing.T, original, routed *event.DDLEvent) {
				// Schema-only DDL: table name is empty, only the schema routes.
				require.Equal(t, "source_db", routed.SchemaName)
				require.Equal(t, "target_db", routed.TargetSchemaName)
				require.Equal(t, "target_db", routed.GetDDLSchemaName())
				require.Equal(t, "source_db", original.SchemaName)
			},
		},
	}

	for _, tc := range tests {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			routed, err := tc.router.ApplyToDDLEvent(tc.ddl, common.NewChangefeedID4Test(common.DefaultKeyspaceName, "test-changefeed"))
			require.NoError(t, err)
			require.NotSame(t, tc.ddl, routed)
			tc.check(t, tc.ddl, routed)
		})
	}
}
diff --git a/downstreamadapter/routing/router_test.go b/downstreamadapter/routing/router_test.go
new file mode 100644
index 0000000000..30092404f3
--- /dev/null
+++ b/downstreamadapter/routing/router_test.go
@@ -0,0 +1,266 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routing

import (
	"testing"

	"github.com/pingcap/ticdc/pkg/config"
	"github.com/pingcap/ticdc/pkg/errors"
	"github.com/stretchr/testify/require"
)

// TestSubstituteExpression covers placeholder substitution: the empty-expression
// default, each placeholder alone, and a literal mixing both placeholders.
func TestSubstituteExpression(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name         string
		expr         string
		sourceSchema string
		sourceTable  string
		defaultValue string
		expected     string
	}{
		{
			name:         "empty expression falls back to default",
			expr:         "",
			sourceSchema: "mydb",
			sourceTable:  "mytable",
			defaultValue: "mydb",
			expected:     "mydb",
		},
		{
			name:         "schema placeholder",
			expr:         SchemaPlaceholder,
			sourceSchema: "mydb",
			sourceTable:  "mytable",
			defaultValue: "unused",
			expected:     "mydb",
		},
		{
			name:         "table placeholder",
			expr:         TablePlaceholder + "_bak",
			sourceSchema: "mydb",
			sourceTable:  "mytable",
			defaultValue: "unused",
			expected:     "mytable_bak",
		},
		{
			name:         "literal with both placeholders",
			expr:         "db_" + SchemaPlaceholder + "_" + TablePlaceholder + "_v2",
			sourceSchema: "prod",
			sourceTable:  "users",
			defaultValue: "unused",
			expected:     "db_prod_users_v2",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			result := substituteExpression(tc.expr, tc.sourceSchema, tc.sourceTable, tc.defaultValue)
			require.Equal(t, tc.expected, result)
		})
	}
}

// TestNewRouter covers constructor edge cases: empty input, rules without
// routing targets, invalid matchers, and a mix of dispatch-only and routing rules.
func TestNewRouter(t *testing.T) {
	t.Parallel()

	t.Run("nil or empty rules return nil router", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(true, nil)
		require.NoError(t, err)
		require.Nil(t, router)

		router, err = NewRouter(true, []*config.DispatchRule{})
		require.NoError(t, err)
		require.Nil(t, router)
	})

	t.Run("rules without routing targets are skipped", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(true, []*config.DispatchRule{
			{Matcher: []string{"db1.*"}},
		})
		require.NoError(t, err)
		require.Nil(t, router)
	})

	t.Run("invalid matcher returns error", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(true, []*config.DispatchRule{
			{
				Matcher:      []string{"[invalid"},
				TargetSchema: "target",
				TargetTable:  TablePlaceholder,
			},
		})
		require.Error(t, err)
		// The parse failure must surface as ErrInvalidTableRoutingRule.
		code, ok := errors.RFCCode(err)
		require.True(t, ok)
		require.Equal(t, errors.ErrInvalidTableRoutingRule.RFCCode(), code)
		require.Nil(t, router)
	})

	t.Run("builds router from rules with target fields and ignores pure dispatch rules", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(true, []*config.DispatchRule{
			{
				Matcher:        []string{"db1.*"},
				DispatcherRule: "ts",
			},
			{
				Matcher:      []string{"db2.*"},
				TargetSchema: "archive",
				TargetTable:  TablePlaceholder,
			},
			{
				Matcher:      []string{"db3.users"},
				TargetSchema: SchemaPlaceholder,
				TargetTable:  "users_bak",
			},
		})
		require.NoError(t, err)
		require.NotNil(t, router)

		// db1 only has a dispatch rule, so it routes to itself.
		schema, table := router.Route("db1", "orders")
		require.Equal(t, "db1", schema)
		require.Equal(t, "orders", table)

		schema, table = router.Route("db2", "orders")
		require.Equal(t, "archive", schema)
		require.Equal(t, "orders", table)

		schema, table = router.Route("db3", "users")
		require.Equal(t, "db3", schema)
		require.Equal(t, "users_bak", table)
	})
}

// TestRouterRoute checks routing through a rule chain: exact matcher, schema
// wildcard, placeholder expressions, and a catch-all fallback.
func TestRouterRoute(t *testing.T) {
	t.Parallel()

	// A nil router is an identity mapping.
	var nilRouter *Router
	schema, table := nilRouter.Route("source_db", "source_table")
	require.Equal(t, "source_db", schema)
	require.Equal(t, "source_table", table)

	router, err := NewRouter(true, []*config.DispatchRule{
		{Matcher: []string{"db1.specific"}, TargetSchema: "specific_db", TargetTable: "specific_table"},
		{Matcher: []string{"db1.*"}, TargetSchema: "db1_archive", TargetTable: TablePlaceholder},
		{Matcher: []string{"staging.*"}, TargetSchema: "prod", TargetTable: SchemaPlaceholder + "_" + TablePlaceholder},
		{Matcher: []string{"*.*"}, TargetSchema: "fallback", TargetTable: TablePlaceholder},
	})
	require.NoError(t, err)
	require.NotNil(t, router)

	testCases := []struct {
		name           string
		sourceSchema   string
		sourceTable    string
		expectedSchema string
		expectedTable  string
	}{
		{
			name:           "specific matcher rewrites both names",
			sourceSchema:   "db1",
			sourceTable:    "specific",
			expectedSchema: "specific_db",
			expectedTable:  "specific_table",
		},
		{
			name:           "schema wildcard rewrites schema only",
			sourceSchema:   "db1",
			sourceTable:    "orders",
			expectedSchema: "db1_archive",
			expectedTable:  "orders",
		},
		{
			name:           "placeholders use source names",
			sourceSchema:   "staging",
			sourceTable:    "events",
			expectedSchema: "prod",
			expectedTable:  "staging_events",
		},
		{
			name:           "fallback matcher applies when no prior rule matches",
			sourceSchema:   "db2",
			sourceTable:    "products",
			expectedSchema: "fallback",
			expectedTable:  "products",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			gotSchema, gotTable := router.Route(tc.sourceSchema, tc.sourceTable)
			require.Equal(t, tc.expectedSchema, gotSchema)
			require.Equal(t, tc.expectedTable, gotTable)
		})
	}
}

// TestRouterFirstMatchWins pins rule ordering: the first matching rule is
// applied even when later rules are more specific.
func TestRouterFirstMatchWins(t *testing.T) {
	t.Parallel()

	router, err := NewRouter(true, []*config.DispatchRule{
		{Matcher: []string{"*.*"}, TargetSchema: "catch_all", TargetTable: TablePlaceholder},
		{Matcher: []string{"db1.*"}, TargetSchema: "db1_only", TargetTable: TablePlaceholder},
		{Matcher: []string{"db1.users"}, TargetSchema: "users_only", TargetTable: "users_bak"},
	})
	require.NoError(t, err)
	require.NotNil(t, router)

	schema, table := router.Route("db1", "users")
	require.Equal(t, "catch_all", schema)
	require.Equal(t, "users", table)
}

// TestRouterCaseSensitivity verifies matcher case handling: a case-sensitive
// router requires exact case, and a case-insensitive router still substitutes
// placeholders with the source's original casing.
func TestRouterCaseSensitivity(t *testing.T) {
	t.Parallel()

	t.Run("case sensitive router requires exact match", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(true, []*config.DispatchRule{
			{Matcher: []string{"MyDB.MyTable"}, TargetSchema: "target_db", TargetTable: "target_table"},
		})
		require.NoError(t, err)

		schema, table := router.Route("MyDB", "MyTable")
		require.Equal(t, "target_db", schema)
		require.Equal(t, "target_table", table)

		schema, table = router.Route("mydb", "mytable")
		require.Equal(t, "mydb", schema)
		require.Equal(t, "mytable", table)
	})

	t.Run("case insensitive router preserves source case in placeholders", func(t *testing.T) {
		t.Parallel()

		router, err := NewRouter(false, []*config.DispatchRule{
			{Matcher: []string{"MyDB.*"}, TargetSchema: "backup_" + SchemaPlaceholder, TargetTable: TablePlaceholder},
		})
		require.NoError(t, err)

		schema, table := router.Route("mydb", "MyTable")
		require.Equal(t, "backup_mydb", schema)
		require.Equal(t, "MyTable", table)
	})
}
diff --git a/downstreamadapter/sink/sink.go b/downstreamadapter/sink/sink.go
index 5e9312b519..cefaf5233f 100644
--- a/downstreamadapter/sink/sink.go
+++ b/downstreamadapter/sink/sink.go
@@ -52,6 +52,7 @@ func New(ctx context.Context, cfg *config.ChangefeedConfig, changefeedID common.
if err != nil { return nil, errors.WrapError(errors.ErrSinkURIInvalid, err) } + scheme := config.GetScheme(sinkURI) switch scheme { case config.MySQLScheme, config.MySQLSSLScheme, config.TiDBScheme, config.TiDBSSLScheme: diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ef8dcd6732..0cd046461a 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -22,6 +22,8 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" "go.uber.org/zap" ) @@ -41,14 +43,22 @@ type DDLEvent struct { SchemaID int64 `json:"schema_id"` SchemaName string `json:"schema_name"` TableName string `json:"table_name"` + // TargetSchemaName and TargetTableName carry routed names for sink output paths. + // They are runtime-only fields and are not serialized. + TargetSchemaName string `json:"-"` + TargetTableName string `json:"-"` // the following two fields are just used for RenameTable, // they are the old schema/table name of the table - ExtraSchemaName string `json:"extra_schema_name"` - ExtraTableName string `json:"extra_table_name"` - Query string `json:"query"` - TableInfo *common.TableInfo `json:"-"` - StartTs uint64 `json:"start_ts"` - FinishedTs uint64 `json:"finished_ts"` + ExtraSchemaName string `json:"extra_schema_name"` + ExtraTableName string `json:"extra_table_name"` + // TargetExtraSchemaName and TargetExtraTableName carry routed old names for rename DDLs. + // They are runtime-only fields and are not serialized. + TargetExtraSchemaName string `json:"-"` + TargetExtraTableName string `json:"-"` + Query string `json:"query"` + TableInfo *common.TableInfo `json:"-"` + StartTs uint64 `json:"start_ts"` + FinishedTs uint64 `json:"finished_ts"` // The seq of the event. It is set by event service. Seq uint64 `json:"seq"` // The epoch of the event. It is set by event service. 
@@ -189,18 +199,62 @@ func (d *DDLEvent) GetSchemaName() string { return d.SchemaName } +func (d *DDLEvent) GetSourceSchemaName() string { + return d.SchemaName +} + func (d *DDLEvent) GetTableName() string { return d.TableName } +func (d *DDLEvent) GetSourceTableName() string { + return d.TableName +} + func (d *DDLEvent) GetExtraSchemaName() string { return d.ExtraSchemaName } +func (d *DDLEvent) GetSourceExtraSchemaName() string { + return d.ExtraSchemaName +} + func (d *DDLEvent) GetExtraTableName() string { return d.ExtraTableName } +func (d *DDLEvent) GetSourceExtraTableName() string { + return d.ExtraTableName +} + +func (d *DDLEvent) GetTargetSchemaName() string { + if d.TargetSchemaName != "" { + return d.TargetSchemaName + } + return d.SchemaName +} + +func (d *DDLEvent) GetTargetTableName() string { + if d.TargetTableName != "" { + return d.TargetTableName + } + return d.TableName +} + +func (d *DDLEvent) GetTargetExtraSchemaName() string { + if d.TargetExtraSchemaName != "" { + return d.TargetExtraSchemaName + } + return d.ExtraSchemaName +} + +func (d *DDLEvent) GetTargetExtraTableName() string { + if d.TargetExtraTableName != "" { + return d.TargetExtraTableName + } + return d.ExtraTableName +} + // GetTableID returns the logic table ID of the event. 
// it returns 0 when there is no tableinfo func (d *DDLEvent) GetTableID() int64 { @@ -230,18 +284,23 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { } for i, info := range d.MultipleTableInfos { event := &DDLEvent{ - Version: d.Version, - Type: byte(t), - SchemaName: info.GetSchemaName(), - TableName: info.GetTableName(), - TableInfo: info, - Query: queries[i], - StartTs: d.StartTs, - FinishedTs: d.FinishedTs, + Version: d.Version, + Type: byte(t), + SchemaName: info.GetSchemaName(), + TableName: info.GetTableName(), + TargetSchemaName: info.GetTargetSchemaName(), + TargetTableName: info.GetTargetTableName(), + TableInfo: info, + Query: queries[i], + StartTs: d.StartTs, + FinishedTs: d.FinishedTs, } if model.ActionType(d.Type) == model.ActionRenameTables { event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName event.ExtraTableName = d.TableNameChange.DropName[i].TableName + targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i]) + event.TargetExtraSchemaName = targetExtraSchemaName + event.TargetExtraTableName = targetExtraTableName } events = append(events, event) } @@ -251,6 +310,19 @@ func (d *DDLEvent) GetEvents() []*DDLEvent { return []*DDLEvent{d} } +func extractRenameTargetExtraFromQuery(query string) (string, string) { + stmt, err := parser.New().ParseOneStmt(query, "", "") + if err != nil { + log.Panic("parse split rename query failed", zap.String("query", query), zap.Error(err)) + } + renameStmt, ok := stmt.(*ast.RenameTableStmt) + if !ok || len(renameStmt.TableToTables) == 0 { + log.Panic("unexpected split rename query", zap.String("query", query), zap.Any("stmt", stmt)) + } + oldTable := renameStmt.TableToTables[0].OldTable + return oldTable.Schema.O, oldTable.Name.O +} + func (d *DDLEvent) GetSeq() uint64 { return d.Seq } @@ -299,6 +371,13 @@ func (e *DDLEvent) GetDDLQuery() string { return e.Query } +func (e *DDLEvent) GetDDLSchemaName() string { + if e == nil { + return "" + } + return 
e.GetTargetSchemaName() +} + func (e *DDLEvent) GetDDLType() model.ActionType { return model.ActionType(e.Type) } @@ -479,6 +558,45 @@ func (t *DDLEvent) IsPaused() bool { return false } +// CloneForRouting creates a shallow copy of the DDLEvent that can safely be mutated +// for table-route purposes without affecting the original event. +// +// The clone shares most read-only fields with the original. Slice fields that can be +// replaced independently downstream are copied so routing can update them without +// mutating shared state. +func (d *DDLEvent) CloneForRouting() *DDLEvent { + if d == nil { + return nil + } + + // Create shallow copy + clone := *d + + // PostTxnFlushed needs its own backing array to prevent potential races. + // Currently, DDL events arrive with nil PostTxnFlushed (callbacks are added + // downstream by basic_dispatcher.go), so append(nil, f) naturally creates a + // fresh slice. However, we make an explicit copy here for future-proofing: + // if any code path later adds callbacks before cloning, sharing the backing + // array could cause nondeterministic callback visibility or data races. + if d.PostTxnFlushed != nil { + clone.PostTxnFlushed = make([]func(), len(d.PostTxnFlushed)) + copy(clone.PostTxnFlushed, d.PostTxnFlushed) + } + + // MultipleTableInfos needs a new slice so each dispatcher can independently + // apply routing to its elements without affecting others + if d.MultipleTableInfos != nil { + clone.MultipleTableInfos = make([]*common.TableInfo, len(d.MultipleTableInfos)) + copy(clone.MultipleTableInfos, d.MultipleTableInfos) + } + + if d.BlockedTableNames != nil { + clone.BlockedTableNames = append([]SchemaTableName(nil), d.BlockedTableNames...) 
+ } + + return &clone +} + func (t *DDLEvent) Len() int32 { return 1 } diff --git a/pkg/common/event/ddl_event_test.go b/pkg/common/event/ddl_event_test.go index 6cb58852e8..2921b33096 100644 --- a/pkg/common/event/ddl_event_test.go +++ b/pkg/common/event/ddl_event_test.go @@ -21,6 +21,8 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -503,3 +505,192 @@ INSERT INTO test VALUES (1); }) } } + +// TestDDLEventCloneForRouting tests the CloneForRouting method to ensure it properly +// clones DDL events to avoid race conditions between multiple dispatchers +func TestDDLEventCloneForRouting(t *testing.T) { + helper := NewEventTestHelper(t) + defer helper.Close() + + helper.tk.MustExec("use test") + ddlJob := helper.DDL2Job(createTableSQL) + require.NotNil(t, ddlJob) + + // Create original DDL event with all fields populated + originalTableInfo := common.WrapTableInfo(ddlJob.SchemaName, ddlJob.BinlogInfo.TableInfo) + originalTableInfo.InitPrivateFields() + + multipleTableInfo1 := common.WrapTableInfo("schema1", ddlJob.BinlogInfo.TableInfo) + multipleTableInfo1.InitPrivateFields() + multipleTableInfo2 := common.WrapTableInfo("schema2", ddlJob.BinlogInfo.TableInfo) + multipleTableInfo2.InitPrivateFields() + + postFlushFunc1 := func() {} + postFlushFunc2 := func() {} + + original := &DDLEvent{ + Version: DDLEventVersion1, + DispatcherID: common.NewDispatcherID(), + Type: byte(ddlJob.Type), + SchemaID: ddlJob.SchemaID, + SchemaName: ddlJob.SchemaName, + TableName: ddlJob.TableName, + Query: ddlJob.Query, + TableInfo: originalTableInfo, + FinishedTs: ddlJob.BinlogInfo.FinishedTS, + Seq: 1, + Epoch: 2, + MultipleTableInfos: []*common.TableInfo{multipleTableInfo1, multipleTableInfo2}, + PostTxnFlushed: []func(){postFlushFunc1, postFlushFunc2}, + TiDBOnly: true, + BDRMode: 
"test-mode", + } + + // Clone the event + cloned := original.CloneForRouting() + require.NotNil(t, cloned) + + // Verify that cloned is a separate object + require.False(t, original == cloned, "cloned event should be a different object") + + // Verify that immutable fields are shared (shallow copy) + require.Equal(t, original.Version, cloned.Version) + require.Equal(t, original.DispatcherID, cloned.DispatcherID) + require.Equal(t, original.Type, cloned.Type) + require.Equal(t, original.SchemaID, cloned.SchemaID) + require.Equal(t, original.SchemaName, cloned.SchemaName) + require.Equal(t, original.TableName, cloned.TableName) + require.Equal(t, original.Query, cloned.Query) + require.Equal(t, original.FinishedTs, cloned.FinishedTs) + require.Equal(t, original.Seq, cloned.Seq) + require.Equal(t, original.Epoch, cloned.Epoch) + require.Equal(t, original.TiDBOnly, cloned.TiDBOnly) + require.Equal(t, original.BDRMode, cloned.BDRMode) + + // Verify that TableInfo pointer is shared initially + require.True(t, original.TableInfo == cloned.TableInfo, "TableInfo should be shared initially") + + // Verify that MultipleTableInfos is a new slice (but points to same TableInfo objects initially) + require.False(t, &original.MultipleTableInfos[0] == &cloned.MultipleTableInfos[0], "MultipleTableInfos should be a new slice") + require.True(t, original.MultipleTableInfos[0] == cloned.MultipleTableInfos[0], "MultipleTableInfos elements should be shared initially") + require.True(t, original.MultipleTableInfos[1] == cloned.MultipleTableInfos[1], "MultipleTableInfos elements should be shared initially") + + // Verify that PostTxnFlushed is an independent copy (not shared) + // This is defensive: currently DDL events arrive with nil PostTxnFlushed, + // but we copy it to prevent races if callbacks are ever added before cloning. 
+ require.NotNil(t, cloned.PostTxnFlushed) + require.Equal(t, 2, len(cloned.PostTxnFlushed), "PostTxnFlushed should have same length as original") + require.Equal(t, 2, len(original.PostTxnFlushed), "Original PostTxnFlushed should remain unchanged") + // Verify independent backing arrays - appending to clone should not affect original + require.NotEqual(t, &original.PostTxnFlushed[0], &cloned.PostTxnFlushed[0], "PostTxnFlushed should have independent backing arrays") + + // Verify that appending to cloned PostTxnFlushed doesn't affect original + cloned.AddPostFlushFunc(func() {}) + require.Equal(t, 3, len(cloned.PostTxnFlushed), "Clone should have appended callback") + require.Equal(t, 2, len(original.PostTxnFlushed), "Original should be unaffected by clone's append") + + // Now simulate what happens during routing: mutate the cloned event + cloned.SchemaName = "routed_schema" + cloned.Query = "CREATE TABLE routed_schema.test ..." + newRoutedTableInfo := originalTableInfo.CloneWithRouting("routed_schema", "test") + cloned.TableInfo = newRoutedTableInfo + cloned.MultipleTableInfos[0] = multipleTableInfo1.CloneWithRouting("routed_schema1", "table1") + cloned.MultipleTableInfos[1] = multipleTableInfo2.CloneWithRouting("routed_schema2", "table2") + + // Verify that mutations to cloned event don't affect the original + require.Equal(t, ddlJob.SchemaName, original.SchemaName, "Original SchemaName should be unchanged") + require.Equal(t, ddlJob.Query, original.Query, "Original Query should be unchanged") + require.True(t, original.TableInfo == originalTableInfo, "Original TableInfo should be unchanged") + require.True(t, original.MultipleTableInfos[0] == multipleTableInfo1, "Original MultipleTableInfos[0] should be unchanged") + require.True(t, original.MultipleTableInfos[1] == multipleTableInfo2, "Original MultipleTableInfos[1] should be unchanged") + + // Verify that cloned event has the mutations + require.Equal(t, "routed_schema", cloned.TargetSchemaName) + 
require.Equal(t, "CREATE TABLE routed_schema.test ...", cloned.Query) + require.True(t, cloned.TableInfo == newRoutedTableInfo) + require.Equal(t, "routed_schema", cloned.TableInfo.TableName.TargetSchema) + require.Equal(t, original.SchemaName, cloned.GetSourceSchemaName()) + require.Equal(t, original.TableName, cloned.GetSourceTableName()) + + // Test cloning nil event + var nilEvent *DDLEvent + clonedNil := nilEvent.CloneForRouting() + require.Nil(t, clonedNil) +} + +func TestCloneForRoutingPreservesSourceFields(t *testing.T) { + original := &DDLEvent{ + SchemaName: "source_db", + TableName: "new_orders", + ExtraSchemaName: "source_db", + ExtraTableName: "old_orders", + TargetSchemaName: "target_db", + TargetTableName: "new_orders_routed", + TargetExtraSchemaName: "target_db", + TargetExtraTableName: "old_orders_routed", + } + + cloned := original.CloneForRouting() + cloned.TargetSchemaName = "target_db_v2" + cloned.TargetTableName = "new_orders_routed_v2" + cloned.TargetExtraSchemaName = "target_db_v2" + cloned.TargetExtraTableName = "old_orders_routed_v2" + + require.Equal(t, "source_db", cloned.GetSourceSchemaName()) + require.Equal(t, "new_orders", cloned.GetSourceTableName()) + require.Equal(t, "source_db", cloned.GetSourceExtraSchemaName()) + require.Equal(t, "old_orders", cloned.GetSourceExtraTableName()) + require.Equal(t, "target_db_v2", cloned.GetTargetSchemaName()) + require.Equal(t, "new_orders_routed_v2", cloned.GetTargetTableName()) + require.Equal(t, "target_db_v2", cloned.GetTargetExtraSchemaName()) + require.Equal(t, "old_orders_routed_v2", cloned.GetTargetExtraTableName()) +} + +func TestGetEventsForRenameTablesPreservesSourceAndTargetNames(t *testing.T) { + sourceTable1 := common.WrapTableInfo("new_db1", &model.TableInfo{ + ID: 100, + Name: ast.NewCIStr("new_table1"), + UpdateTS: 10, + }) + sourceTable2 := common.WrapTableInfo("new_db2", &model.TableInfo{ + ID: 101, + Name: ast.NewCIStr("new_table2"), + UpdateTS: 11, + }) + + ddl := &DDLEvent{ 
+ Type: byte(model.ActionRenameTables), + Query: "RENAME TABLE `old_target_db1`.`old_target_table1` TO `new_target_db1`.`new_target_table1`; RENAME TABLE `old_target_db2`.`old_target_table2` TO `new_target_db2`.`new_target_table2`", + MultipleTableInfos: []*common.TableInfo{ + sourceTable1.CloneWithRouting("new_target_db1", "new_target_table1"), + sourceTable2.CloneWithRouting("new_target_db2", "new_target_table2"), + }, + TableNameChange: &TableNameChange{ + DropName: []SchemaTableName{ + {SchemaName: "old_db1", TableName: "old_table1"}, + {SchemaName: "old_db2", TableName: "old_table2"}, + }, + }, + } + + events := ddl.GetEvents() + require.Len(t, events, 2) + + require.Equal(t, "new_db1", events[0].SchemaName) + require.Equal(t, "new_table1", events[0].TableName) + require.Equal(t, "new_target_db1", events[0].TargetSchemaName) + require.Equal(t, "new_target_table1", events[0].TargetTableName) + require.Equal(t, "old_db1", events[0].ExtraSchemaName) + require.Equal(t, "old_table1", events[0].ExtraTableName) + require.Equal(t, "old_target_db1", events[0].TargetExtraSchemaName) + require.Equal(t, "old_target_table1", events[0].TargetExtraTableName) + + require.Equal(t, "new_db2", events[1].SchemaName) + require.Equal(t, "new_table2", events[1].TableName) + require.Equal(t, "new_target_db2", events[1].TargetSchemaName) + require.Equal(t, "new_target_table2", events[1].TargetTableName) + require.Equal(t, "old_db2", events[1].ExtraSchemaName) + require.Equal(t, "old_table2", events[1].ExtraTableName) + require.Equal(t, "old_target_db2", events[1].TargetExtraSchemaName) + require.Equal(t, "old_target_table2", events[1].TargetExtraTableName) +} diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go index 8868da40dd..48f75bfcff 100644 --- a/pkg/common/event/dml_event.go +++ b/pkg/common/event/dml_event.go @@ -278,17 +278,37 @@ func (b *BatchDMLEvent) encodeV1() ([]byte, error) { // AssembleRows assembles the Rows from the RawRows. 
// It also sets the TableInfo and clears the RawRows. +// For local events (same node, b.Rows already set), it only applies routing +// without replacing the TableInfo to preserve schema version compatibility. func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { + if tableInfo == nil { + log.Panic("DMLEvent: TableInfo is nil") + } + defer func() { b.TableInfo.InitPrivateFields() }() - // rows is already set, no need to assemble again - // When the event is passed from the same node, the Rows is already set. + + // For local events (same node), rows are already set. + // If routing is configured, reassign the TableInfo pointer to the passed tableInfo + // (which already has TargetSchema/TargetTable set via CloneWithRouting). + // IMPORTANT: We modify the POINTER, not the object it points to, because the + // original TableInfo is shared from the schema store across all dispatchers. if b.Rows != nil { + if tableInfo.TableName.TargetSchema != "" || tableInfo.TableName.TargetTable != "" { + b.TableInfo = tableInfo + for _, dml := range b.DMLEvents { + dml.TableInfo = tableInfo + } + } return } - if tableInfo == nil { - log.Panic("DMLEvent: TableInfo is nil") + + // For remote events, verify schema version compatibility before replacing TableInfo + if b.TableInfo != nil && b.TableInfo.GetUpdateTS() != tableInfo.GetUpdateTS() { + log.Panic("DMLEvent: TableInfoVersion mismatch", + zap.Uint64("dmlEventTableInfoVersion", b.TableInfo.GetUpdateTS()), + zap.Uint64("tableInfoVersion", tableInfo.GetUpdateTS())) return } @@ -297,10 +317,6 @@ func (b *BatchDMLEvent) AssembleRows(tableInfo *common.TableInfo) { return } - if b.TableInfo != nil && b.TableInfo.GetUpdateTS() != tableInfo.GetUpdateTS() { - log.Panic("DMLEvent: TableInfoVersion mismatch", zap.Uint64("dmlEventTableInfoVersion", b.TableInfo.GetUpdateTS()), zap.Uint64("tableInfoVersion", tableInfo.GetUpdateTS())) - return - } decoder := chunk.NewCodec(tableInfo.GetFieldSlice()) b.Rows, _ = 
decoder.Decode(b.RawRows) b.TableInfo = tableInfo diff --git a/pkg/common/event/redo.go b/pkg/common/event/redo.go index 5f3b48ad9e..de051cc5a5 100644 --- a/pkg/common/event/redo.go +++ b/pkg/common/event/redo.go @@ -17,7 +17,7 @@ import ( "fmt" "github.com/pingcap/log" - commonType "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/util" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" @@ -51,10 +51,10 @@ type RedoDMLEvent struct { // RedoDDLEvent represents DDL event used in redo log persistent type RedoDDLEvent struct { - DDL *DDLEventInRedoLog `msg:"ddl"` - Type byte `msg:"type"` - TableName commonType.TableName `msg:"table-name"` - TableSchemaStore *TableSchemaStore `msg:"table-schema-store"` + DDL *DDLEventInRedoLog `msg:"ddl"` + Type byte `msg:"type"` + TableName common.TableName `msg:"table-name"` + TableSchemaStore *TableSchemaStore `msg:"table-schema-store"` } // DMLEventInRedoLog is used to store DMLEvent in redo log v2 format @@ -64,7 +64,7 @@ type DMLEventInRedoLog struct { // Table contains the table name and table ID. // NOTICE: We store the physical table ID here, not the logical table ID. 
- Table *commonType.TableName `msg:"table"` + Table *common.TableName `msg:"table"` Columns []*RedoColumn `msg:"columns"` PreColumns []*RedoColumn `msg:"pre-columns"` @@ -105,7 +105,7 @@ type RedoRowEvent struct { StartTs uint64 CommitTs uint64 PhysicalTableID int64 - TableInfo *commonType.TableInfo + TableInfo *common.TableInfo Event RowChange Callback func() } @@ -142,29 +142,31 @@ func (r *RedoRowEvent) ToRedoLog() *RedoLog { Type: RedoLogTypeRow, } if r.TableInfo != nil { - redoLog.RedoRow.Row.Table = &commonType.TableName{ - Schema: r.TableInfo.TableName.Schema, - Table: r.TableInfo.TableName.Table, - TableID: r.PhysicalTableID, - IsPartition: r.TableInfo.TableName.IsPartition, + redoLog.RedoRow.Row.Table = &common.TableName{ + Schema: r.TableInfo.TableName.Schema, + Table: r.TableInfo.TableName.Table, + TableID: r.PhysicalTableID, + IsPartition: r.TableInfo.TableName.IsPartition, + TargetSchema: r.TableInfo.TableName.TargetSchema, + TargetTable: r.TableInfo.TableName.TargetTable, } redoLog.RedoRow.Row.IndexColumns = getIndexColumns(r.TableInfo) columnCount := len(r.TableInfo.GetColumns()) columns := make([]*RedoColumn, 0, columnCount) switch r.Event.RowType { - case commonType.RowTypeInsert: + case common.RowTypeInsert: redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount) - case commonType.RowTypeDelete: + case common.RowTypeDelete: redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount) - case commonType.RowTypeUpdate: + case common.RowTypeUpdate: redoLog.RedoRow.Columns = make([]RedoColumnValue, 0, columnCount) redoLog.RedoRow.PreColumns = make([]RedoColumnValue, 0, columnCount) default: } for i, column := range r.TableInfo.GetColumns() { - if commonType.IsColCDCVisible(column) { + if common.IsColCDCVisible(column) { columns = append(columns, &RedoColumn{ Name: column.Name.String(), Type: column.GetType(), @@ -173,13 +175,13 @@ func (r *RedoRowEvent) ToRedoLog() *RedoLog { }) isHandleKey := r.TableInfo.IsHandleKey(column.ID) switch 
r.Event.RowType { - case commonType.RowTypeInsert: + case common.RowTypeInsert: v := parseColumnValue(&r.Event.Row, column, i, isHandleKey) redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v) - case commonType.RowTypeDelete: + case common.RowTypeDelete: v := parseColumnValue(&r.Event.PreRow, column, i, isHandleKey) redoLog.RedoRow.PreColumns = append(redoLog.RedoRow.PreColumns, v) - case commonType.RowTypeUpdate: + case common.RowTypeUpdate: v := parseColumnValue(&r.Event.Row, column, i, isHandleKey) redoLog.RedoRow.Columns = append(redoLog.RedoRow.Columns, v) v = parseColumnValue(&r.Event.PreRow, column, i, isHandleKey) @@ -189,11 +191,11 @@ func (r *RedoRowEvent) ToRedoLog() *RedoLog { } } switch r.Event.RowType { - case commonType.RowTypeInsert: + case common.RowTypeInsert: redoLog.RedoRow.Row.Columns = columns - case commonType.RowTypeDelete: + case common.RowTypeDelete: redoLog.RedoRow.Row.PreColumns = columns - case commonType.RowTypeUpdate: + case common.RowTypeUpdate: redoLog.RedoRow.Row.Columns = columns redoLog.RedoRow.Row.PreColumns = columns } @@ -226,7 +228,7 @@ func (d *DDLEvent) ToRedoLog() *RedoLog { } // GetCommitTs returns commit timestamp of the log event. 
-func (r *RedoLog) GetCommitTs() commonType.Ts { +func (r *RedoLog) GetCommitTs() common.Ts { switch r.Type { case RedoLogTypeRow: return r.RedoRow.Row.CommitTs @@ -279,7 +281,7 @@ func (r *RedoDMLEvent) ToDMLEvent() *DMLEvent { colInfo.SetType(col.Type) colInfo.SetCharset(col.Charset) colInfo.SetCollate(col.Collation) - flag := commonType.ColumnFlagType(rawColsValue[idx].Flag) + flag := common.ColumnFlagType(rawColsValue[idx].Flag) // if flag.IsHandleKey() { // } // if flag.IsBinary(){ @@ -327,8 +329,18 @@ func (r *RedoDMLEvent) ToDMLEvent() *DMLEvent { indexInfo.Primary = isPrimary tidbTableInfo.Indices = append(tidbTableInfo.Indices, indexInfo) } + tableInfo := common.NewTableInfo4Decoder(r.Row.Table.Schema, tidbTableInfo) + // Restore routing info from redo log (TargetSchema/TargetTable for table routing). + // We must use CloneWithRouting because NewTableInfo4Decoder already called InitPrivateFields() + // which pre-computed SQL statements using the source schema/table. CloneWithRouting creates + // a new TableInfo with routing applied and uninitialized preSQLs that will be computed + // correctly when InitPrivateFields() is called. 
+ if r.Row.Table.TargetSchema != "" || r.Row.Table.TargetTable != "" { + tableInfo = tableInfo.CloneWithRouting(r.Row.Table.TargetSchema, r.Row.Table.TargetTable) + tableInfo.InitPrivateFields() + } event := &DMLEvent{ - TableInfo: commonType.NewTableInfo4Decoder(r.Row.Table.Schema, tidbTableInfo), + TableInfo: tableInfo, CommitTs: r.Row.CommitTs, StartTs: r.Row.StartTs, Length: 1, @@ -342,15 +354,15 @@ func (r *RedoDMLEvent) ToDMLEvent() *DMLEvent { columns := event.TableInfo.GetColumns() if r.IsDelete() { collectAllColumnsValue(r.PreColumns, columns, chk) - event.RowTypes = append(event.RowTypes, commonType.RowTypeDelete) + event.RowTypes = append(event.RowTypes, common.RowTypeDelete) } else if r.IsUpdate() { collectAllColumnsValue(r.PreColumns, columns, chk) collectAllColumnsValue(r.Columns, columns, chk) // FIXME: exclude columns with same value - event.RowTypes = append(event.RowTypes, commonType.RowTypeUpdate, commonType.RowTypeUpdate) + event.RowTypes = append(event.RowTypes, common.RowTypeUpdate, common.RowTypeUpdate) } else if r.IsInsert() { collectAllColumnsValue(r.Columns, columns, chk) - event.RowTypes = append(event.RowTypes, commonType.RowTypeInsert) + event.RowTypes = append(event.RowTypes, common.RowTypeInsert) } else { log.Panic("unknown event type for the DML event") } @@ -361,18 +373,24 @@ func (r *RedoDMLEvent) ToDMLEvent() *DMLEvent { func (r *RedoDDLEvent) ToDDLEvent() *DDLEvent { blockedTables := r.DDL.BlockedTables blockedTableNames := r.DDL.BlockedTableNames + sourceSchemaName := r.TableName.GetOriginSchema() + sourceTableName := r.TableName.GetOriginTable() + targetSchemaName := r.TableName.GetTargetSchema() + targetTableName := r.TableName.GetTargetTable() if blockedTables == nil { blockedTables = &InfluencedTables{InfluenceType: InfluenceTypeNormal} - blockedTableNames = []SchemaTableName{{SchemaName: r.TableName.Schema, TableName: r.TableName.Table}} + blockedTableNames = []SchemaTableName{{SchemaName: targetSchemaName, TableName: 
targetTableName}} } return &DDLEvent{ - TableInfo: &commonType.TableInfo{ + TableInfo: &common.TableInfo{ TableName: r.TableName, }, Query: r.DDL.Query, Type: r.Type, - SchemaName: r.TableName.Schema, - TableName: r.TableName.Table, + SchemaName: sourceSchemaName, + TableName: sourceTableName, + TargetSchemaName: targetSchemaName, + TargetTableName: targetTableName, FinishedTs: r.DDL.CommitTs, StartTs: r.DDL.StartTs, BlockedTables: blockedTables, @@ -389,7 +407,7 @@ func (r *RedoDDLEvent) SetTableSchemaStore(tableSchemaStore *TableSchemaStore) { } func parseColumnValue(row *chunk.Row, colInfo *timodel.ColumnInfo, i int, isHandleKey bool) RedoColumnValue { - v := commonType.ExtractColVal(row, colInfo, i) + v := common.ExtractColVal(row, colInfo, i) switch colInfo.GetType() { case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: @@ -409,7 +427,7 @@ func parseColumnValue(row *chunk.Row, colInfo *timodel.ColumnInfo, i int, isHand // For compatibility func convertFlag(colInfo *timodel.ColumnInfo, isHandleKey bool) uint64 { - var flag commonType.ColumnFlagType + var flag common.ColumnFlagType if isHandleKey { flag.SetIsHandleKey() } @@ -438,7 +456,7 @@ func convertFlag(colInfo *timodel.ColumnInfo, isHandleKey bool) uint64 { } // For compatibility -func getIndexColumns(tableInfo *commonType.TableInfo) [][]int { +func getIndexColumns(tableInfo *common.TableInfo) [][]int { indexColumns := make([][]int, 0, len(tableInfo.GetIndexColumns())) rowColumnsOffset := tableInfo.GetRowColumnsOffset() for _, index := range tableInfo.GetIndexColumns() { diff --git a/pkg/common/event/redo_test.go b/pkg/common/event/redo_test.go new file mode 100644 index 0000000000..ce9408f68c --- /dev/null +++ b/pkg/common/event/redo_test.go @@ -0,0 +1,94 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/stretchr/testify/require" +) + +func TestRedoDMLEventToDMLEventPreservesSourceAndTargetNames(t *testing.T) { + t.Parallel() + + helper := NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + job := helper.DDL2Job(`create table test.t(id int primary key, name varchar(32))`) + require.NotNil(t, job) + + sourceTableInfo := helper.GetTableInfo(job) + routedTableInfo := sourceTableInfo.CloneWithRouting("target_db", "target_table") + + dmlEvent := helper.DML2Event("test", "t", `insert into test.t values (1, 'alice')`) + dmlEvent.TableInfo = routedTableInfo + + row, ok := dmlEvent.GetNextRow() + require.True(t, ok) + + redoRow := (&RedoRowEvent{ + StartTs: dmlEvent.StartTs, + CommitTs: dmlEvent.CommitTs, + PhysicalTableID: dmlEvent.PhysicalTableID, + TableInfo: routedTableInfo, + Event: row, + }).ToRedoLog().RedoRow + + decoded := redoRow.ToDMLEvent() + require.Equal(t, "test", decoded.TableInfo.GetSchemaName()) + require.Equal(t, "t", decoded.TableInfo.GetTableName()) + require.Equal(t, "target_db", decoded.TableInfo.GetTargetSchemaName()) + require.Equal(t, "target_table", decoded.TableInfo.GetTargetTableName()) + require.Equal(t, "test", decoded.TableInfo.GetSourceSchemaName()) + require.Equal(t, "t", decoded.TableInfo.GetSourceTableName()) +} + +func TestRedoDDLEventToDDLEventPreservesSourceAndTargetNames(t *testing.T) { 
+ t.Parallel() + + redoDDLEvent := &RedoDDLEvent{ + DDL: &DDLEventInRedoLog{ + StartTs: 100, + CommitTs: 200, + Query: "ALTER TABLE `target_db`.`target_table` ADD COLUMN age INT", + }, + Type: byte(model.ActionAddColumn), + TableName: common.TableName{ + Schema: "source_db", + Table: "source_table", + TargetSchema: "target_db", + TargetTable: "target_table", + }, + } + + ddlEvent := redoDDLEvent.ToDDLEvent() + require.Equal(t, "source_db", ddlEvent.SchemaName) + require.Equal(t, "source_table", ddlEvent.TableName) + require.Equal(t, "target_db", ddlEvent.TargetSchemaName) + require.Equal(t, "target_table", ddlEvent.TargetTableName) + require.Equal(t, "target_db", ddlEvent.GetDDLSchemaName()) + require.Equal(t, "source_db", ddlEvent.TableInfo.GetSchemaName()) + require.Equal(t, "source_table", ddlEvent.TableInfo.GetTableName()) + require.Equal(t, "target_db", ddlEvent.TableInfo.GetTargetSchemaName()) + require.Equal(t, "target_table", ddlEvent.TableInfo.GetTargetTableName()) + require.Equal(t, "source_db", ddlEvent.TableInfo.GetSourceSchemaName()) + require.Equal(t, "source_table", ddlEvent.TableInfo.GetSourceTableName()) + require.Equal(t, []SchemaTableName{{ + SchemaName: "target_db", + TableName: "target_table", + }}, ddlEvent.BlockedTableNames) +} diff --git a/pkg/common/table_info.go b/pkg/common/table_info.go index 8af5ebdc77..a43d2f79fe 100644 --- a/pkg/common/table_info.go +++ b/pkg/common/table_info.go @@ -125,6 +125,14 @@ func (ti *TableInfo) InitPrivateFields() { return } + // columnSchema may be nil for minimal TableInfo instances (e.g., in tests). + // In production, columnSchema is always set via WrapTableInfo or similar. + // Early return here without marking as initialized, so if columnSchema is + // set later, InitPrivateFields can be called again to properly initialize. 
+ if ti.columnSchema == nil { + return + } + ti.preSQLs.mutex.Lock() defer ti.preSQLs.mutex.Unlock() @@ -133,13 +141,56 @@ func (ti *TableInfo) InitPrivateFields() { return } - ti.preSQLs.m[preSQLInsert] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLInsert], ti.TableName.QuoteString()) - ti.preSQLs.m[preSQLReplace] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLReplace], ti.TableName.QuoteString()) - ti.preSQLs.m[preSQLUpdate] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLUpdate], ti.TableName.QuoteString()) + ti.preSQLs.m[preSQLInsert] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLInsert], ti.TableName.QuoteTargetString()) + ti.preSQLs.m[preSQLReplace] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLReplace], ti.TableName.QuoteTargetString()) + ti.preSQLs.m[preSQLUpdate] = fmt.Sprintf(ti.columnSchema.PreSQLs[preSQLUpdate], ti.TableName.QuoteTargetString()) ti.preSQLs.isInitialized.Store(true) } +// CloneWithRouting creates a shallow copy of TableInfo with routing applied. +// The new TableInfo shares the same columnSchema, View, Sequence pointers +// but has its own TableName (with TargetSchema/TargetTable set) and uninitialized preSQLs. 
+// This is safe because: +// - columnSchema, View, Sequence are read-only after creation +// - preSQLs will be initialized later via InitPrivateFields() using the new TableName +// - TableName is a value type that gets copied +func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableInfo { + if ti == nil { + return nil + } + // Create a new TableInfo with copied basic fields + cloned := &TableInfo{ + TableName: ti.TableName, // Value copy of TableName struct + Charset: ti.Charset, + Collate: ti.Collate, + Comment: ti.Comment, + columnSchema: ti.columnSchema, // Share the pointer (read-only) + HasPKOrNotNullUK: ti.HasPKOrNotNullUK, + View: ti.View, // Share the pointer (read-only) + Sequence: ti.Sequence, // Share the pointer (read-only) + UpdateTS: ti.UpdateTS, + ActiveActiveTable: ti.ActiveActiveTable, + SoftDeleteTable: ti.SoftDeleteTable, + // preSQLs is zero-initialized (uninitialized mutex/atomic, empty strings) + } + // Apply routing to the cloned TableName while keeping Schema/Table as source names. + cloned.TableName.TargetSchema = targetSchema + cloned.TableName.TargetTable = targetTable + + // Increment refcount for the shared columnSchema and set finalizer to decrement + // when the clone is garbage collected. This prevents use-after-free if the + // original TableInfo is GC'd before the clone. + if ti.columnSchema != nil { + GetSharedColumnSchemaStorage().incColumnSchemaCount(ti.columnSchema) + runtime.SetFinalizer(cloned, func(ti *TableInfo) { + GetSharedColumnSchemaStorage().tryReleaseColumnSchema(ti.columnSchema) + }) + } + + return cloned +} + func (ti *TableInfo) Marshal() ([]byte, error) { // otherField | columnSchemaData | columnSchemaDataSize data, err := json.Marshal(ti) @@ -341,26 +392,26 @@ func (ti *TableInfo) MustGetColumnOffsetByID(id int64) int { return offset } -// GetSchemaName returns the schema name of the table +// GetSchemaName returns the source schema name carried by this TableInfo. 
func (ti *TableInfo) GetSchemaName() string { - return ti.TableName.Schema + return ti.TableName.GetOriginSchema() } -// GetTableName returns the table name of the table +// GetTableName returns the source table name carried by this TableInfo. func (ti *TableInfo) GetTableName() string { - return ti.TableName.Table + return ti.TableName.GetOriginTable() } func (ti *TableInfo) GetTableNameCIStr() ast.CIStr { - return ast.NewCIStr(ti.TableName.Table) + return ast.NewCIStr(ti.GetTableName()) } -// GetSchemaNamePtr returns the pointer to the schema name of the table +// GetSchemaNamePtr returns the pointer to the source schema name. func (ti *TableInfo) GetSchemaNamePtr() *string { return &ti.TableName.Schema } -// GetTableNamePtr returns the pointer to the table name of the table +// GetTableNamePtr returns the pointer to the source table name. func (ti *TableInfo) GetTableNamePtr() *string { return &ti.TableName.Table } @@ -370,6 +421,28 @@ func (ti *TableInfo) IsPartitionTable() bool { return ti.TableName.IsPartition } +// GetTargetSchemaName returns the target schema name for routing. +// If TargetSchema is empty, returns Schema. +func (ti *TableInfo) GetTargetSchemaName() string { + return ti.TableName.GetTargetSchema() +} + +// GetTargetTableName returns the target table name for routing. +// If TargetTable is empty, returns Table. +func (ti *TableInfo) GetTargetTableName() string { + return ti.TableName.GetTargetTable() +} + +// GetSourceSchemaName returns the source schema name before routing. +func (ti *TableInfo) GetSourceSchemaName() string { + return ti.TableName.GetOriginSchema() +} + +// GetSourceTableName returns the source table name before routing. +func (ti *TableInfo) GetSourceTableName() string { + return ti.TableName.GetOriginTable() +} + // IsView checks if TableInfo is a view. 
func (t *TableInfo) IsView() bool { return t.View != nil diff --git a/pkg/common/table_info_test.go b/pkg/common/table_info_test.go index 55c7e3fb22..b3d6607e0c 100644 --- a/pkg/common/table_info_test.go +++ b/pkg/common/table_info_test.go @@ -24,6 +24,82 @@ import ( "github.com/stretchr/testify/require" ) +func TestCloneWithRouting(t *testing.T) { + t.Parallel() + + t.Run("nil TableInfo", func(t *testing.T) { + var ti *TableInfo + cloned := ti.CloneWithRouting("target_schema", "target_table") + require.Nil(t, cloned) + }) + + t.Run("basic cloning with routing", func(t *testing.T) { + original := &TableInfo{ + TableName: TableName{ + Schema: "source_db", + Table: "source_table", + TableID: 123, + }, + Charset: "utf8mb4", + Collate: "utf8mb4_bin", + Comment: "test table", + HasPKOrNotNullUK: true, + UpdateTS: 1000, + } + + cloned := original.CloneWithRouting("target_db", "target_table") + + // Verify cloned has routing applied + require.Equal(t, "source_db", cloned.TableName.Schema) + require.Equal(t, "source_table", cloned.TableName.Table) + require.Equal(t, "target_db", cloned.TableName.TargetSchema) + require.Equal(t, "target_table", cloned.TableName.TargetTable) + require.Equal(t, int64(123), cloned.TableName.TableID) + require.Equal(t, "source_db", cloned.GetSchemaName()) + require.Equal(t, "source_table", cloned.GetTableName()) + require.Equal(t, "source_db", cloned.GetSourceSchemaName()) + require.Equal(t, "source_table", cloned.GetSourceTableName()) + require.Equal(t, "target_db", cloned.GetTargetSchemaName()) + require.Equal(t, "target_table", cloned.GetTargetTableName()) + require.Equal(t, "source_db.source_table", cloned.TableName.String()) + require.Equal(t, "`source_db`.`source_table`", cloned.TableName.QuoteString()) + require.Equal(t, "source_db.source_table", cloned.TableName.OriginString()) + require.Equal(t, "`source_db`.`source_table`", cloned.TableName.QuoteOriginString()) + require.Same(t, &cloned.TableName.Schema, cloned.GetSchemaNamePtr()) + 
require.Same(t, &cloned.TableName.Table, cloned.GetTableNamePtr()) + + // Verify other fields are copied + require.Equal(t, "utf8mb4", cloned.Charset) + require.Equal(t, "utf8mb4_bin", cloned.Collate) + require.Equal(t, "test table", cloned.Comment) + require.Equal(t, true, cloned.HasPKOrNotNullUK) + require.Equal(t, uint64(1000), cloned.UpdateTS) + + // Verify original is NOT modified + require.Equal(t, "", original.TableName.TargetSchema) + require.Equal(t, "", original.TableName.TargetTable) + }) + + t.Run("target getters remain available without changing source fields", func(t *testing.T) { + original := &TableInfo{ + TableName: TableName{ + Schema: "source_db", + Table: "source_table", + TableID: 123, + }, + } + + cloned := original.CloneWithRouting("target_db", "target_table") + + require.Equal(t, "source_db", cloned.GetSchemaName()) + require.Equal(t, "source_table", cloned.GetTableName()) + require.Equal(t, "source_db", cloned.GetSourceSchemaName()) + require.Equal(t, "source_table", cloned.GetSourceTableName()) + require.Equal(t, "target_db", cloned.GetTargetSchemaName()) + require.Equal(t, "target_table", cloned.GetTargetTableName()) + }) +} + func TestUnmarshalJSONToTableInfoInvalidData(t *testing.T) { t.Parallel() diff --git a/pkg/common/table_name.go b/pkg/common/table_name.go index 646a18c6a7..728d80b3cb 100644 --- a/pkg/common/table_name.go +++ b/pkg/common/table_name.go @@ -26,14 +26,66 @@ type TableName struct { // TableID is the logic table ID TableID int64 `toml:"tbl-id" msg:"tbl-id"` IsPartition bool `toml:"is-partition" msg:"is-partition"` + + // TargetSchema and TargetTable is not empty if table routing enabled + TargetSchema string `toml:"target-db-name" msg:"target-db-name"` + TargetTable string `toml:"target-tbl-name" msg:"target-tbl-name"` } // String implements fmt.Stringer interface. 
func (t TableName) String() string { - return fmt.Sprintf("%s.%s", t.Schema, t.Table) + return t.OriginString() +} + +// OriginString returns the source schema.table string. +func (t TableName) OriginString() string { + return fmt.Sprintf("%s.%s", t.GetOriginSchema(), t.GetOriginTable()) } -// QuoteString returns quoted full table name +// QuoteString returns quoted full canonical table name. func (t TableName) QuoteString() string { - return QuoteSchema(t.Schema, t.Table) + return t.QuoteOriginString() +} + +// QuoteOriginString returns quoted full source table name. +func (t TableName) QuoteOriginString() string { + return QuoteSchema(t.GetOriginSchema(), t.GetOriginTable()) +} + +// GetOriginSchema returns the source schema name. +func (t *TableName) GetOriginSchema() string { + return t.Schema +} + +// GetOriginTable returns the source table name. +func (t *TableName) GetOriginTable() string { + return t.Table +} + +// GetTargetSchema returns the target schema name for routing. +// If TargetSchema is empty, returns Schema. +func (t *TableName) GetTargetSchema() string { + if t.TargetSchema != "" { + return t.TargetSchema + } + return t.Schema +} + +// GetTargetTable returns the target table name for routing. +// If TargetTable is empty, returns Table. +func (t *TableName) GetTargetTable() string { + if t.TargetTable != "" { + return t.TargetTable + } + return t.Table +} + +// TargetString returns the target schema.table string for routing. +func (t TableName) TargetString() string { + return fmt.Sprintf("%s.%s", t.GetTargetSchema(), t.GetTargetTable()) +} + +// QuoteTargetString returns quoted full target table name for routing. 
+func (t TableName) QuoteTargetString() string { + return QuoteSchema(t.GetTargetSchema(), t.GetTargetTable()) } diff --git a/pkg/common/table_name_gen.go b/pkg/common/table_name_gen.go index 2f0df4ba92..31cb8c036e 100644 --- a/pkg/common/table_name_gen.go +++ b/pkg/common/table_name_gen.go @@ -48,6 +48,18 @@ func (z *TableName) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "IsPartition") return } + case "target-db-name": + z.TargetSchema, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TargetSchema") + return + } + case "target-tbl-name": + z.TargetTable, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TargetTable") + return + } default: err = dc.Skip() if err != nil { @@ -61,9 +73,9 @@ func (z *TableName) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *TableName) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 4 + // map header, size 6 // write "db-name" - err = en.Append(0x84, 0xa7, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + err = en.Append(0x86, 0xa7, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) if err != nil { return } @@ -102,15 +114,35 @@ func (z *TableName) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "IsPartition") return } + // write "target-db-name" + err = en.Append(0xae, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x2d, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteString(z.TargetSchema) + if err != nil { + err = msgp.WrapError(err, "TargetSchema") + return + } + // write "target-tbl-name" + err = en.Append(0xaf, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x2d, 0x74, 0x62, 0x6c, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteString(z.TargetTable) + if err != nil { + err = msgp.WrapError(err, "TargetTable") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *TableName) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, 
z.Msgsize()) - // map header, size 4 + // map header, size 6 // string "db-name" - o = append(o, 0x84, 0xa7, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + o = append(o, 0x86, 0xa7, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.Schema) // string "tbl-name" o = append(o, 0xa8, 0x74, 0x62, 0x6c, 0x2d, 0x6e, 0x61, 0x6d, 0x65) @@ -121,6 +153,12 @@ func (z *TableName) MarshalMsg(b []byte) (o []byte, err error) { // string "is-partition" o = append(o, 0xac, 0x69, 0x73, 0x2d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e) o = msgp.AppendBool(o, z.IsPartition) + // string "target-db-name" + o = append(o, 0xae, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x2d, 0x64, 0x62, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.TargetSchema) + // string "target-tbl-name" + o = append(o, 0xaf, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x2d, 0x74, 0x62, 0x6c, 0x2d, 0x6e, 0x61, 0x6d, 0x65) + o = msgp.AppendString(o, z.TargetTable) return } @@ -166,6 +204,18 @@ func (z *TableName) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "IsPartition") return } + case "target-db-name": + z.TargetSchema, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TargetSchema") + return + } + case "target-tbl-name": + z.TargetTable, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TargetTable") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -180,6 +230,6 @@ func (z *TableName) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *TableName) Msgsize() (s int) { - s = 1 + 8 + msgp.StringPrefixSize + len(z.Schema) + 9 + msgp.StringPrefixSize + len(z.Table) + 7 + msgp.Int64Size + 13 + msgp.BoolSize + s = 1 + 8 + msgp.StringPrefixSize + len(z.Schema) + 9 + msgp.StringPrefixSize + len(z.Table) + 7 + msgp.Int64Size + 13 + msgp.BoolSize + 15 + msgp.StringPrefixSize + 
len(z.TargetSchema) + 16 + msgp.StringPrefixSize + len(z.TargetTable) return } diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index 4334096bdc..251c8cfa39 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -497,10 +497,18 @@ func (info *ChangeFeedInfo) RmUnusedFields() { } func (info *ChangeFeedInfo) rmMQOnlyFields() { - log.Info("since the downstream is not a MQ, remove MQ only fields", - zap.String("keyspace", info.ChangefeedID.Keyspace()), - zap.String("changefeed", info.ChangefeedID.Name())) - info.Config.Sink.DispatchRules = nil + // Don't nil out DispatchRules entirely - it may contain routing rules (TargetSchema/TargetTable) + // Remove only MQ-specific fields from each rule. + for _, rule := range info.Config.Sink.DispatchRules { + if rule == nil { + continue + } + rule.DispatcherRule = "" + rule.PartitionRule = "" + rule.IndexName = "" + rule.Columns = nil + rule.TopicRule = "" + } info.Config.Sink.SchemaRegistry = nil info.Config.Sink.EncoderConcurrency = nil info.Config.Sink.OnlyOutputUpdatedColumns = nil diff --git a/pkg/config/sink.go b/pkg/config/sink.go index a118692037..51052c85ad 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -16,6 +16,7 @@ package config import ( "fmt" "net/url" + "regexp" "strconv" "strings" "time" @@ -140,7 +141,6 @@ type SinkConfig struct { // Protocol is NOT available when the downstream is DB. Protocol *string `toml:"protocol" json:"protocol,omitempty"` - // DispatchRules is only available when the downstream is MQ. DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` @@ -386,7 +386,9 @@ func (d DateSeparator) String() string { } } -// DispatchRule represents partition rule for a table. +// DispatchRules configures event routing. +// For MQ sinks, rules control topic / partition dispatching. +// TargetSchema and TargetTable configure table routing. 
type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. @@ -402,6 +404,22 @@ type DispatchRule struct { Columns []string `toml:"columns" json:"columns"` TopicRule string `toml:"topic" json:"topic"` + + // TargetSchema sets the routed downstream schema name. + // Leave it empty to keep the source schema name. + // For example, if the source table is `sales`.`orders`, `target-schema = "sales_bak"` + // writes to `sales_bak`.`orders`. + // You can also use placeholders. For example, `target-schema = "{schema}_bak"` + // becomes `sales_bak`. + TargetSchema string `toml:"target-schema" json:"target-schema"` + + // TargetTable sets the routed downstream table name. + // Leave it empty to keep the source table name. + // For example, if the source table is `sales`.`orders`, `target-table = "orders_bak"` + // writes to `sales`.`orders_bak`. + // You can also use placeholders. For example, `target-table = "{schema}_{table}"` + // becomes `sales_orders`. + TargetTable string `toml:"target-table" json:"target-table"` } // ColumnSelector represents a column selector for a table. @@ -743,6 +761,10 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { return err } + if err := s.validateTableRoute(); err != nil { + return err + } + if IsMySQLCompatibleScheme(sinkURI.Scheme) { return nil } @@ -861,6 +883,21 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { return nil } +func (s *SinkConfig) validateTableRoute() error { + for _, rule := range s.DispatchRules { + if rule.TargetSchema == "" && rule.TargetTable == "" { + continue + } + if err := validateRoutingExpression("target-schema", rule.TargetSchema); err != nil { + return err + } + if err := validateRoutingExpression("target-table", rule.TargetTable); err != nil { + return err + } + } + return nil +} + // validateAndAdjustSinkURI validate and adjust `Protocol` and `TxnAtomicity` by sinkURI. 
func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { if sinkURI == nil { @@ -1113,3 +1150,20 @@ type OpenProtocolConfig struct { type DebeziumConfig struct { OutputOldValue bool `toml:"output-old-value" json:"output-old-value"` } + +// validRoutingExpressionRegexp accepts routing expressions made of literal text +// and the {schema}/{table} placeholders, such as "archive", "{table}_bak", or "{schema}_{table}". +var validRoutingExpressionRegexp = regexp.MustCompile(`^(?:[^{}]|\{schema\}|\{table\})*$`) + +// validateRoutingExpression validates a routing expression for a single routing field. +// Valid expressions can contain literal text and {schema} or {table} placeholders. +func validateRoutingExpression(fieldName, expr string) error { + if expr == "" || validRoutingExpressionRegexp.MatchString(expr) { + return nil + } + return cerror.ErrInvalidTableRoutingRule.GenWithStack( + "%s %q must contain only literal text, {schema}, and {table}", + fieldName, + expr, + ) +} diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index 20c0d9075c..507bb250ef 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -17,6 +17,7 @@ import ( "net/url" "testing" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/util" "github.com/stretchr/testify/require" ) @@ -294,6 +295,124 @@ func TestCheckCompatibilityWithSinkURI(t *testing.T) { } } +func TestValidateTableRoute(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + cfg *SinkConfig + wantErr string + }{ + { + name: "valid routing rule", + cfg: &SinkConfig{ + DispatchRules: []*DispatchRule{ + { + Matcher: []string{"db1.*"}, + TargetSchema: "archive", + TargetTable: "{table}_bak", + }, + }, + }, + }, + { + name: "invalid target schema expression", + cfg: &SinkConfig{ + DispatchRules: []*DispatchRule{ + { + Matcher: []string{"db1.*"}, + TargetSchema: "{bad}", + TargetTable: "{table}_bak", + }, + }, + }, + wantErr: "target-schema", + }, + { + name: "mq 
dispatch rule ignored", + cfg: &SinkConfig{ + DispatchRules: []*DispatchRule{ + { + Matcher: []string{"db1.*"}, + PartitionRule: "columns", + Columns: []string{"id"}, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.validateTableRoute() + if tc.wantErr == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tc.wantErr) + } + }) + } +} + +func TestValidateRoutingExpression(t *testing.T) { + t.Parallel() + + validExpressions := []string{ + "", + "archive", + "orders_bak", + "archive_v2", + "db-01.table_02", + "{schema}", + "{table}", + "{schema}_{table}", + "{table}_bak", + "bak_{table}_v2", + "{schema}_backup", + "archive_{schema}_{table}_v2", + "prefix_{schema}_middle_{table}_suffix", + "{schema}_{schema}_{table}", + } + + for _, expr := range validExpressions { + name := expr + if name == "" { + name = "empty" + } + t.Run(name, func(t *testing.T) { + require.NoError(t, validateRoutingExpression("target-table", expr)) + }) + } +} + +func TestValidateRoutingExpressionRejectsInvalidExpressions(t *testing.T) { + t.Parallel() + + invalidExpressions := []string{ + "{invalid}", + "{Schema}", + "{TABLE}", + "{schema", + "schema}", + "{table", + "{", + "}", + "{{schema}}", + "{schema}{bad}", + "prefix_{schema}_{bad}", + } + + for _, expr := range invalidExpressions { + t.Run(expr, func(t *testing.T) { + err := validateRoutingExpression("target-table", expr) + require.Error(t, err) + code, ok := errors.RFCCode(err) + require.True(t, ok) + require.Equal(t, errors.ErrInvalidTableRoutingRule.RFCCode(), code) + }) + } +} + func TestValidateAndAdjustCSVConfig(t *testing.T) { t.Parallel() tests := []struct { diff --git a/pkg/errors/error.go b/pkg/errors/error.go index 3d0ce1c0cb..d06dfacfe4 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -261,6 +261,14 @@ var ( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), ) + ErrInvalidTableRoutingRule = errors.Normalize( + 
"invalid table routing rule", + errors.RFCCodeText("CDC:ErrInvalidTableRoutingRule"), + ) + ErrTableRoutingFailed = errors.Normalize( + "table routing failed", + errors.RFCCodeText("CDC:ErrTableRoutingFailed"), + ) ErrMessageTooLarge = errors.Normalize( "message is too large. table:%s, length:%d, maxMessageBytes:%d", errors.RFCCodeText("CDC:ErrMessageTooLarge"), diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go index 59d2b9f3ad..14e2c2e90c 100644 --- a/tests/integration_tests/api_v2/model.go +++ b/tests/integration_tests/api_v2/model.go @@ -256,6 +256,8 @@ type DispatchRule struct { TopicRule string `json:"topic"` IndexName string `json:"index,omitempty"` Columns []string `json:"columns,omitempty"` + TargetSchema string `json:"target-schema,omitempty"` + TargetTable string `json:"target-table,omitempty"` } // ColumnSelector represents a column selector for a table.