8 changes: 8 additions & 0 deletions tools/workload/app.go
@@ -30,12 +30,14 @@ import (
pbank "workload/schema/bank"
pbank2 "workload/schema/bank2"
pbank3 "workload/schema/bank3"
pbank4 "workload/schema/bank4"
"workload/schema/bankupdate"
pcrawler "workload/schema/crawler"
pdc "workload/schema/dc"
"workload/schema/largerow"
"workload/schema/shop"
psysbench "workload/schema/sysbench"
ptableinfosharing "workload/schema/table_info_sharing"
puuu "workload/schema/uuu"
pwidetablewithjson "workload/schema/wide_table_with_json"
)
@@ -81,8 +83,10 @@ const (
crawler = "crawler"
bank2 = "bank2"
bank3 = "bank3"
bank4 = "bank4"
bankUpdate = "bank_update"
dc = "dc"
tableInfoSharing = "table_info_sharing"
wideTableWithJSON = "wide_table_with_json"
)

@@ -140,10 +144,14 @@ func (app *WorkloadApp) createWorkload() schema.Workload {
workload = pbank2.NewBank2Workload()
case bank3:
workload = pbank3.NewBankWorkload(app.Config.Partitioned)
case bank4:
workload = pbank4.NewBankWorkload(app.Config.Partitioned)
case bankUpdate:
workload = bankupdate.NewBankUpdateWorkload(app.Config.TotalRowCount, app.Config.UpdateLargeColumnSize)
case dc:
workload = pdc.NewDCWorkload()
case tableInfoSharing:
workload = ptableinfosharing.NewTableInfoSharingWorkload(app.Config.TableCount, app.Config.TableStartIndex)
case wideTableWithJSON:
workload = pwidetablewithjson.NewWideTableWithJSONWorkload(app.Config.RowSize, app.Config.TableCount, app.Config.TableStartIndex, app.Config.TotalRowCount)
default:
2 changes: 1 addition & 1 deletion tools/workload/config.go
@@ -140,7 +140,7 @@ func (c *WorkloadConfig) ParseFlags() error {
flag.Float64Var(&c.PercentageForDelete, "percentage-for-delete", c.PercentageForDelete, "percentage for delete: [0, 1.0]")
flag.BoolVar(&c.SkipCreateTable, "skip-create-table", c.SkipCreateTable, "do not create tables")
flag.StringVar(&c.Action, "action", c.Action, "action of the workload: [prepare, insert, update, delete, write, ddl, cleanup]")
flag.StringVar(&c.WorkloadType, "workload-type", c.WorkloadType, "workload type: [bank, sysbench, large_row, shop_item, uuu, bank2, bank3, bank_update, crawler, dc, wide_table_with_json]")
flag.StringVar(&c.WorkloadType, "workload-type", c.WorkloadType, "workload type: [bank, sysbench, large_row, shop_item, uuu, bank2, bank3, bank4, bank_update, crawler, dc, table_info_sharing, wide_table_with_json]")
flag.StringVar(&c.DBHost, "database-host", c.DBHost, "database host")
flag.StringVar(&c.DBUser, "database-user", c.DBUser, "database user")
flag.StringVar(&c.DBPassword, "database-password", c.DBPassword, "database password")
16 changes: 13 additions & 3 deletions tools/workload/ddl_config.go
@@ -31,6 +31,7 @@ type DDLConfig struct {
Mode string `toml:"mode"`
RatePerMinute DDLRatePerMinute `toml:"rate_per_minute"`
Tables []string `toml:"tables"`
TablePatterns []string `toml:"table_patterns"`
}

type DDLRatePerMinute struct {
@@ -68,7 +69,7 @@ func LoadDDLConfig(path string) (*DDLConfig, error) {
func (c *DDLConfig) normalize() {
c.Mode = strings.ToLower(strings.TrimSpace(c.Mode))
if c.Mode == "" {
if len(c.Tables) > 0 {
if len(c.Tables) > 0 || len(c.TablePatterns) > 0 {
c.Mode = ddlModeFixed
} else {
c.Mode = ddlModeRandom
@@ -84,14 +85,23 @@ func (c *DDLConfig) normalize() {
}
}
c.Tables = tables

patterns := make([]string, 0, len(c.TablePatterns))
for _, pattern := range c.TablePatterns {
pattern = strings.TrimSpace(pattern)
if pattern != "" {
patterns = append(patterns, pattern)
}
}
c.TablePatterns = patterns
}

func (c *DDLConfig) validate() error {
if c.Mode != ddlModeFixed && c.Mode != ddlModeRandom {
return errors.Errorf("unsupported ddl mode: %s", c.Mode)
}
if c.Mode == ddlModeFixed && len(c.Tables) == 0 {
return errors.New("ddl mode fixed requires tables")
if c.Mode == ddlModeFixed && len(c.Tables) == 0 && len(c.TablePatterns) == 0 {
return errors.New("ddl mode fixed requires tables or table_patterns")
}

if err := validateRate("add_column", c.RatePerMinute.AddColumn); err != nil {
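Taken together, the normalize/validate hunks above make table_patterns a first-class source of fixed tables: entries are trimmed, empty entries dropped, and either tables or table_patterns now satisfies fixed mode (and defaults an empty mode to fixed). A minimal, self-contained sketch of those semantics — the types here are illustrative mirrors, not the package's actual ones:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Trimmed-down mirror of DDLConfig, for illustration only.
type cfg struct {
	Mode          string
	Tables        []string
	TablePatterns []string
}

func normalize(c *cfg) {
	c.Mode = strings.ToLower(strings.TrimSpace(c.Mode))
	if c.Mode == "" {
		// table_patterns alone is now enough to default into fixed mode.
		if len(c.Tables) > 0 || len(c.TablePatterns) > 0 {
			c.Mode = "fixed"
		} else {
			c.Mode = "random"
		}
	}
}

func validate(c *cfg) error {
	if c.Mode == "fixed" && len(c.Tables) == 0 && len(c.TablePatterns) == 0 {
		return errors.New("ddl mode fixed requires tables or table_patterns")
	}
	return nil
}

func main() {
	c := &cfg{TablePatterns: []string{"^sbtest[0-9]+$"}}
	normalize(c)
	fmt.Println(c.Mode, validate(c)) // prints: fixed <nil>
}
```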
15 changes: 15 additions & 0 deletions tools/workload/ddl_executor.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
plog "github.com/pingcap/log"
"go.uber.org/zap"
"workload/schema"
)

const (
@@ -135,13 +136,27 @@ func (r *DDLRunner) executeTask(conn *sql.Conn, task DDLTask) error {
}

r.app.Stats.DDLSucceeded.Add(1)
r.onDDLExecuted(task)
plog.Debug("ddl executed",
zap.String("ddlType", task.Type.String()),
zap.String("table", task.Table.String()),
zap.Duration("cost", time.Since(start)))
return nil
}

func (r *DDLRunner) onDDLExecuted(task DDLTask) {
if task.Type != ddlTruncateTable {
return
}

workload, ok := r.app.Workload.(schema.TableLifecycleAwareWorkload)
if !ok {
return
}

workload.OnTableTruncated(task.Table.Schema, task.Table.Name)
}

func (r *DDLRunner) buildDDL(ctx context.Context, conn *sql.Conn, task DDLTask) (sqlStr string, skipped bool, reason string, err error) {
switch task.Type {
case ddlAddColumn:
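The new onDDLExecuted hook fires only for TRUNCATE TABLE, and only when the workload implements schema.TableLifecycleAwareWorkload; that interface's definition is not part of this diff. A sketch under that assumption — the interface shape and the reset logic below are illustrative, not the package's actual code:

```go
package main

import "fmt"

// Assumed shape of schema.TableLifecycleAwareWorkload; the real
// definition lives in workload/schema and is not shown in this diff.
type TableLifecycleAwareWorkload interface {
	OnTableTruncated(schema, table string)
}

// Hypothetical workload that resets per-table state after a TRUNCATE.
type bankLike struct{ nextID map[string]int64 }

func (w *bankLike) OnTableTruncated(schema, table string) {
	// The table is empty now, so restart ID generation from zero.
	w.nextID[schema+"."+table] = 0
}

func main() {
	var w any = &bankLike{nextID: map[string]int64{}}
	// Mirrors onDDLExecuted: notify only workloads that opt in.
	if aware, ok := w.(TableLifecycleAwareWorkload); ok {
		aware.OnTableTruncated("test", "bank4_0")
		fmt.Println("workload notified of truncate")
	}
}
```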
113 changes: 102 additions & 11 deletions tools/workload/ddl_runner.go
@@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"math/rand"
"regexp"
"sync"
"sync/atomic"
"time"
@@ -72,6 +73,11 @@ func NewDDLRunner(app *WorkloadApp, cfg *DDLConfig) (*DDLRunner, error) {
if err != nil {
return nil, err
}
patternTables, err := r.resolvePatternTables(cfg.TablePatterns)
if err != nil {
return nil, err
}
tables = mergeTableNames(tables, patternTables)
r.selector = newFixedTableSelector(tables)
case ddlModeRandom:
if app.Config.DBPrefix != "" || app.Config.DBNum != 1 {
@@ -112,24 +118,40 @@ func (r *DDLRunner) startTypeScheduler(ddlType DDLType, perMinute int) {
return
}

interval := schedulerInterval(perMinute)
go func() {
ticker := time.NewTicker(time.Minute)
r.enqueueTask(ddlType)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
for i := 0; i < perMinute; i++ {
table, ok := r.selector.Next()
if !ok {
r.app.Stats.DDLSkipped.Add(1)
continue
}
r.taskCh <- DDLTask{Type: ddlType, Table: table}
}
<-ticker.C
for range ticker.C {
r.enqueueTask(ddlType)
}
}()
}

func schedulerInterval(perMinute int) time.Duration {
if perMinute <= 0 {
return 0
}

interval := time.Minute / time.Duration(perMinute)
if interval <= 0 {
return time.Nanosecond
}
return interval
}

func (r *DDLRunner) enqueueTask(ddlType DDLType) {
table, ok := r.selector.Next()
if !ok {
r.app.Stats.DDLSkipped.Add(1)
return
}
r.taskCh <- DDLTask{Type: ddlType, Table: table}
}

func (r *DDLRunner) startRandomTableRefresh() {
go func() {
ticker := time.NewTicker(time.Minute)
@@ -283,3 +305,72 @@ func parseTableList(rawTables []string, defaultSchema string) ([]TableName, error) {
}
return out, nil
}

func mergeTableNames(left []TableName, right []TableName) []TableName {
seen := make(map[string]struct{}, len(left)+len(right))
out := make([]TableName, 0, len(left)+len(right))
for _, table := range left {
key := table.String()
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, table)
}
for _, table := range right {
key := table.String()
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
out = append(out, table)
}
return out
}

func (r *DDLRunner) resolvePatternTables(patterns []string) ([]TableName, error) {
if len(patterns) == 0 {
return nil, nil
}
if r.app.Config.DBPrefix != "" || r.app.Config.DBNum != 1 {
return nil, errors.New("ddl table_patterns only support single database connection")
}

dbs := r.app.DBManager.GetDBs()
if len(dbs) == 0 {
return nil, errors.New("no database connections available")
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

tableNames, err := fetchBaseTables(ctx, dbs[0].DB, r.app.Config.DBName)
if err != nil {
return nil, err
}

out := make([]TableName, 0)
for _, pattern := range patterns {
re, err := regexp.Compile(pattern)
if err != nil {
return nil, errors.Annotatef(err, "compile table pattern failed: %s", pattern)
}

matched := false
for _, name := range tableNames {
if !re.MatchString(name) {
continue
}
out = append(out, TableName{
Schema: r.app.Config.DBName,
Name: name,
})
matched = true
}
if !matched {
return nil, errors.Errorf("table pattern matched no tables: %s", pattern)
}
}

return out, nil
}
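The scheduler change replaces the old once-a-minute burst (enqueue perMinute tasks, then block on a minute ticker) with even spacing: one task immediately, then one per time.Minute/perMinute tick. A runnable sketch of the interval arithmetic, same in spirit as schedulerInterval above:

```go
package main

import (
	"fmt"
	"time"
)

// Same arithmetic as schedulerInterval in the hunk above.
func schedulerInterval(perMinute int) time.Duration {
	if perMinute <= 0 {
		return 0
	}
	interval := time.Minute / time.Duration(perMinute)
	if interval <= 0 {
		// Integer division floored to zero; clamp so time.NewTicker
		// (which panics on d <= 0) still gets a valid duration.
		return time.Nanosecond
	}
	return interval
}

func main() {
	fmt.Println(schedulerInterval(100)) // 600ms: 100 DDLs spread across the minute
	fmt.Println(schedulerInterval(600)) // 100ms
	fmt.Println(schedulerInterval(1))   // 1m0s: one task per tick
}
```

The total per-minute rate is unchanged from the burst behavior; only the spacing differs, plus one immediate task before the ticker's first fire.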
20 changes: 20 additions & 0 deletions tools/workload/examples/ddl_partition_table_mixed.generated.toml
@@ -0,0 +1,20 @@
# Use this with a dedicated database that contains 1000 partitioned bank4 tables.
# Run workload with -workload-type bank4 -partitioned=true -table-count 1000.
# bank4 creates 126 monthly partitions per table (2021-07 through 2031-12).
# rate_per_minute is the total rate across all matched tables.
# Current values target approximately:
# - truncate_table: each table once every 10 minutes
# - add/drop column: each table once per minute
# - add/drop index: each table once every 2 minutes
mode = "fixed"

table_patterns = [
"^bank4_[0-9]+$",
]

[rate_per_minute]
truncate_table = 3000
add_column = 1000
drop_column = 1000
add_index = 1000
drop_index = 1000
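Since rate_per_minute is a total across all matched tables, the per-table cadence works out to tableCount/rate minutes, assuming the fixed selector cycles through matched tables roughly evenly (an assumption of this sketch, not something the file states). For instance, add_column = 1000 over 1000 tables is about one ALTER per table per minute:

```go
package main

import (
	"fmt"
	"time"
)

// perTablePeriod is a hypothetical helper, not part of this PR: with a
// total rate of totalPerMinute spread evenly over tableCount matched
// tables, each table is hit about every tableCount/totalPerMinute minutes.
func perTablePeriod(totalPerMinute, tableCount int) time.Duration {
	return time.Duration(tableCount) * time.Minute / time.Duration(totalPerMinute)
}

func main() {
	fmt.Println(perTablePeriod(1000, 1000)) // 1m0s per table (the add/drop column target)
	fmt.Println(perTablePeriod(1000, 2000)) // 2m0s per table if a pattern matched 2000 tables
}
```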
10 changes: 10 additions & 0 deletions tools/workload/examples/ddl_partition_table_mixed.toml
@@ -0,0 +1,10 @@
# Use this with a dedicated database that only contains partitioned bank4 tables.
# bank4 creates 126 monthly partitions per table (2021-07 through 2031-12).
mode = "random"

[rate_per_minute]
truncate_table = 1
add_column = 6
drop_column = 6
add_index = 3
drop_index = 3
14 changes: 14 additions & 0 deletions tools/workload/examples/ddl_truncate_table_mixed.toml
@@ -0,0 +1,14 @@
# Periodically run TRUNCATE TABLE while other DDLs keep changing the schema.
# The scheduler spreads each rate evenly over a minute.
mode = "fixed"

table_patterns = [
"^sbtest[0-9]+$",
]

[rate_per_minute]
truncate_table = 100
add_column = 600
drop_column = 600
add_index = 300
drop_index = 300