Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ const (
UnifiedTimeZone string = "+0:00"
)

// ChecksumAlgorithm specifies the hash function to use for chunk checksumming.
type ChecksumAlgorithm string

const (
// MD5 uses MD5 hash function (default for backwards compatibility)
MD5 ChecksumAlgorithm = "md5"
// SHA256 uses SHA256 hash function (for FIPS-compliant environments)
SHA256 ChecksumAlgorithm = "sha256"
)

// TableConfig is the config of table.
type TableConfig struct {
// table's filter to tell us which table should adapt to this config.
Expand Down Expand Up @@ -124,6 +134,8 @@ type DataSource struct {

Conn *sql.DB
SessionConfig SessionConfig `toml:"session" json:"session"`

ChecksumAlgorithm ChecksumAlgorithm `toml:"checksum-algorithm" json:"checksum-algorithm"`
}

// IsAutoSnapshot returns true if the tidb_snapshot is expected to automatically
Expand Down Expand Up @@ -393,6 +405,10 @@ type Config struct {
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
DMTask string `toml:"dm-task" json:"dm-task"`
// ChecksumAlgorithm specifies the hash function to use for chunk checksumming.
// Options: MD5 or SHA256. Default: MD5 (for backwards compatibility)
// Set to SHA256 for FIPS-compliant environments.
ChecksumAlgorithm ChecksumAlgorithm `toml:"checksum-algorithm" json:"checksum-algorithm"`

DataSources map[string]*DataSource `toml:"data-sources" json:"data-sources"`

Expand Down Expand Up @@ -428,8 +444,10 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")
fs.BoolVar(&cfg.SkipNonExistingTable, "skip-non-existing-table", false, "skip validation for tables that don't exist upstream or downstream")
fs.BoolVar(&cfg.CheckDataOnly, "check-data-only", false, "ignore check table's struct")
fs.StringVar((*string)(&cfg.ChecksumAlgorithm), "checksum-algorithm", string(MD5), "checksum function: md5, sha256")

_ = fs.MarkHidden("check-data-only")
_ = fs.MarkHidden("checksum-algorithm")

fs.SortFlags = false
return cfg
Expand Down Expand Up @@ -531,12 +549,13 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
}
dataSources := make(map[string]*DataSource)
dataSources["target"] = &DataSource{
Host: subTaskCfgs[0].To.Host,
Port: subTaskCfgs[0].To.Port,
User: subTaskCfgs[0].To.User,
Password: utils.SecretString(subTaskCfgs[0].To.Password),
SqlMode: sqlMode,
Security: parseTLSFromDMConfig(subTaskCfgs[0].To.Security),
Host: subTaskCfgs[0].To.Host,
Port: subTaskCfgs[0].To.Port,
User: subTaskCfgs[0].To.User,
Password: utils.SecretString(subTaskCfgs[0].To.Password),
SqlMode: sqlMode,
Security: parseTLSFromDMConfig(subTaskCfgs[0].To.Security),
ChecksumAlgorithm: c.ChecksumAlgorithm,
}
for _, subTaskCfg := range subTaskCfgs {
tableRouter, err := router.NewTableRouter(subTaskCfg.CaseSensitive, []*router.TableRule{})
Expand All @@ -552,15 +571,15 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
routeTargetSet[dbutil.TableName(rule.TargetSchema, rule.TargetTable)] = struct{}{}
}
dataSources[subTaskCfg.SourceID] = &DataSource{
Host: subTaskCfg.From.Host,
Port: subTaskCfg.From.Port,
User: subTaskCfg.From.User,
Password: utils.SecretString(subTaskCfg.From.Password),
SqlMode: sqlMode,
Security: parseTLSFromDMConfig(subTaskCfg.From.Security),
Router: tableRouter,

RouteTargetSet: routeTargetSet,
Host: subTaskCfg.From.Host,
Port: subTaskCfg.From.Port,
User: subTaskCfg.From.User,
Password: utils.SecretString(subTaskCfg.From.Password),
SqlMode: sqlMode,
Security: parseTLSFromDMConfig(subTaskCfg.From.Security),
Router: tableRouter,
RouteTargetSet: routeTargetSet,
ChecksumAlgorithm: c.ChecksumAlgorithm,
}
}
c.DataSources = dataSources
Expand All @@ -575,6 +594,12 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
}

