Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 136 additions & 14 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"go.uber.org/zap"
)

Expand All @@ -41,14 +43,23 @@ type DDLEvent struct {
SchemaID int64 `json:"schema_id"`
SchemaName string `json:"schema_name"`
TableName string `json:"table_name"`

// the following two fields are just used for RenameTable,
// they are the old schema/table name of the table
ExtraSchemaName string `json:"extra_schema_name"`
ExtraTableName string `json:"extra_table_name"`
Query string `json:"query"`
TableInfo *common.TableInfo `json:"-"`
StartTs uint64 `json:"start_ts"`
FinishedTs uint64 `json:"finished_ts"`
ExtraSchemaName string `json:"extra_schema_name"`
ExtraTableName string `json:"extra_table_name"`

// target related fields carry routed names.
// They are set after the unmarshal, so no need to be serialized.
targetSchemaName string `json:"-"`
targetTableName string `json:"-"`
targetExtraSchemaName string `json:"-"`
targetExtraTableName string `json:"-"`

Query string `json:"query"`
TableInfo *common.TableInfo `json:"-"`
StartTs uint64 `json:"start_ts"`
FinishedTs uint64 `json:"finished_ts"`
// The seq of the event. It is set by event service.
Seq uint64 `json:"seq"`
// The epoch of the event. It is set by event service.
Expand Down Expand Up @@ -201,6 +212,34 @@ func (d *DDLEvent) GetExtraTableName() string {
return d.ExtraTableName
}

func (d *DDLEvent) GetTargetSchemaName() string {
if d.targetSchemaName != "" {
return d.targetSchemaName
}
return d.SchemaName
}

func (d *DDLEvent) GetTargetTableName() string {
if d.targetTableName != "" {
return d.targetTableName
}
return d.TableName
}

func (d *DDLEvent) GetTargetExtraSchemaName() string {
if d.targetExtraSchemaName != "" {
return d.targetExtraSchemaName
}
return d.ExtraSchemaName
}

func (d *DDLEvent) GetTargetExtraTableName() string {
if d.targetExtraTableName != "" {
return d.targetExtraTableName
}
return d.ExtraTableName
}

// GetTableID returns the logic table ID of the event.
// it returns 0 when there is no tableinfo
func (d *DDLEvent) GetTableID() int64 {
Expand All @@ -210,6 +249,7 @@ func (d *DDLEvent) GetTableID() int64 {
return 0
}

// GetEvents split the multi tables DDL into single table DDLs.
func (d *DDLEvent) GetEvents() []*DDLEvent {
// Some ddl event may be multi-events, we need to split it into multiple messages.
// Such as rename table test.table1 to test.table10, test.table2 to test.table20
Expand All @@ -230,18 +270,23 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
}
for i, info := range d.MultipleTableInfos {
event := &DDLEvent{
Version: d.Version,
Type: byte(t),
SchemaName: info.GetSchemaName(),
TableName: info.GetTableName(),
TableInfo: info,
Query: queries[i],
StartTs: d.StartTs,
FinishedTs: d.FinishedTs,
Version: d.Version,
Type: byte(t),
SchemaName: info.GetSchemaName(),
TableName: info.GetTableName(),
targetSchemaName: info.GetTargetSchemaName(),
targetTableName: info.GetTargetTableName(),
TableInfo: info,
Query: queries[i],
StartTs: d.StartTs,
FinishedTs: d.FinishedTs,
}
if model.ActionType(d.Type) == model.ActionRenameTables {
event.ExtraSchemaName = d.TableNameChange.DropName[i].SchemaName
event.ExtraTableName = d.TableNameChange.DropName[i].TableName
targetExtraSchemaName, targetExtraTableName := extractRenameTargetExtraFromQuery(queries[i])
event.targetExtraSchemaName = targetExtraSchemaName
event.targetExtraTableName = targetExtraTableName
}
events = append(events, event)
}
Expand All @@ -251,6 +296,19 @@ func (d *DDLEvent) GetEvents() []*DDLEvent {
return []*DDLEvent{d}
}

func extractRenameTargetExtraFromQuery(query string) (string, string) {
stmt, err := parser.New().ParseOneStmt(query, "", "")
if err != nil {
log.Panic("parse split rename query failed", zap.String("query", query), zap.Error(err))
}
renameStmt, ok := stmt.(*ast.RenameTableStmt)
if !ok || len(renameStmt.TableToTables) == 0 {
log.Panic("unexpected split rename query", zap.String("query", query), zap.Any("stmt", stmt))
}
oldTable := renameStmt.TableToTables[0].OldTable
return oldTable.Schema.O, oldTable.Name.O
}
Comment thread
3AceShowHand marked this conversation as resolved.

func (d *DDLEvent) GetSeq() uint64 {
return d.Seq
}
Expand Down Expand Up @@ -479,6 +537,70 @@ func (t *DDLEvent) IsPaused() bool {
return false
}

// NewRoutedDDLEvent builds a routed DDL event from the origin event and final routed fields.
func NewRoutedDDLEvent(
d *DDLEvent,
query string,
targetSchemaName, targetTableName string,
targetExtraSchemaName, targetExtraTableName string,
tableInfo *common.TableInfo,
multipleTableInfos []*common.TableInfo,
blockedTableNames []SchemaTableName,
) *DDLEvent {
if d == nil {
return nil
}

return &DDLEvent{
Version: d.Version,
DispatcherID: d.DispatcherID,
Type: d.Type,
SchemaID: d.SchemaID,
SchemaName: d.SchemaName,
TableName: d.TableName,
ExtraSchemaName: d.ExtraSchemaName,
ExtraTableName: d.ExtraTableName,
targetSchemaName: targetSchemaName,
targetTableName: targetTableName,
targetExtraSchemaName: targetExtraSchemaName,
targetExtraTableName: targetExtraTableName,
Query: query,
TableInfo: tableInfo,
StartTs: d.StartTs,
FinishedTs: d.FinishedTs,
Seq: d.Seq,
Epoch: d.Epoch,
// MultipleTableInfos and BlockedTableNames carry table names used by downstream
// execution paths, so the routed versions must be passed in explicitly.
MultipleTableInfos: multipleTableInfos,
BlockedTableNames: blockedTableNames,
// The following fields do not participate in table route name rewriting,
// so the routed event keeps the original values from the source event.
BlockedTables: d.BlockedTables,
NeedDroppedTables: d.NeedDroppedTables,
NeedAddedTables: d.NeedAddedTables,
UpdatedSchemas: d.UpdatedSchemas,
TableNameChange: d.TableNameChange,
TiDBOnly: d.TiDBOnly,
BDRMode: d.BDRMode,
Err: d.Err,
PostTxnFlushed: clonePostTxnFlushed(d.PostTxnFlushed),
eventSize: d.eventSize,
IsBootstrap: d.IsBootstrap,
NotSync: d.NotSync,
}
}

func clonePostTxnFlushed(postTxnFlushed []func()) []func() {
if postTxnFlushed == nil {
return nil
}

cloned := make([]func(), len(postTxnFlushed))
copy(cloned, postTxnFlushed)
return cloned
}

func (t *DDLEvent) Len() int32 {
return 1
}
Expand Down
Loading
Loading