Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check_and_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: tools/bin
key: macos-latest-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dataflow_engine_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
go-version: 1.19

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-dataflow-${{ hashFiles('go.sum') }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dm_binlog_999999.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ jobs:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-ticdc-${{ hashFiles('go.sum') }}

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: tools/bin
key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dm_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ jobs:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-ticdc-${{ hashFiles('go.sum') }}

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: tools/bin
key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/dm_upstream_switch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ jobs:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-ticdc-${{ hashFiles('go.sum') }}

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: tools/bin
key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ticdc_integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:

- name: Cache Vendor
id: cache-vendor
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: vendor
key: ${{ runner.os }}-cdc-integration-vendor-${{ hashFiles('go.sum') }}
Expand Down Expand Up @@ -93,7 +93,7 @@ jobs:

- name: Cache Vendor
id: cache-vendor
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: vendor
key: ${{ runner.os }}-cdc-integration-vendor-${{ hashFiles('go.sum') }}
Expand Down Expand Up @@ -145,7 +145,7 @@ jobs:

- name: Cache Vendor
id: cache-vendor
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: vendor
key: ${{ runner.os }}-cdc-integration-vendor-${{ hashFiles('go.sum') }}
Expand Down
20 changes: 18 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,15 +780,31 @@ func (s *snapshot) doDropTable(tbInfo *model.TableInfo, currentTs uint64) {
// truncateTable truncate the table with the given ID, and replace it with a new `tbInfo`.
// NOTE: after a table is truncated:
// - physicalTableByID(id) will return nil;
// - IsTruncateTableID(id) should return true.
// - IsTruncateTableID(physicalTableID) should return true.
func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs uint64) (err error) {
old, ok := s.physicalTableByID(id)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(id)
}
s.doDropTable(old, currentTs)
s.doCreateTable(tbInfo, currentTs)
s.truncatedTables.ReplaceOrInsert(newVersionedID(id, negative(currentTs)))
tag := negative(currentTs)
// when the table is a partition table, we have to record all partition ids
if old.GetPartitionInfo() != nil {
newPi := tbInfo.GetPartitionInfo()
oldPi := old.GetPartitionInfo()
newPartitionIDMap := make(map[int64]struct{}, len(newPi.Definitions))
for _, partition := range newPi.Definitions {
newPartitionIDMap[partition.ID] = struct{}{}
}
for _, partition := range oldPi.Definitions {
if _, ok := newPartitionIDMap[partition.ID]; !ok {
s.truncatedTables.ReplaceOrInsert(newVersionedID(partition.ID, tag))
}
}
} else {
s.truncatedTables.ReplaceOrInsert(newVersionedID(id, tag))
}
s.currentTs = currentTs
log.Debug("truncate table success",
zap.String("schema", tbInfo.TableName.Schema),
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestTable(t *testing.T) {
require.False(t, ok)
_, ok = snap.PhysicalTableByID(11 + 65536)
require.False(t, ok)
require.True(t, snap.IsTruncateTableID(11))
require.True(t, snap.IsTruncateTableID(11+65536))
_, ok = snap.PhysicalTableByID(12)
require.True(t, ok)
_, ok = snap.PhysicalTableByID(12 + 65536)
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestDrop(t *testing.T) {
require.Equal(t, 1, snap.inner.schemaNameToID.Len())
require.Equal(t, 1, snap.inner.tableNameToID.Len())
require.Equal(t, 1, snap.inner.partitions.Len())
require.Equal(t, 0, snap.inner.truncatedTables.Len())
require.Equal(t, 1, snap.inner.truncatedTables.Len())
require.Equal(t, 2, snap.inner.ineligibleTables.Len())
}

Expand Down
4 changes: 4 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ type DDLEvent struct {
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
// Seq is used to order the DDLs with the same commit ts
// Only used in the splited DDLEvent generated by a multi-table DDL,
// we need to keep the order of the original multi-table DDL
Seq uint64 `msg:"seq"`
}

// FromJob fills the values with DDLEvent from DDL job
Expand Down
Loading
Loading