Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
15e04f8
pin
joechenrh Feb 10, 2026
38de6f3
add concurrency
joechenrh Feb 10, 2026
5af0c21
add progress
joechenrh Feb 10, 2026
bfa8476
sync-diff-inspector: fix random splitter formatting for verify
joechenrh Feb 13, 2026
eb6a537
sync-diff-inspector: tolerate transient exit in json integration checks
joechenrh Feb 13, 2026
9b5e979
sync-diff-inspector: fallback JSON tables from checksum-only path
joechenrh Feb 13, 2026
34008c5
sync-diff-inspector: gate checksum-only by source type and JSON
joechenrh Feb 13, 2026
0cb0681
sync-diff-inspector: remove accidental crashing TestXXX
joechenrh Feb 24, 2026
bd1b8a7
Update code
joechenrh Mar 4, 2026
f90ee22
Add integration test
joechenrh Mar 5, 2026
d4aa85a
sync-diff-inspector: use explicit NONCLUSTERED PK syntax in test
joechenrh Mar 5, 2026
3ca1ccf
address comment
joechenrh Mar 5, 2026
09d0ec9
sync-diff-inspector: address review comments for checksum-only mode
joechenrh Mar 5, 2026
ca3f527
sync-diff-inspector: fix nil-deref in initCheckpoint on mode switch
joechenrh Mar 10, 2026
3317a75
all: move global checksum capability into source interface
joechenrh Mar 12, 2026
da208b7
sync-diff-inspector: invalidate checkpoints on export-fix-sql change
joechenrh Mar 12, 2026
7724757
sync-diff-inspector: avoid stale checksum failures after resume
joechenrh Mar 12, 2026
5ac196d
sync-diff-inspector: make checksum split helper mutation explicit
joechenrh Mar 12, 2026
397398f
sync-diff-inspector: simplify checksum checkpoint flush loop
joechenrh Mar 12, 2026
d3c496e
sync-diff-inspector: streamline checksum worker pipeline
joechenrh Mar 12, 2026
81e9fe7
sync-diff-inspector: streamline checksum worker pipeline
joechenrh Mar 12, 2026
3a196ed
sync-diff-inspector: enable rowid chunking for checksum heap tables
joechenrh Mar 12, 2026
94e0276
sync-diff-inspector: clarify saved checkpoint modes
joechenrh Mar 12, 2026
2f87d92
sync-diff-inspector: finalize progress on checksum errors
joechenrh Mar 12, 2026
0981412
fix ci
joechenrh Mar 13, 2026
c1a8edc
all: stabilize sync_diff_inspector checkpoint test
joechenrh Mar 13, 2026
5b21f12
sync-diff-inspector: add partial checkpoint restart test with midway …
joechenrh Mar 16, 2026
e4e249e
sync-diff-inspector: use inline env vars for GO_FAILPOINTS in test
joechenrh Mar 16, 2026
f029a3d
Update comment
joechenrh Mar 16, 2026
e5193b9
sync-diff: harden global checksum fallback
joechenrh Mar 17, 2026
41ead0e
sync-diff: fix resumed report state
joechenrh Mar 18, 2026
196acc9
sync-diff: simplify report resume state
joechenrh Mar 18, 2026
c65078b
sync-diff: pass ExportFixSQL as parameter instead of duplicating field
joechenrh Mar 18, 2026
0592329
sync-diff: address review findings (#1,#2,#4,#5)
joechenrh Mar 18, 2026
9052fd3
sync-diff: fix shfmt indentation in checksum_only test script
joechenrh Mar 18, 2026
aa19e10
sync-diff: fix ComputeConfigHash call in TLS test after rebase
joechenrh Mar 18, 2026
a698a49
sync-diff: store ExportFixSQL in TaskConfig with json:"-"
joechenrh Mar 18, 2026
6a9564b
sync-diff: use t.TempDir() instead of hard-coded /tmp path in config …
joechenrh Mar 18, 2026
db7859b
sync-diff: remove hard-coded /tmp paths from test config fixtures
joechenrh Mar 19, 2026
e2d447d
sync-diff: fix checksum-only progress to show total chunks upfront
joechenrh Mar 20, 2026
95f6c0e
sync-diff: restore output-dir in example configs, override in test only
joechenrh Mar 20, 2026
faf8fd2
sync-diff: unify terminology to "global checksum" and add omitempty
joechenrh Mar 23, 2026
b9366ab
update comment
joechenrh Mar 23, 2026
8ed8109
sync-diff-inspector: clarify ExportFixSQL runtime hash comment
joechenrh Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 82 additions & 7 deletions sync_diff_inspector/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,46 @@ type Checkpoint struct {
hp *nodeHeap
}

// SavedState contains the information of the latest checked chunk and state of `report`
// When sync-diff start from the checkpoint, it will load this information and continue running
// ChecksumSourceState stores one source's checksum progress in global-checksum mode.
type ChecksumSourceState struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does checksum-only mode mean the global checksum?

Copy link
Copy Markdown
Contributor Author

@joechenrh joechenrh Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep — when export-fix-sql=false and both sources implement ChecksumCapableSource, the tool routes into equalByGlobalChecksum which computes a single aggregated checksum per table (XOR of per-chunk CRC32), comparing the final value between upstream and downstream. Unified the terminology to "global checksum" throughout the codebase in joechenrh@faf8fd2 👍

LastRange *chunk.Range `json:"last-range,omitempty"`
Checksum uint64 `json:"checksum"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add omitempty to the other fields too?

Copy link
Copy Markdown
Contributor Author

@joechenrh joechenrh Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in joechenrh@faf8fd2 . Added omitempty to Upstream and Downstream in ChecksumState since they're always set together via NewChecksumState. For the inner fields (Checksum, Count, Done), their zero values are semantically meaningful (0 count / 0 checksum / not-done), so omitting them would break checkpoint restore.

Count int64 `json:"count"`
Done bool `json:"done"`
Comment thread
D3Hunter marked this conversation as resolved.
}

// ChecksumState stores checkpoint progress for global-checksum mode.
// It pairs a table index with one ChecksumSourceState per side of the
// comparison and is serialized to JSON as part of the checkpoint file.
type ChecksumState struct {
// TableIndex is the table index this state was created for
// (NewChecksumState copies it from its argument).
TableIndex int `json:"table-index"`
// Upstream holds the upstream source's checksum progress. It is omitted
// from the JSON output when nil; init() replaces a nil value with an
// empty state.
Upstream *ChecksumSourceState `json:"upstream,omitempty"`
// Downstream holds the downstream source's checksum progress, with the
// same nil/omitempty handling as Upstream.
Downstream *ChecksumSourceState `json:"downstream,omitempty"`
}

// NewChecksumState builds a ChecksumState for the given table index with
// both the upstream and downstream source states allocated and zero-valued,
// so callers can update either side without a nil check.
func NewChecksumState(tableIndex int) *ChecksumState {
	state := new(ChecksumState)
	state.TableIndex = tableIndex
	state.Upstream = new(ChecksumSourceState)
	state.Downstream = new(ChecksumSourceState)
	return state
}

// init backfills any nil per-source state with an empty ChecksumSourceState.
// Source states that are already set are left untouched.
func (s *ChecksumState) init() {
	for _, slot := range []**ChecksumSourceState{&s.Upstream, &s.Downstream} {
		if *slot == nil {
			*slot = &ChecksumSourceState{}
		}
	}
}

// SavedState stores mode-dependent checkpoint state plus report state for resume.
// Depending on the execution mode, it contains either Chunk (chunk diff mode)
// or Checksum (global-checksum mode).
type SavedState struct {
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
joechenrh marked this conversation as resolved.
Chunk *Node `json:"chunk-info"`
Report *report.Report `json:"report-info"`
Chunk *Node `json:"chunk-info,omitempty"`
Checksum *ChecksumState `json:"checksum-info,omitempty"`
Report *report.Report `json:"report-info"`
}

// InitCurrentSavedID is only used during initialization and takes no lock; be cautious
Expand Down Expand Up @@ -258,16 +293,56 @@ func (cp *Checkpoint) SaveChunk(ctx context.Context, fileName string, cur *Node,
return cur.GetID(), nil
}

// LoadChunk loads chunk info from file `chunk`
func (cp *Checkpoint) LoadChunk(fileName string) (*Node, *report.Report, error) {
// SaveChecksumState serializes the global-checksum progress together with
// the report into a SavedState JSON document and writes it to fileName via
// writeFileAtomic.
//
// A marshalling failure is logged as a warning and returned traced; a write
// failure is returned as-is from writeFileAtomic. ctx is accepted but not
// consulted by this method.
func (cp *Checkpoint) SaveChecksumState(
	ctx context.Context,
	fileName string,
	state *ChecksumState,
	reportInfo *report.Report,
) error {
	payload, marshalErr := json.Marshal(&SavedState{Checksum: state, Report: reportInfo})
	if marshalErr != nil {
		log.Warn("fail to save checksum checkpoint", zap.Error(marshalErr))
		return errors.Trace(marshalErr)
	}
	return writeFileAtomic(fileName, payload, config.LocalFilePerm)
}

// loadSavedState reads the checkpoint file at fileName and decodes its JSON
// content into a SavedState.
//
// When the decoded state carries checksum progress, nil per-source entries
// are backfilled through ChecksumState.init. Read and decode failures are
// returned traced, with a nil state.
func loadSavedState(fileName string) (*SavedState, error) {
	data, err := os.ReadFile(fileName)
	if err != nil {
		return nil, errors.Trace(err)
	}

	state := &SavedState{}
	if err := json.Unmarshal(data, state); err != nil {
		return nil, errors.Trace(err)
	}

	if state.Checksum != nil {
		// "upstream"/"downstream" are omitempty on save, so either may be
		// absent in the file; make both usable without nil checks.
		state.Checksum.init()
	}
	return state, nil
}

// LoadChunk reads the checkpoint file fileName and returns the chunk node
// and report stored in it. On a load failure it returns the traced error
// with nil node and report.
func (cp *Checkpoint) LoadChunk(fileName string) (*Node, *report.Report, error) {
	saved, loadErr := loadSavedState(fileName)
	if loadErr != nil {
		return nil, nil, errors.Trace(loadErr)
	}

	chunkNode, reportInfo := saved.Chunk, saved.Report
	return chunkNode, reportInfo, nil
}

// LoadChecksumState reads the checkpoint file fileName and returns the
// global-checksum state and report stored in it. On a load failure it
// returns the traced error with nil state and report.
func (cp *Checkpoint) LoadChecksumState(fileName string) (*ChecksumState, *report.Report, error) {
	saved, loadErr := loadSavedState(fileName)
	if loadErr != nil {
		return nil, nil, errors.Trace(loadErr)
	}

	checksumState, reportInfo := saved.Checksum, saved.Report
	return checksumState, reportInfo, nil
}
47 changes: 47 additions & 0 deletions sync_diff_inspector/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -123,3 +124,49 @@ func TestLoadChunk(t *testing.T) {
require.Equal(t, node.GetID().Compare(id), 0)
require.Equal(t, node.ChunkRange.IndexColumnNames, testColNames)
}

// TestSaveLoadChecksumState checks that a ChecksumState written with
// SaveChecksumState is read back unchanged by LoadChecksumState.
func TestSaveLoadChecksumState(t *testing.T) {
	cp := new(Checkpoint)
	cp.Init()
	ctx := context.Background()

	// Upstream is mid-table (has a last range, not done); downstream is finished.
	upstream := &ChecksumSourceState{
		LastRange: &chunk.Range{
			Index: &chunk.CID{
				TableIndex:       3,
				BucketIndexLeft:  0,
				BucketIndexRight: 0,
				ChunkIndex:       1,
				ChunkCnt:         5,
			},
		},
		Checksum: 123,
		Count:    456,
		Done:     false,
	}
	downstream := &ChecksumSourceState{
		Checksum: 789,
		Count:    100,
		Done:     true,
	}
	saved := &ChecksumState{
		TableIndex: 3,
		Upstream:   upstream,
		Downstream: downstream,
	}

	file := filepath.Join(t.TempDir(), "TestSaveLoadChecksumState")
	require.NoError(t, cp.SaveChecksumState(ctx, file, saved, nil))

	got, _, err := cp.LoadChecksumState(file)
	require.NoError(t, err)
	require.NotNil(t, got)
	require.Equal(t, 3, got.TableIndex)

	// Upstream side.
	require.Equal(t, uint64(123), got.Upstream.Checksum)
	require.Equal(t, int64(456), got.Upstream.Count)
	require.False(t, got.Upstream.Done)
	require.NotNil(t, got.Upstream.LastRange)
	require.Equal(t, 1, got.Upstream.LastRange.Index.ChunkIndex)

	// Downstream side.
	require.Equal(t, uint64(789), got.Downstream.Checksum)
	require.Equal(t, int64(100), got.Downstream.Count)
	require.True(t, got.Downstream.Done)
}
8 changes: 8 additions & 0 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ type TaskConfig struct {
TargetTableConfigs []*TableConfig
TargetCheckTables filter.Filter

// ExportFixSQL mirrors the top-level config at runtime so
// ComputeConfigHash can include this mode switch without changing its
// signature, while still omitting the derived field from JSON output.
ExportFixSQL bool `json:"-"`
Comment thread
kennytm marked this conversation as resolved.

FixDir string
CheckpointDir string
HashFile string
Expand Down Expand Up @@ -346,6 +351,7 @@ func (t *TaskConfig) Init(
// we think the second sync diff can use the checkpoint.
func (t *TaskConfig) ComputeConfigHash() (string, error) {
hash := make([]byte, 0)
hash = append(hash, []byte(strconv.FormatBool(t.ExportFixSQL))...)
// compute sources
for _, c := range t.SourceInstances {
configBytes, err := json.Marshal(c)
Expand Down Expand Up @@ -590,6 +596,7 @@ func (c *Config) Init() (err error) {
if err != nil {
return errors.Annotate(err, "failed to init Task")
}
c.Task.ExportFixSQL = c.ExportFixSQL
err = c.Task.Init(c.DataSources, c.TableConfigs)
if err != nil {
return errors.Annotate(err, "failed to init Task")
Expand All @@ -615,6 +622,7 @@ func (c *Config) Init() (err error) {
}
}

c.Task.ExportFixSQL = c.ExportFixSQL
err = c.Task.Init(c.DataSources, c.TableConfigs)
if err != nil {
return errors.Annotate(err, "failed to init Task")
Expand Down
7 changes: 5 additions & 2 deletions sync_diff_inspector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
# how many goroutines are created to check data
check-thread-count = 4

# set false if just want compare data by checksum, will skip select data when checksum is not equal.
# set true if want compare all different rows, will slow down the total compare time.
# When set to false, only checksums are compared and no fix SQL is generated.
# If both upstream and downstream are TiDB, a faster global-checksum mode is
# used automatically. When set to true, sync-diff-inspector compares rows chunk by chunk and
# generates fix SQL for any differences found, which is slower.
# Changing this value between runs invalidates existing checkpoints.
export-fix-sql = true

# ignore check table's data
Expand Down
34 changes: 29 additions & 5 deletions sync_diff_inspector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package config
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestParseConfig(t *testing.T) {
tmpDir := filepath.Join(t.TempDir(), "output")

cfg := NewConfig()
require.Nil(t, cfg.Parse([]string{"-L", "info", "--config", "config.toml"}))
cfg = NewConfig()
Expand All @@ -32,31 +36,51 @@ func TestParseConfig(t *testing.T) {
require.Contains(t, err.Error(), "LL")

require.Nil(t, cfg.Parse([]string{"--config", "config.toml"}))
cfg.Task.OutputDir = tmpDir
require.Nil(t, cfg.Init())
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs))

require.Nil(t, cfg.Parse([]string{"--config", "config_sharding.toml"}))
cfg.Task.OutputDir = tmpDir
// we change the config from config.toml to config_sharding.toml
// this action will raise error.
require.Contains(t, cfg.Init().Error(), "failed to init Task: config changes breaking the checkpoint, please use another outputDir and start over again")

require.NoError(t, os.RemoveAll(cfg.Task.OutputDir))
require.NoError(t, os.RemoveAll(tmpDir))
require.Nil(t, cfg.Parse([]string{"--config", "config_sharding.toml"}))
cfg.Task.OutputDir = tmpDir
// this time will be ok, because we remove the last outputDir.
require.Nil(t, cfg.Init())
require.Nil(t, cfg.Task.Init(cfg.DataSources, cfg.TableConfigs))

require.True(t, cfg.CheckConfig())

// we might not use the same config to run this test. e.g. MYSQL_PORT can be 4000
require.JSONEq(t, cfg.String(),
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 
20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
expectedJSON := strings.ReplaceAll(
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 
20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"OUTPUT_DIR_PLACEHOLDER\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"sql-hint-use-index\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null,\"session\":{\"max_execution_time\":86400,\"tidb_opt_prefer_range_scan\":\"ON\"}},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"OUTPUT_DIR_PLACEHOLDER/fix-on-tidb0\",\"CheckpointDir\":\"OUTPUT_DIR_PLACEHOLDER/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}",
"OUTPUT_DIR_PLACEHOLDER", tmpDir)
require.JSONEq(t, cfg.String(), expectedJSON)
hash, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)
require.Equal(t, hash, "5a978bf48039d41b81403d635332493f031bb890a6d4e4d7df77f75e0ccc29f3")
require.Equal(t, hash, "e4b4a202a072904121101d516f05ff8144e431ca6094db0fcca375221ddde98d")
require.True(t, cfg.TableConfigs["config1"].Valid())
}

func TestComputeConfigHashIncludesExportFixSQL(t *testing.T) {
cfg := NewConfig()
require.NoError(t, cfg.Parse([]string{"--config", "config.toml"}))
cfg.Task.OutputDir = t.TempDir()
require.NoError(t, cfg.Init())
Comment thread
kennytm marked this conversation as resolved.

cfg.Task.ExportFixSQL = true
withFixSQL, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)

cfg.Task.ExportFixSQL = false
withoutFixSQL, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)

require.NoError(t, os.RemoveAll(cfg.Task.OutputDir))
require.NotEqual(t, withFixSQL, withoutFixSQL)
}

func TestError(t *testing.T) {
Expand Down
30 changes: 22 additions & 8 deletions sync_diff_inspector/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ type Diff struct {
cp *checkpoints.Checkpoint
startRange *splitter.RangeInfo
report *report.Report

checksumCheckpoint *checkpoints.ChecksumState
checksumCheckpointMu sync.Mutex
}

// NewDiff returns a Diff instance.
Expand Down Expand Up @@ -186,6 +189,9 @@ func (df *Diff) init(ctx context.Context, cfg *config.Config) (err error) {

func (df *Diff) initCheckpoint() error {
df.cp.Init()
if df.shouldUseGlobalChecksum() {
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
joechenrh marked this conversation as resolved.
return df.initChecksumCheckpoint()
}

finishTableNums := 0
path := filepath.Join(df.CheckpointDir, checkpointFile)
Expand All @@ -195,14 +201,15 @@ func (df *Diff) initCheckpoint() error {
return errors.Annotate(err, "the checkpoint load process failed")
}

// this need not be synchronized, because at the moment there is only one thread accessing this section
log.Info("load checkpoint",
zap.Any("chunk index", node.GetID()),
zap.Reflect("chunk", node),
zap.String("state", node.GetState()))
df.cp.InitCurrentSavedID(node)

if node != nil {
if node == nil {
log.Warn("checkpoint file exists but contains no chunk info, starting from beginning")
} else {
// this need not be synchronized, because at the moment there is only one thread accessing this section
log.Info("load checkpoint",
zap.Any("chunk index", node.GetID()),
zap.Reflect("chunk", node),
zap.String("state", node.GetState()))
df.cp.InitCurrentSavedID(node)
// remove the sql file that ID bigger than node.
// cause we will generate these sql again.
err = df.removeSQLFiles(node.GetID())
Expand Down Expand Up @@ -275,6 +282,13 @@ func getConfigsForReport(cfg *config.Config) ([][]byte, []byte, error) {

// Equal runs the comparison between the configured databases and returns
// any error from it. It dispatches to the global-checksum path when
// shouldUseGlobalChecksum reports true, and to the chunk-by-chunk checksum
// path otherwise.
func (df *Diff) Equal(ctx context.Context) error {
	if !df.shouldUseGlobalChecksum() {
		return df.equalByChunkChecksum(ctx)
	}
	return df.equalByGlobalChecksum(ctx)
}

func (df *Diff) equalByChunkChecksum(ctx context.Context) error {
chunksIter, err := df.generateChunksIterator(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
Loading