Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/mariadb2tidb"
mconfig "github.com/pingcap/tiflow/dm/pkg/mariadb2tidb/config"
"github.com/pingcap/tiflow/dm/pkg/terror"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
Expand Down Expand Up @@ -121,6 +123,26 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
return c
}

func buildMariaDB2TiDBConverters(instances []*mysqlInstance) map[string]*mariadb2tidb.Converter {
converters := make(map[string]*mariadb2tidb.Converter)
for _, inst := range instances {
if inst == nil || inst.cfg == nil {
continue
}
if !inst.cfg.MariaDB2TiDB.EnabledForFlavor(inst.cfg.Flavor) {
continue
}

convCfg := mconfig.DefaultConfig()
convCfg.EnabledRules = append([]string{}, inst.cfg.MariaDB2TiDB.EnabledRules...)
convCfg.DisabledRules = append([]string{}, inst.cfg.MariaDB2TiDB.DisabledRules...)
convCfg.StrictMode = inst.cfg.MariaDB2TiDB.Strict()

converters[inst.cfg.SourceID] = mariadb2tidb.NewConverter(convCfg)
}
return converters
}

// tablePairInfo records information about a upstream-downstream(source-target) table pair.
// Members may have repeated meanings but they have different data structure to satisfy different usages.
type tablePairInfo struct {
Expand Down Expand Up @@ -376,20 +398,23 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}

dumpThreads := c.instances[0].cfg.MydumperConfig.Threads
converters := buildMariaDB2TiDBConverters(c.instances)
if _, ok := c.checkingItems[config.TableSchemaChecking]; ok {
c.checkList = append(c.checkList, checker.NewTablesChecker(
upstreamDBs,
c.instances[0].targetDB,
info.sourceID2TableMap,
info.targetTable2ExtendedColumns,
dumpThreads,
converters,
))
}
if _, ok := c.checkingItems[config.PrimaryKeyChecking]; ok {
c.checkList = append(c.checkList, checker.NewPrimaryKeyChecker(
upstreamDBs,
info.sourceID2TableMap,
dumpThreads,
converters,
))
}