func (c *Config) Init() (err error) {
checksumAlgo := ChecksumAlgorithm(strings.ToLower(string(c.ChecksumAlgorithm)))
if checksumAlgo != MD5 && checksumAlgo != SHA256 {
return errors.Errorf("checksum-algorithm must be 'md5' or 'sha256', got: %s", c.ChecksumAlgorithm)
}
c.ChecksumAlgorithm = checksumAlgo

if len(c.DMAddr) > 0 {
err := c.adjustConfigByDMSubTasks()
if err != nil {
Expand All @@ -587,6 +612,7 @@ func (c *Config) Init() (err error) {
return nil
}
for _, d := range c.DataSources {
d.ChecksumAlgorithm = c.ChecksumAlgorithm
routeRuleList := make([]*router.TableRule, 0, len(c.Routes))
d.RouteTargetSet = make(map[string]struct{})
// if we had rules
Expand Down
27 changes: 23 additions & 4 deletions sync_diff_inspector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func TestParseConfig(t *testing.T) {

// we might not use the same config to run this test. e.g. MYSQL_PORT can be 4000
require.JSONEq(t, cfg.String(),
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"checksum-algorithm\":\"md5\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"},\"checksum-algorithm\":\"md5\"}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null,\"checksum-algorithm\":\"md5\"}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"},\"checksum-algorithm\":\"md5\"},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
hash, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)
require.Equal(t, hash, "5a978bf48039d41b81403d635332493f031bb890a6d4e4d7df77f75e0ccc29f3")
require.Equal(t, "09c59e9563f2f03ec970420b9df37bb06b8b7d31228e65ef9bc8f997ce9a9e20", hash)

require.True(t, cfg.TableConfigs["config1"].Valid())

Expand All @@ -77,12 +77,31 @@ func TestError(t *testing.T) {
cfg.CheckThreadCount = 1
require.True(t, cfg.CheckConfig())

// Init
// Checksum algorithm - invalid
cfg.ChecksumAlgorithm = "invalid"
err := cfg.Init()
require.Contains(t, err.Error(), "checksum-algorithm must be 'md5' or 'sha256'")

// Valid checksum algorithm - sha256
cfg.ChecksumAlgorithm = "sha256"
cfg.DataSources = nil
err = cfg.Init()
require.NotContains(t, err.Error(), "checksum-algorithm")
require.Equal(t, SHA256, cfg.ChecksumAlgorithm)

// Valid checksum algorithm - MD5
cfg.ChecksumAlgorithm = "MD5"
err = cfg.Init()
require.NotContains(t, err.Error(), "checksum-algorithm")
require.Equal(t, MD5, cfg.ChecksumAlgorithm) // normalized to lowercase

cfg.ChecksumAlgorithm = MD5
cfg.DataSources = make(map[string]*DataSource)
// Init - invalid route
cfg.DataSources["123"] = &DataSource{
RouteRules: []string{"111"},
}
err := cfg.Init()
err = cfg.Init()
require.Contains(t, err.Error(), "not found source routes for rule 111, please correct the config")
}

Expand Down
8 changes: 7 additions & 1 deletion sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type MySQLSources struct {
tableDiffs []*common.TableDiff

sourceTablesMap map[string][]*common.TableShardSource
checksumAlgorithm config.ChecksumAlgorithm
}

func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSource, table *common.TableDiff) []*common.TableShardSource {
Expand Down Expand Up @@ -103,7 +104,7 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter.

for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndMd5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, "", chunk.Args)
count, checksum, err := utils.GetCountAndChecksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, "", chunk.Args, string(s.checksumAlgorithm))
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Expand Down Expand Up @@ -383,9 +384,14 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
return nil, errors.Annotatef(err, "please make sure the filter is correct.")
}

