25 changes: 25 additions & 0 deletions sync_diff_inspector/config/config.go
@@ -43,6 +43,12 @@ import (
"go.uber.org/zap"
)

// Supported values for SplitterStrategy.
const (
SplitterStrategyLimit = "limit"
SplitterStrategyRandom = "random"
)

const (
// LocalFilePerm is the permission for local files
LocalFilePerm os.FileMode = 0o644
@@ -398,6 +404,8 @@ type Config struct {
CheckDataOnly bool `toml:"check-data-only" json:"-"`
// skip validation for tables that don't exist upstream or downstream
SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"`
// SplitterStrategy controls the fallback splitter when bucket stats are unavailable.
SplitterStrategy string `toml:"splitter-strategy" json:"-"`
// DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261"
DMAddr string `toml:"dm-addr" json:"dm-addr"`
// DMTask string `toml:"dm-task" json:"dm-task"`
@@ -628,6 +636,10 @@ func (c *Config) CheckConfig() bool {
log.Error("check-thread-count must greater than 0!")
return false
}
if err := c.normalizeSplitterStrategy(); err != nil {
log.Warn("invalid splitter strategy", zap.Error(err))
return false
}
if len(c.DMAddr) != 0 {
u, err := url.Parse(c.DMAddr)
if err != nil || u.Scheme == "" || u.Host == "" {
@@ -643,6 +655,19 @@ func (c *Config) CheckConfig() bool {
return true
}

func (c *Config) normalizeSplitterStrategy() error {
mode := strings.ToLower(strings.TrimSpace(c.SplitterStrategy))
switch mode {
case "", SplitterStrategyRandom:
c.SplitterStrategy = SplitterStrategyRandom
case SplitterStrategyLimit:
c.SplitterStrategy = mode
default:
return errors.Errorf("splitter-strategy must be limit or random")
}
return nil
}

func timestampOutputDir() string {
return filepath.Join(os.TempDir(), time.Now().Format("sync-diff.output.2006-01-02T15.04.05Z0700"))
}
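For illustration, here is a minimal, self-contained sketch (not part of the PR) of how the normalization above is expected to behave: empty and mixed-case values collapse to the two supported strategies, and anything else is rejected.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the constants, so this sketch compiles on its own.
const (
	splitterStrategyLimit  = "limit"
	splitterStrategyRandom = "random"
)

// normalize mirrors normalizeSplitterStrategy: trim, lower-case,
// default empty input to "random", and reject unknown values.
func normalize(s string) (string, error) {
	switch strings.ToLower(strings.TrimSpace(s)) {
	case "", splitterStrategyRandom:
		return splitterStrategyRandom, nil
	case splitterStrategyLimit:
		return splitterStrategyLimit, nil
	default:
		return "", fmt.Errorf("splitter-strategy must be limit or random")
	}
}

func main() {
	for _, in := range []string{"", "  Limit ", "RANDOM", "bucket"} {
		out, err := normalize(in)
		fmt.Printf("%q -> %q, err=%v\n", in, out, err)
	}
}
```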
4 changes: 4 additions & 0 deletions sync_diff_inspector/source/common/table_diff.go
@@ -67,6 +67,10 @@ type TableDiff struct {

ChunkSize int64 `json:"chunk-size"`

// SplitterStrategy is the fallback splitter when bucket stats are
// unavailable ("limit" or "random").
SplitterStrategy string `json:"-"`

// TableLack = 1: the table only exists downstream,
// TableLack = -1: the table only exists upstream,
// TableLack = 0: the table exists both upstream and downstream.
1 change: 1 addition & 0 deletions sync_diff_inspector/source/source.go
@@ -144,6 +144,7 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups
NeedUnifiedTimeZone: needUnifiedTimeZone,
Collation: tableConfig.Collation,
ChunkSize: tableConfig.ChunkSize,
SplitterStrategy: cfg.SplitterStrategy,
})

// When the router set case-sensitive false,
14 changes: 6 additions & 8 deletions sync_diff_inspector/source/tidb.go
@@ -57,15 +57,13 @@ func (a *TiDBTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.T
if err == nil {
return bucketIter, nil
}
log.Info("failed to build bucket iterator, fall back to use random iterator", zap.Error(err))
// fall back to random splitter

// use random splitter if we cannot use bucket splitter, then we can simply choose target table to generate chunks.
randIter, err := splitter.NewRandomIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
if err != nil {
return nil, errors.Trace(err)
log.Info("failed to build bucket iterator, falling back", zap.Error(err))
if originTable.SplitterStrategy == config.SplitterStrategyLimit {
log.Info("choose limit splitter", zap.String("table", progressID))
return splitter.NewLimitIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
}
return randIter, nil
log.Info("choose random splitter", zap.String("table", progressID))
return splitter.NewRandomIteratorWithCheckpoint(ctx, progressID, &originTable, a.dbConn, startRange)
}

// TiDBRowsIterator is used to iterate rows in TiDB
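To summarize the fallback order that AnalyzeSplitter now implements, here is a standalone sketch with placeholder names (not the tiflow code): the bucket iterator is tried first, and only when it cannot be built does the configured strategy choose between the limit and random iterators.

```go
package main

import (
	"errors"
	"fmt"
)

// chooseSplitter mirrors the fallback order: bucket stats first, then the
// configured strategy decides between the limit and random splitters.
func chooseSplitter(bucketErr error, strategy string) string {
	if bucketErr == nil {
		return "bucket"
	}
	if strategy == "limit" {
		return "limit"
	}
	return "random"
}

func main() {
	fmt.Println(chooseSplitter(nil, "limit"))                            // bucket
	fmt.Println(chooseSplitter(errors.New("no bucket stats"), "limit"))  // limit
	fmt.Println(chooseSplitter(errors.New("no bucket stats"), "random")) // random
}
```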
36 changes: 31 additions & 5 deletions sync_diff_inspector/splitter/limit.go
@@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/dbutil"
@@ -121,6 +122,8 @@ func NewLimitIteratorWithCheckpoint(
return nil, errors.NotFoundf("not found index")
}

tagChunk.IndexColumnNames = utils.GetColumnNames(indexColumns)

chunkSize := table.ChunkSize
if chunkSize <= 0 {
cnt, err := getRowCount(ctx, dbConn, table.Schema, table.Table, "", nil)
@@ -184,6 +187,20 @@ func (lmt *LimitIterator) Next() (*chunk.Range, error) {
if !ok && c == nil {
return nil, nil
}
if c != nil {
failpoint.Inject("print-chunk-info", func() {
lowerBounds := make([]string, len(c.Bounds))
upperBounds := make([]string, len(c.Bounds))
for i, bound := range c.Bounds {
lowerBounds[i] = bound.Lower
upperBounds[i] = bound.Upper
}
log.Info("failpoint print-chunk-info injected (limit splitter)",
zap.Strings("lowerBounds", lowerBounds),
zap.Strings("upperBounds", upperBounds),
zap.String("indexCode", c.Index.ToString()))
})
}
return c, nil
}
}
@@ -194,6 +211,10 @@ func (lmt *LimitIterator) GetIndexID() int64 {
}

func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
defer func() {
progress.UpdateTotal(lmt.progressID, 0, true)
close(lmt.chunksCh)
}()
for {
where, args := lmt.tagChunk.ToString(lmt.table.Collation)
query := fmt.Sprintf(lmt.queryTmpl, where)
@@ -211,16 +232,15 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
if dataMap == nil {
// there is no row in result set
chunk.InitChunk(chunkRange, chunk.Limit, bucketID, bucketID, lmt.table.Collation, lmt.table.Range)
progress.UpdateTotal(lmt.progressID, 1, true)
select {
case <-ctx.Done():
case lmt.chunksCh <- chunkRange:
}
close(lmt.chunksCh)
return
}

newTagChunk := chunk.NewChunkRangeOffset(lmt.columnOffset, lmt.table.Info)
newTagChunk.IndexColumnNames = chunkRange.IndexColumnNames
for column, data := range dataMap {
newTagChunk.Update(column, string(data.Data), "", !data.IsNull, false)
chunkRange.Update(column, "", string(data.Data), false, !data.IsNull)
Expand All @@ -235,6 +255,11 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
case lmt.chunksCh <- chunkRange:
}
lmt.tagChunk = newTagChunk

failpoint.Inject("check-one-chunk", func() {
log.Info("failpoint check-one-chunk injected, stop producing new chunks.")
failpoint.Return()
})
}
}

@@ -263,8 +288,9 @@ func generateLimitQueryTemplate(indexColumns []*model.ColumnInfo, table *common.
fields = append(fields, dbutil.ColumnName(columnInfo.Name.O))
}
columns := strings.Join(fields, ", ")
orderBy := utils.BuildOrderByClause(indexColumns, table.Collation)
tableName := dbutil.TableName(table.Schema, table.Table)

// TODO: the limit splitter has not been used yet.
// once it is used, need to add `collation` after `ORDER BY`.
return fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1", columns, dbutil.TableName(table.Schema, table.Table), columns, chunkSize)
return fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1",
columns, tableName, orderBy, chunkSize)
}
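For a concrete sense of the SQL that generateLimitQueryTemplate renders, here is a standalone sketch with hypothetical inputs (columns `a` and `b`, table `diff_test`.`test`, chunk size 1000). The ORDER BY string is only an assumed example of what utils.BuildOrderByClause might produce, not its exact output.

```go
package main

import "fmt"

func main() {
	// Hypothetical inputs; in the real code these come from the index
	// columns, the table definition, and the configured chunk size.
	columns := "`a`, `b`"
	tableName := "`diff_test`.`test`"
	orderBy := "`a`, `b`" // assumed shape; BuildOrderByClause may add COLLATE clauses
	chunkSize := 1000

	// Same layout as the template above: %%s keeps a %s placeholder for the
	// running tag-chunk predicate that produceChunks fills in later.
	tmpl := fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1",
		columns, tableName, orderBy, chunkSize)
	query := fmt.Sprintf(tmpl, "`a` > '42'")
	fmt.Println(query)
	// SELECT `a`, `b` FROM `diff_test`.`test` WHERE `a` > '42' ORDER BY `a`, `b` LIMIT 1000,1
}
```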
56 changes: 56 additions & 0 deletions sync_diff_inspector/tests/sync_diff_inspector/checkpoint/config_base_limit.toml
@@ -0,0 +1,56 @@
# Diff Configuration.

######################### Global config #########################

# how many goroutines are created to check data
check-thread-count = 4

# set to false to compare data by checksum only; differing rows will not be selected when checksums mismatch.
# set to true to compare every differing row; this slows down the total compare time.
export-fix-sql = true

# set to true to check table structure only and ignore table data
check-struct-only = false

splitter-strategy = "limit"

######################### Databases config #########################
[data-sources.mysql1]
host = "127.0.0.1"#MYSQL_HOST
port = 3306#MYSQL_PORT
user = "root"
password = ""

# uncomment to use tidb's snapshot data
# snapshot = "2016-10-08 16:45:26"

[data-sources.tidb]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
# uncomment to use tidb's snapshot data
# snapshot = "2016-10-08 16:45:26"

[table-configs]
[table-configs.config1]
target-tables = ["diff_test.test"]
chunk-size = 10
range = "1=1"
######################### Task config #########################
[task]
# 1 fix sql: fix-target-TIDB1.sql
# 2 log: sync-diff.log
# 3 summary: summary.txt
# 4 checkpoint: a dir
output-dir = "/tmp/sync_diff_inspector_test/sync_diff_inspector/output"

source-instances = ["mysql1"]

target-instance = "tidb"

# tables that need to be checked.
target-check-tables = ["diff_test.test"]

# extra table config
target-configs = ["config1"]
45 changes: 45 additions & 0 deletions sync_diff_inspector/tests/sync_diff_inspector/checkpoint/run.sh
@@ -95,6 +95,51 @@ cat $OUT_DIR/first_chunk_index
check_contains "${last_chunk_bound}" $OUT_DIR/first_chunk_bound
check_contains_regex ".:${bucket_index_left}-${bucket_index_right}:$((${last_chunk_index_array[2]} + 1)):${last_chunk_index_array[3]}" $OUT_DIR/first_chunk_index

sed "s/\"127.0.0.1\"#MYSQL_HOST/\"${MYSQL_HOST}\"/g" ./config_base_limit.toml | sed "s/3306#MYSQL_PORT/${MYSQL_PORT}/g" >./config.toml

echo "================test limit checkpoint================="
echo "------1. checkpoint and resume with limit---------"
rm -rf $OUT_DIR
mkdir -p $OUT_DIR
export GO_FAILPOINTS="github.com/pingcap/tiflow/sync_diff_inspector/splitter/check-one-chunk=return();\
github.com/pingcap/tiflow/sync_diff_inspector/splitter/print-chunk-info=return();\
github.com/pingcap/tiflow/sync_diff_inspector/diff/wait-for-checkpoint=return()"
sync_diff_inspector --config=./config.toml >$OUT_DIR/checkpoint_diff.output
check_contains "check pass!!!" $OUT_DIR/sync_diff.log
check_contains "choose limit splitter" $OUT_DIR/sync_diff.log
# Save the last chunk's info to verify continuation.
# With limit, each chunk is a single row query, so chunkIndex+1 == chunkCnt.
last_chunk_info=$(grep 'print-chunk-info' $OUT_DIR/sync_diff.log | awk -F 'upperBounds=' '{print $2}' | sed 's/[]["]//g' | sort -n | awk 'END {print}')
echo "$last_chunk_info" # e.g. 9 indexCode=0:0-0:0:1
last_chunk_bound=$(echo $last_chunk_info | awk -F ' ' '{print $1}')
echo "$last_chunk_bound"
last_chunk_index=$(echo $last_chunk_info | awk -F '=' '{print $2}')
echo "$last_chunk_index"
OLD_IFS="$IFS"
IFS=":"
last_chunk_index_array=($last_chunk_index)
IFS="$OLD_IFS"
for s in ${last_chunk_index_array[@]}; do
echo "$s"
done
# chunkIndex should be the last index
[[ $((${last_chunk_index_array[2]} + 1)) -eq ${last_chunk_index_array[3]} ]] || exit 1
# Save bucketIndexRight, which should equal the bucketIndexLeft of the first chunk created in the next run.
bucket_index_right=$(($(echo ${last_chunk_index_array[1]} | awk -F '-' '{print $2}') + 1))
echo $bucket_index_right

rm -f $OUT_DIR/sync_diff.log
export GO_FAILPOINTS="github.com/pingcap/tiflow/sync_diff_inspector/splitter/print-chunk-info=return()"
sync_diff_inspector --config=./config.toml >$OUT_DIR/checkpoint_diff.output
first_chunk_info=$(grep 'print-chunk-info' $OUT_DIR/sync_diff.log | awk -F 'lowerBounds=' '{print $2}' | sed 's/[]["]//g' | sort -n | awk 'NR==1')
echo $first_chunk_info | awk -F '=' '{print $1}' >$OUT_DIR/first_chunk_bound
cat $OUT_DIR/first_chunk_bound
echo $first_chunk_info | awk -F '=' '{print $3}' >$OUT_DIR/first_chunk_index
cat $OUT_DIR/first_chunk_index
# Note: when chunks are created in parallel, the smallest chunk may not appear on the first line, so we sort it as before.
check_contains "${last_chunk_bound}" $OUT_DIR/first_chunk_bound
check_contains_regex ".*:${bucket_index_right}-.*:0:.*" $OUT_DIR/first_chunk_index

sed "s/\"127.0.0.1\"#MYSQL_HOST/\"${MYSQL_HOST}\"/g" ./config_base_rand.toml | sed "s/3306#MYSQL_PORT/${MYSQL_PORT}/g" >./config.toml

echo "================test random checkpoint================="