Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
TargetSchema: rule.TargetSchema,
TargetTable: rule.TargetTable,
})
}
var columnSelectors []*config.ColumnSelector
Expand Down Expand Up @@ -700,6 +702,8 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
TargetSchema: rule.TargetSchema,
TargetTable: rule.TargetTable,
})
}
var columnSelectors []*ColumnSelector
Expand Down Expand Up @@ -1190,6 +1194,21 @@ type DispatchRule struct {
IndexName string `json:"index,omitempty"`
Columns []string `json:"columns,omitempty"`
TopicRule string `json:"topic,omitempty"`

// TargetSchema sets the routed downstream schema name.
// Leave it empty to keep the source schema name.
// For example, if the source table is `sales`.`orders`, `target-schema = "sales_bak"`
// writes to `sales_bak`.`orders`.
// You can also use placeholders. For example, `target-schema = "{schema}_bak"`
// the target schema becomes `sales_bak`.
TargetSchema string `json:"target-schema,omitempty"`
// TargetTable sets the routed downstream table name.
// Leave it empty to keep the source table name.
// For example, if the source table is `sales`.`orders`, `target-table = "orders_bak"`
// writes to `sales`.`orders_bak`.
// You can also use placeholders. For example, `target-table = "{schema}_{table}"`
// becomes `sales_orders`.
TargetTable string `json:"target-table,omitempty"`
}