checksumAlgorithm := config.MD5
if len(ds) > 0 {
checksumAlgorithm = ds[0].ChecksumAlgorithm
}
mss := &MySQLSources{
tableDiffs: tableDiffs,
sourceTablesMap: sourceTablesMap,
checksumAlgorithm: checksumAlgorithm,
}
return mss, nil
}
6 changes: 4 additions & 2 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type TiDBSource struct {
sourceTableMap map[string]*common.TableSource
snapshot string
sqlHint string
checksumAlgorithm config.ChecksumAlgorithm
// bucketSpliterPool is the shared pool to produce chunks using bucket
bucketSpliterPool *utils.WorkerPool
dbConn *sql.DB
Expand Down Expand Up @@ -149,9 +150,9 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra
}
}

count, checksum, err := utils.GetCountAndMd5Checksum(
count, checksum, err := utils.GetCountAndChecksum(
ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info,
chunk.Where, indexHint, chunk.Args)
chunk.Where, indexHint, chunk.Args, string(s.checksumAlgorithm))

cost := time.Since(beginTime)
return &ChecksumInfo{
Expand Down Expand Up @@ -301,6 +302,7 @@ func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *conf
bucketSpliterPool: bucketSpliterPool,
version: utils.TryToGetVersion(ctx, ds.Conn),
sqlHint: ds.SQLHintUseIndex,
checksumAlgorithm: ds.ChecksumAlgorithm,
}
return ts, nil
}
32 changes: 23 additions & 9 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,17 +779,20 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
return dataSize.Int64, nil
}

// GetCountAndMd5Checksum returns checksum code and count of some data by given condition
func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, indexHint string, args []any) (int64, uint64, error) {
// GetCountAndChecksum returns checksum code and count of some data by given condition
func GetCountAndChecksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, indexHint string, args []interface{}, checksumAlgorithm string) (int64, uint64, error) {
/*
calculate MD5 checksum and count example:
calculate checksum and count example (MD5):
mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`;
+--------+----------------------
| CNT | CHECKSUM |
+--------+----------------------
| 100000 | 3462532621352132810 |
+--------+----------------------
1 row in set (0.46 sec)

calculate checksum and count example (SHA256):
mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(SHA2(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`))), 256), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(SHA2(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`))), 256), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`;
*/
columnNames := make([]string, 0, len(tbInfo.Columns))
columnIsNull := make([]string, 0, len(tbInfo.Columns))
Expand All @@ -810,16 +813,27 @@ func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableNa
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name))
}

query := fmt.Sprintf("SELECT %s COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
indexHint,
strings.Join(columnNames, ", "),
strings.Join(columnIsNull, ", "),
var checksumFuncTemplate string
if checksumAlgorithm == "sha256" {
checksumFuncTemplate = "SHA2(%s, 256)"
} else {
checksumFuncTemplate = "MD5(%s)"
}

concatExpr := fmt.Sprintf("CONCAT_WS(',', %s, CONCAT(%s))",
strings.Join(columnNames, ", "),
strings.Join(columnIsNull, ", "),
strings.Join(columnIsNull, ", "))

checksumExpr := fmt.Sprintf(checksumFuncTemplate, concatExpr)

query := fmt.Sprintf("SELECT %s COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(%s, 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(%s, 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
indexHint,
checksumExpr,
checksumExpr,
dbutil.TableName(schemaName, tableName),
limitRange,
)
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args))
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args), zap.String("checksum-algorithm", checksumAlgorithm))

var count sql.NullInt64
var checksum uint64
Expand Down
Loading