Expand Down
11 changes: 9 additions & 2 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ type SubTaskConfig struct {
MydumperConfig // Mydumper configuration
LoaderConfig // Loader configuration
SyncerConfig // Syncer configuration
ValidatorCfg ValidatorConfig
// MariaDB2TiDB controls MariaDB schema conversion behavior.
MariaDB2TiDB MariaDB2TiDBConfig `yaml:"mariadb2tidb" toml:"mariadb2tidb" json:"mariadb2tidb"`
ValidatorCfg ValidatorConfig

// compatible with standalone dm unit
LogLevel string `toml:"log-level" json:"log-level"`
Expand Down Expand Up @@ -202,7 +204,9 @@ var SampleSubtaskConfig string

// NewSubTaskConfig creates a new SubTaskConfig.
func NewSubTaskConfig() *SubTaskConfig {
cfg := &SubTaskConfig{}
cfg := &SubTaskConfig{
MariaDB2TiDB: DefaultMariaDB2TiDBConfig(),
}
return cfg
}

Expand Down Expand Up @@ -347,6 +351,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if c.MetaSchema == "" {
c.MetaSchema = defaultMetaSchema
}
if err := c.MariaDB2TiDB.Adjust(); err != nil {
return err
}

// adjust dir. Do not do this for both load and load&sync mode, as they are standalone
// mode and should take LoaderConfig.Dir as is
Expand Down
17 changes: 17 additions & 0 deletions dm/config/subtask.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ meta-file = "./syncer.subTaskA.meta"
worker-count = 16
batch = 1000

# MariaDB schema conversion (mariadb2tidb)
# Converts MariaDB-only DDL into TiDB-compatible SQL during load/check/sync.
# mode: auto (MariaDB sources only), on (force enable), off (disable).
# strict-mode: if false, conversion errors fall back to the original SQL.
# enabled-rules/disabled-rules: allow or block specific rule names.
# available rules: Collation, ZeroTimestamp, UUIDType, MariaDBSpecific, Constraints, TrailingComma,
# VersionMacros, AutoIncrementValues, OnUpdateCurrentTimestamp, IndexType, QualifiedNames, SystemVersioning,
# SequenceType, ColumnAttributes, IndexOptions, FulltextIndexNormalize, SpatialIndexDrop, CreateOrReplace,
# EngineOptions, IgnoredClauseCleanup, CollationFallback, KeyLength, IndexPrefix, IntegerWidth,
# TextBlobDefaults, JsonCheck, FunctionDefault, JsonGenerated
#
# [mariadb2tidb]
# mode = "auto"
# strict-mode = true
# enabled-rules = []
# disabled-rules = []

# filter

# block allow list provides a library to filter replicate on schema/table by given rules
Expand Down
69 changes: 69 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,68 @@ type SyncerConfig struct {
EnableANSIQuotes bool `yaml:"enable-ansi-quotes" toml:"enable-ansi-quotes" json:"enable-ansi-quotes"`
}

// MariaDB2TiDBMode controls whether to enable MariaDB schema conversion.
const (
MariaDB2TiDBModeAuto = "auto"
MariaDB2TiDBModeOn = "on"
MariaDB2TiDBModeOff = "off"
)

// MariaDB2TiDBConfig controls schema conversion rules for MariaDB sources.
type MariaDB2TiDBConfig struct {
Mode string `yaml:"mode" toml:"mode" json:"mode"`
EnabledRules []string `yaml:"enabled-rules" toml:"enabled-rules" json:"enabled-rules"`
DisabledRules []string `yaml:"disabled-rules" toml:"disabled-rules" json:"disabled-rules"`
StrictMode *bool `yaml:"strict-mode" toml:"strict-mode" json:"strict-mode"`
}

// DefaultMariaDB2TiDBConfig returns the default config.
func DefaultMariaDB2TiDBConfig() MariaDB2TiDBConfig {
strict := true
return MariaDB2TiDBConfig{
Mode: MariaDB2TiDBModeAuto,
StrictMode: &strict,
}
}

// Adjust normalizes and validates the config.
func (c *MariaDB2TiDBConfig) Adjust() error {
if c.Mode == "" {
c.Mode = MariaDB2TiDBModeAuto
}
c.Mode = strings.ToLower(c.Mode)
switch c.Mode {
case MariaDB2TiDBModeAuto, MariaDB2TiDBModeOn, MariaDB2TiDBModeOff:
default:
return fmt.Errorf("mariadb2tidb.mode must be one of %q, %q, %q", MariaDB2TiDBModeAuto, MariaDB2TiDBModeOn, MariaDB2TiDBModeOff)
}
if c.StrictMode == nil {
strict := true
c.StrictMode = &strict
}
return nil
}

// EnabledForFlavor reports whether conversion should run for a flavor.
func (c MariaDB2TiDBConfig) EnabledForFlavor(flavor string) bool {
switch strings.ToLower(c.Mode) {
case MariaDB2TiDBModeOn:
return true
case MariaDB2TiDBModeOff:
return false
default:
return strings.EqualFold(flavor, "mariadb")
}
}

// Strict reports whether conversion should fail on errors.
func (c MariaDB2TiDBConfig) Strict() bool {
if c.StrictMode == nil {
return true
}
return *c.StrictMode
}

// DefaultSyncerConfig return default syncer config for task.
func DefaultSyncerConfig() SyncerConfig {
return SyncerConfig{
Expand Down Expand Up @@ -530,6 +592,9 @@ type TaskConfig struct {
// "strict" will add default collation as upstream, and downstream will occur error when downstream don't support
CollationCompatible string `yaml:"collation_compatible" toml:"collation_compatible" json:"collation_compatible"`

// mariadb2tidb controls MariaDB schema conversion behavior.
MariaDB2TiDB MariaDB2TiDBConfig `yaml:"mariadb2tidb" toml:"mariadb2tidb" json:"mariadb2tidb"`

TargetDB *dbconfig.DBConfig `yaml:"target-database" toml:"target-database" json:"target-database"`

MySQLInstances []*MySQLInstance `yaml:"mysql-instances" toml:"mysql-instances" json:"mysql-instances"`
Expand Down Expand Up @@ -593,6 +658,7 @@ func NewTaskConfig() *TaskConfig {
CleanDumpFile: true,
OnlineDDL: true,
CollationCompatible: defaultCollationCompatible,
MariaDB2TiDB: DefaultMariaDB2TiDBConfig(),
}
cfg.FlagSet = flag.NewFlagSet("task", flag.ContinueOnError)
return cfg
Expand Down Expand Up @@ -699,6 +765,9 @@ func (c *TaskConfig) adjust() error {
} else if c.CollationCompatible == "" {
c.CollationCompatible = LooseCollationCompatible
}
if err := c.MariaDB2TiDB.Adjust(); err != nil {
return err
}

for _, item := range c.IgnoreCheckingItems {
if err := ValidateCheckingItem(item); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]dbconfig.DBCon
cfg.Timezone = c.Timezone
cfg.Meta = inst.Meta
cfg.CollationCompatible = c.CollationCompatible
cfg.MariaDB2TiDB = c.MariaDB2TiDB
cfg.Experimental = c.Experimental

fromClone := dbCfg.Clone()
Expand Down Expand Up @@ -381,6 +382,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.OnlineDDLScheme = stCfg0.OnlineDDLScheme
c.CleanDumpFile = stCfg0.CleanDumpFile
c.CollationCompatible = stCfg0.CollationCompatible
c.MariaDB2TiDB = stCfg0.MariaDB2TiDB
c.MySQLInstances = make([]*MySQLInstance, 0, len(stCfgs))
c.BAList = make(map[string]*filter.Rules)
c.Routes = make(map[string]*router.TableRule)
Expand Down
6 changes: 4 additions & 2 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,8 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) {
require.Equal(t, wordCount(cfg.String()), wordCount(cfg2.String())) // since rules are unordered, so use wordCount to compare

require.NoError(t, cfg.adjust())
stCfg1.MariaDB2TiDB = cfg.MariaDB2TiDB
stCfg2.MariaDB2TiDB = cfg.MariaDB2TiDB
stCfgs, err := TaskConfigToSubTaskConfigs(cfg, map[string]dbconfig.DBConfig{source1: source1DBCfg, source2: source2DBCfg})
require.NoError(t, err)
// revert ./dumpped_data.from-sub-tasks
Expand Down Expand Up @@ -1049,8 +1051,8 @@ func TestTaskConfigForDowngrade(t *testing.T) {
// make sure all new field were added
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
// without flag, collation_compatible, experimental, validator
require.Equal(t, cfgForDowngradeReflect.NumField()+4, cfgReflect.NumField())
// without flag, collation_compatible, mariadb2tidb, experimental, validator
require.Equal(t, cfgForDowngradeReflect.NumField()+5, cfgReflect.NumField())

// make sure all field were copied
cfgForClone := &TaskConfigForDowngrade{}
Expand Down
3 changes: 3 additions & 0 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,9 @@ func (l *LightningLoader) restore(ctx context.Context) error {
}

if status < lightningStatusFinished {
if err = l.maybeTransformSchemaFiles(ctx); err != nil {
return err
}
if err = l.checkPointList.RegisterCheckPoint(ctx); err != nil {
return err
}
Expand Down
101 changes: 101 additions & 0 deletions dm/loader/mariadb2tidb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loader

import (
"context"
"sort"
"strings"

extstorage "github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/dm/pkg/mariadb2tidb"
mconfig "github.com/pingcap/tiflow/dm/pkg/mariadb2tidb/config"
"github.com/pingcap/tiflow/dm/pkg/storage"
"go.uber.org/zap"
)

func (l *LightningLoader) maybeTransformSchemaFiles(ctx context.Context) error {
if l.cfg == nil || !l.cfg.MariaDB2TiDB.EnabledForFlavor(l.cfg.Flavor) {
return nil
}

dumpStorage, closeFn, err := l.dumpStorage(ctx)
if err != nil {
return err
}
defer closeFn()

files, err := storage.CollectDirFiles(ctx, l.cfg.LoaderConfig.Dir, dumpStorage)
if err != nil {
return err
}

schemaFiles := make([]string, 0, len(files))
for name := range files {
if isSchemaFile(name) {
schemaFiles = append(schemaFiles, name)
}
}
if len(schemaFiles) == 0 {
return nil
}
sort.Strings(schemaFiles)

convCfg := mconfig.DefaultConfig()
convCfg.EnabledRules = append([]string{}, l.cfg.MariaDB2TiDB.EnabledRules...)
convCfg.DisabledRules = append([]string{}, l.cfg.MariaDB2TiDB.DisabledRules...)
convCfg.StrictMode = l.cfg.MariaDB2TiDB.Strict()
converter := mariadb2tidb.NewConverter(convCfg)

l.logger.Info("transforming MariaDB schema files for TiDB compatibility",
zap.String("dir", l.cfg.LoaderConfig.Dir),
zap.Int("files", len(schemaFiles)),
)

for _, name := range schemaFiles {
raw, err := dumpStorage.ReadFile(ctx, name)
if err != nil {
return err
}

transformed, err := converter.TransformSQL(string(raw))
if err != nil {
return err
}
if strings.TrimSpace(transformed) == "" || transformed == string(raw) {
continue
}

if err := dumpStorage.WriteFile(ctx, name, []byte(transformed)); err != nil {
return err
}
}
return nil
}

func (l *LightningLoader) dumpStorage(ctx context.Context) (extstorage.ExternalStorage, func(), error) {
if l.cfg.ExtStorage != nil {
return l.cfg.ExtStorage, func() {}, nil
}
created, err := storage.CreateStorage(ctx, l.cfg.LoaderConfig.Dir)
if err != nil {
return nil, nil, err
}
return created, created.Close, nil
}

func isSchemaFile(name string) bool {
lower := strings.ToLower(name)
return strings.HasSuffix(lower, ".sql") && strings.Contains(lower, "-schema")
}
14 changes: 14 additions & 0 deletions dm/master/task_advanced.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ enable-heartbeat: false # whether to enable heartbeat for calculating lag betwe
# heartbeat-update-interval: 1 # interval to do heartbeat and save timestamp, default 1s
# heartbeat-report-interval: 10 # interval to report time lap to prometheus, default 10s

# MariaDB schema conversion (mariadb2tidb)
# mode: auto (MariaDB sources only), on (force enable), off (disable).
# strict-mode: if false, conversion errors fall back to the original SQL.
# available rules: Collation, ZeroTimestamp, UUIDType, MariaDBSpecific, Constraints, TrailingComma,
# VersionMacros, AutoIncrementValues, OnUpdateCurrentTimestamp, IndexType, QualifiedNames, SystemVersioning,
# SequenceType, ColumnAttributes, IndexOptions, FulltextIndexNormalize, SpatialIndexDrop, CreateOrReplace,
# EngineOptions, IgnoredClauseCleanup, CollationFallback, KeyLength, IndexPrefix, IntegerWidth,
# TextBlobDefaults, JsonCheck, FunctionDefault, JsonGenerated
#mariadb2tidb:
# mode: "auto"
# strict-mode: true
# enabled-rules: []
# disabled-rules: []

target-database:
host: "192.168.0.1"
port: 4000
Expand Down
Loading