// ColumnSelector represents a column selector for a table.
Expand Down
2 changes: 2 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/routing"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/heartbeatpb"
Expand All @@ -43,6 +44,7 @@ type DispatcherService interface {
GetBDRMode() bool
GetChangefeedID() common.ChangeFeedID
GetTableSpan() *heartbeatpb.TableSpan
GetRouter() *routing.Router
GetTimezone() string
GetIntegrityConfig() *eventpb.IntegrityConfig
GetFilterConfig() *eventpb.FilterConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
nil,
nil,
false,
nil, // router
statuses,
blockStatuses,
errCh,
Expand Down
21 changes: 21 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/ticdc/downstreamadapter/routing"
"github.com/pingcap/ticdc/downstreamadapter/syncpoint"
"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/heartbeatpb"
Expand Down Expand Up @@ -54,6 +55,11 @@ type SharedInfo struct {
// will break the splittability of this table.
enableSplittableCheck bool

// router is used to route source schema/table names to target schema/table names.
// It is used to apply routing to TableInfo before storing it.
// May be nil if no routing rules are configured.
router *routing.Router

// Shared resources
// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
Expand Down Expand Up @@ -87,6 +93,7 @@ func NewSharedInfo(
syncPointConfig *syncpoint.SyncPointConfig,
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
router *routing.Router,
statusesChan chan TableSpanStatusWithSeq,
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
errCh chan error,
Expand All @@ -101,6 +108,7 @@ func NewSharedInfo(
filterConfig: filterConfig,
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
router: router,
statusesChan: statusesChan,
blockStatusesChan: blockStatusesChan,
blockExecutor: newBlockEventExecutor(),
Expand Down Expand Up @@ -176,6 +184,13 @@ func (d *BasicDispatcher) IsOutputRawChangeEvent() bool {
return d.sharedInfo.outputRawChangeEvent
}

func (d *BasicDispatcher) GetRouter() *routing.Router {
if d.sharedInfo == nil {
return nil
}
return d.sharedInfo.GetRouter()
}

func (d *BasicDispatcher) GetFilterConfig() *eventpb.FilterConfig {
return d.sharedInfo.filterConfig
}
Expand Down Expand Up @@ -266,6 +281,12 @@ func (s *SharedInfo) GetBlockEventExecutor() *blockEventExecutor {
return s.blockExecutor
}

// GetRouter returns the router for schema/table name routing.
// May return nil if no routing rules are configured.
func (s *SharedInfo) GetRouter() *routing.Router {
return s.router
}

func (s *SharedInfo) Close() {
if s.blockExecutor != nil {
s.blockExecutor.Close()
Expand Down
4 changes: 4 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve
}, // syncPointConfig
&defaultAtomicity,
false, // enableSplittableCheck
nil, // router
make(chan TableSpanStatusWithSeq, 128),
make(chan *heartbeatpb.TableSpanBlockStatus, 128),
make(chan error, 1),
Expand Down Expand Up @@ -1003,6 +1004,7 @@ func TestDispatcherSplittableCheck(t *testing.T) {
},
&defaultAtomicity,
true, // enableSplittableCheck = true
nil, // router
make(chan TableSpanStatusWithSeq, 128),
make(chan *heartbeatpb.TableSpanBlockStatus, 128),
make(chan error, 1),
Expand Down Expand Up @@ -1113,6 +1115,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) {
},
&defaultAtomicity,
false,
nil, // router
make(chan TableSpanStatusWithSeq, 128),
make(chan *heartbeatpb.TableSpanBlockStatus, 128),
make(chan error, 1),
Expand Down Expand Up @@ -1193,6 +1196,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) {
},
&defaultAtomicity,
false,
nil, // router
make(chan TableSpanStatusWithSeq, 128),
make(chan *heartbeatpb.TableSpanBlockStatus, 128),
make(chan error, 1),
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/redo_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan)
nil, // redo dispatcher doesn't need syncPointConfig
&defaultAtomicity,
false, // enableSplittableCheck
nil, // router
make(chan TableSpanStatusWithSeq, 128),
make(chan *heartbeatpb.TableSpanBlockStatus, 128),
make(chan error, 1),
Expand Down
20 changes: 18 additions & 2 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/dispatcher"
"github.com/pingcap/ticdc/downstreamadapter/eventcollector"
"github.com/pingcap/ticdc/downstreamadapter/routing"
"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/downstreamadapter/sink/mysql"
"github.com/pingcap/ticdc/downstreamadapter/sink/redo"
Expand Down Expand Up @@ -232,6 +233,14 @@ func NewDispatcherManager(
}

var err error
var router *routing.Router
if manager.config.SinkConfig != nil {
router, err = routing.NewRouter(manager.config.CaseSensitive, manager.config.SinkConfig.DispatchRules)
if err != nil {
return nil, errors.Trace(err)
}
}

manager.sink, err = sink.New(ctx, manager.config, manager.changefeedID)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -258,6 +267,7 @@ func NewDispatcherManager(
syncPointConfig,
manager.config.SinkConfig.TxnAtomicity,
manager.config.EnableSplittableCheck,
router,
make(chan dispatcher.TableSpanStatusWithSeq, 8192),
make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
make(chan error, 1),
Expand Down Expand Up @@ -365,7 +375,10 @@ func (e *DispatcherManager) InitalizeTableTriggerEventDispatcher(schemaInfo []*h
}

// table trigger event dispatcher can register to event collector to receive events after finish the initial table schema store from the maintainer.
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(e.GetTableTriggerEventDispatcher(), e.sinkQuota)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(
e.GetTableTriggerEventDispatcher(),
e.sinkQuota,
)

// The table trigger event dispatcher needs changefeed-level checkpoint updates only
// when downstream components must maintain table names (for non-MySQL sinks), or
Expand Down Expand Up @@ -480,7 +493,10 @@ func (e *DispatcherManager) newEventDispatchers(infos map[common.DispatcherID]di
// we don't register table trigger event dispatcher in event collector, when created.
// Table trigger event dispatcher is a special dispatcher,
// it need to wait get the initial table schema store from the maintainer, then will register to event collector to receive events.
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(d, e.sinkQuota)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(
d,
e.sinkQuota,
)
}

seq := e.dispatcherMap.Set(id, d)
Expand Down
10 changes: 8 additions & 2 deletions downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dis
e.SetTableTriggerRedoDispatcher(rd)
} else {
e.redoSchemaIDToDispatchers.Set(schemaIds[idx], id)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(rd, e.redoQuota)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(
rd,
e.redoQuota,
)
}

redoSeq := e.redoDispatcherMap.Set(rd.GetId(), rd)
Expand Down Expand Up @@ -280,7 +283,10 @@ func (e *DispatcherManager) InitalizeTableTriggerRedoDispatcher(schemaInfo []*he
if !needAddDispatcher {
return nil
}
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(e.GetTableTriggerRedoDispatcher(), e.redoQuota)
appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).AddDispatcher(
e.GetTableTriggerRedoDispatcher(),
e.redoQuota,
)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di
nil,
&defaultAtomicity,
false,
nil, // router
make(chan dispatcher.TableSpanStatusWithSeq, 1),
make(chan *heartbeatpb.TableSpanBlockStatus, 1),
make(chan error, 1),
Expand Down Expand Up @@ -140,6 +141,7 @@ func createTestManager(t *testing.T) *DispatcherManager {
nil, // syncPointConfig
&defaultAtomicity,
false,
nil, // router
make(chan dispatcher.TableSpanStatusWithSeq, 8192),
make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
make(chan error, 1),
Expand Down
42 changes: 39 additions & 3 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,34 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv
if !d.filterAndUpdateEventByCommitTs(events[0]) {
return false
}
ddl := events[0].Event.(*commonEvent.DDLEvent)
originalDDL := events[0].Event.(*commonEvent.DDLEvent)
ddl, err := d.applyRoutingToDDLEvent(originalDDL)
if err != nil {
log.Error("failed to apply routing to DDL event",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Error(err))
return false
}
events[0].Event = ddl
d.tableInfoVersion.Store(ddl.FinishedTs)
if ddl.TableInfo != nil {
d.tableInfo.Store(ddl.TableInfo)
// Check if this DDL's TableInfo is for a different table.
// This can happen with CREATE TABLE LIKE, where the DDL is added to the
// reference table's history for blocking purposes, but the TableInfo is
// for the new table. In this case, we should NOT update the stored tableInfo.
dispatcherTableID := d.target.GetTableSpan().TableID
ddlTableID := ddl.TableInfo.TableName.TableID
if ddlTableID != dispatcherTableID {
log.Debug("DDL TableInfo is for a different table, skipping tableInfo update",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Int64("dispatcherTableID", dispatcherTableID),
zap.Int64("ddlTableID", ddlTableID),
zap.String("ddlTableName", ddl.TableInfo.TableName.Table))
} else {
d.tableInfo.Store(ddl.TableInfo)
}
}
return d.target.HandleEvents(events, func() { d.wake() })
} else {
Expand Down Expand Up @@ -637,7 +661,7 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent)
}
tableInfo := handshakeEvent.TableInfo
if tableInfo != nil {
d.tableInfo.Store(tableInfo)
d.tableInfo.Store(d.applyRoutingToTableInfo(tableInfo))
}
d.lastEventSeq.Store(handshakeEvent.Seq)
}
Expand Down Expand Up @@ -730,3 +754,15 @@ func (d *dispatcherStat) newDispatcherRemoveRequest(serverId string) *messaging.
},
}
}

// applyRoutingToTableInfo applies routing rules to the TableInfo and returns a new TableInfo
// with TargetSchema/TargetTable set. If no routing is needed (no router configured, or routing
// results in same schema/table), returns the original tableInfo unchanged.
// This avoids mutating shared TableInfo objects that may be used by multiple changefeeds.
func (d *dispatcherStat) applyRoutingToTableInfo(tableInfo *common.TableInfo) *common.TableInfo {
return d.target.GetRouter().ApplyToTableInfo(tableInfo)
}

func (d *dispatcherStat) applyRoutingToDDLEvent(ddl *commonEvent.DDLEvent) (*commonEvent.DDLEvent, error) {
return d.target.GetRouter().ApplyToDDLEvent(ddl, d.target.GetChangefeedID())
}
Loading
Loading