Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
916b7db
feat: add comprehensive DDL timestamp handling for sink
haiboumich Dec 5, 2025
275df5b
sink(ticdc): align MySQL DDL timestamp handling with upstream default…
haiboumich Jan 12, 2026
f8c9cd3
tiflow: gate DDL session timestamp on origin_default
haiboumich Jan 13, 2026
8002d41
tiflow: update DDL timestamp handling
haiboumich Jan 13, 2026
ef3aeec
ticdc: revert log
haiboumich Jan 13, 2026
50a83e9
ticdc: refine code
haiboumich Jan 13, 2026
d635098
ticdc: refine code
haiboumich Jan 13, 2026
caff434
ticdc: update log
haiboumich Jan 13, 2026
d837830
Removed //go:build intest and // +build intest
haiboumich Jan 15, 2026
5ba109a
Made the targeted‑skip variant: we now ignore SET TIMESTAMP / reset f…
haiboumich Jan 16, 2026
e0b4f36
Added a MySQL‑only integration test that reproduces the ALTER TABLE …
haiboumich Jan 16, 2026
dc7a1ab
Revert "Made the targeted‑skip variant: we now ignore SET TIMESTAMP /…
haiboumich Jan 16, 2026
67267c6
Merge branch 'pingcap:master' into fix-handle-alter-add-default_curre…
haiboumich Jan 16, 2026
980163d
add more dml before and after adding column for integration test
haiboumich Jan 16, 2026
29d6e56
added a pre-DDL SET TIMESTAMP = DEFAULT so every DDL starts from a cl…
haiboumich Jan 19, 2026
83c5cb5
fix ut
haiboumich Jan 19, 2026
20e2425
sink/mysql: make ddl timestamp leak test deterministic
haiboumich Jan 19, 2026
952bafa
avoid system tz noise
haiboumich Jan 20, 2026
3d6848b
deliberate no pre ddl reset
haiboumich Jan 20, 2026
ece8374
failpoint the right place
haiboumich Jan 20, 2026
0dd9d3c
uncomment pre-ddl
haiboumich Jan 20, 2026
6f42244
sink(ticdc): improve DDL timestamp failpoint coverage
haiboumich Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions cdc/sink/ddlsink/mysql/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
tidbmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

func setSessionTimestamp(ctx context.Context, tx *sql.Tx, unixTimestamp float64) error {
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TIMESTAMP = %s", formatUnixTimestamp(unixTimestamp)))
return err
}

// resetSessionTimestamp executes "SET TIMESTAMP = DEFAULT" inside tx. The
// intent is to drop any session @@timestamp left behind by an earlier DDL that
// ran on the same session, so a stale value cannot leak into the next
// statement. It is meant as an inexpensive safety net around DDL execution.
// The error from ExecContext, if any, is returned unchanged.
func resetSessionTimestamp(ctx context.Context, tx *sql.Tx) error {
	const resetStmt = "SET TIMESTAMP = DEFAULT"
	if _, err := tx.ExecContext(ctx, resetStmt); err != nil {
		return err
	}
	return nil
}

// formatUnixTimestamp renders unixTimestamp in plain decimal notation (no
// exponent) with exactly six digits after the decimal point.
func formatUnixTimestamp(unixTimestamp float64) string {
	const (
		fractionDigits = 6
		floatBits      = 64
	)
	digits := strconv.AppendFloat(nil, unixTimestamp, 'f', fractionDigits, floatBits)
	return string(digits)
}

// ddlSessionTimestampFromOriginDefault derives a session timestamp for a DDL
// from the origin default value of one of its columns.
//
// It parses ddl.Query to find the columns declared with a current-timestamp
// style default, then walks ddl.TableInfo.Columns in order and returns the
// parsed origin default of the first such column whose origin default is a
// non-empty string that parses successfully. Columns whose value cannot be
// parsed are logged at warn level and skipped.
//
// The boolean result is false (with a zero timestamp) when ddl or its
// TableInfo is nil, when the query cannot be parsed, when it declares no
// matching column, or when no matching column yields a usable value.
// timezone is forwarded to parseOriginDefaultTimestamp.
func ddlSessionTimestampFromOriginDefault(ddl *model.DDLEvent, timezone string) (float64, bool) {
	if ddl == nil || ddl.TableInfo == nil {
		return 0, false
	}

	wanted, parseErr := extractCurrentTimestampDefaultColumns(ddl.Query)
	if parseErr != nil || len(wanted) == 0 {
		return 0, false
	}

	for _, column := range ddl.TableInfo.Columns {
		if _, isTarget := wanted[column.Name.L]; !isTarget {
			continue
		}

		// Only non-empty string defaults can be interpreted as a datetime.
		originDefault, isString := column.GetOriginDefaultValue().(string)
		if !isString || originDefault == "" {
			continue
		}

		sessionTs, err := parseOriginDefaultTimestamp(originDefault, column, timezone)
		if err == nil {
			log.Info("Using OriginDefaultValue for DDL timestamp",
				zap.String("column", column.Name.O),
				zap.String("originDefault", originDefault),
				zap.Float64("timestamp", sessionTs),
				zap.String("timezone", timezone))
			return sessionTs, true
		}

		// Keep looking: another matching column may still be usable.
		log.Warn("Failed to parse OriginDefaultValue for DDL timestamp",
			zap.String("column", column.Name.O),
			zap.String("originDefault", originDefault),
			zap.Error(err))
	}

	return 0, false
}

// extractCurrentTimestampDefaultColumns parses query as a single SQL statement
// and returns the set of lower-cased column names whose definition carries a
// current-timestamp style DEFAULT (as decided by hasCurrentTimestampDefault).
//
// Only CREATE TABLE statements and the ADD / MODIFY / CHANGE / ALTER COLUMN
// specs of ALTER TABLE statements are inspected; any other statement yields an
// empty, non-nil set. A parse failure is returned as the error with a nil set.
func extractCurrentTimestampDefaultColumns(query string) (map[string]struct{}, error) {
	stmt, err := parser.New().ParseOneStmt(query, "", "")
	if err != nil {
		return nil, err
	}

	matched := make(map[string]struct{})
	// collect records every definition in defs that has a matching default.
	collect := func(defs []*ast.ColumnDef) {
		for _, def := range defs {
			if hasCurrentTimestampDefault(def) {
				matched[def.Name.Name.L] = struct{}{}
			}
		}
	}

	switch node := stmt.(type) {
	case *ast.CreateTableStmt:
		collect(node.Cols)
	case *ast.AlterTableStmt:
		for _, spec := range node.Specs {
			switch spec.Tp {
			case ast.AlterTableAddColumns,
				ast.AlterTableModifyColumn,
				ast.AlterTableChangeColumn,
				ast.AlterTableAlterColumn:
				collect(spec.NewColumns)
			}
		}
	}

	return matched, nil
}

// hasCurrentTimestampDefault reports whether col has at least one DEFAULT
// option whose expression is recognised by isCurrentTimestampExpr. A nil col
// yields false.
func hasCurrentTimestampDefault(col *ast.ColumnDef) bool {
	if col == nil {
		return false
	}
	for i := range col.Options {
		option := col.Options[i]
		if option.Tp == ast.ColumnOptionDefaultValue && isCurrentTimestampExpr(option.Expr) {
			return true
		}
	}
	return false
}

// isCurrentTimestampExpr reports whether expr denotes one of the
// current-timestamp functions accepted by isCurrentTimestampFuncName.
//
// Two expression shapes are examined: a function call, matched on its
// lower-cased function name, and a value expression, matched on its string
// content after lower-casing. Nil and every other expression kind yield false.
func isCurrentTimestampExpr(expr ast.ExprNode) bool {
	if expr == nil {
		return false
	}
	if call, isCall := expr.(*ast.FuncCallExpr); isCall {
		return isCurrentTimestampFuncName(call.FnName.L)
	}
	if literal, isValue := expr.(ast.ValueExpr); isValue {
		return isCurrentTimestampFuncName(strings.ToLower(literal.GetString()))
	}
	return false
}

// isCurrentTimestampFuncName reports whether name equals one of the parser's
// function-name constants ast.CurrentTimestamp, ast.Now, ast.LocalTime or
// ast.LocalTimestamp. The comparison is exact, so callers are expected to pass
// an already lower-cased name.
func isCurrentTimestampFuncName(name string) bool {
	return name == ast.CurrentTimestamp ||
		name == ast.Now ||
		name == ast.LocalTime ||
		name == ast.LocalTimestamp
}

// parseOriginDefaultTimestamp converts the origin default string val of col
// into fractional Unix seconds. The location used to interpret val is chosen
// by resolveOriginDefaultLocation from col and timezone; if that lookup fails
// its error is returned and val is not parsed.
func parseOriginDefaultTimestamp(val string, col *timodel.ColumnInfo, timezone string) (float64, error) {
	location, locErr := resolveOriginDefaultLocation(col, timezone)
	if locErr == nil {
		return parseTimestampInLocation(val, location)
	}
	return 0, locErr
}

// resolveOriginDefaultLocation picks the time zone in which a column's origin
// default string should be interpreted.
//
// UTC is returned when col is a TIMESTAMP column whose Version is at least
// timodel.ColumnInfoVersion1 (NOTE(review): presumably because such columns
// keep their origin default in UTC — confirm against TiDB), and also when
// timezone is empty. Otherwise timezone, with any surrounding double quotes
// stripped, is loaded via time.LoadLocation and its result returned as is.
func resolveOriginDefaultLocation(col *timodel.ColumnInfo, timezone string) (*time.Location, error) {
	utcTimestampColumn := col != nil &&
		col.GetType() == tidbmysql.TypeTimestamp &&
		col.Version >= timodel.ColumnInfoVersion1

	switch {
	case utcTimestampColumn, timezone == "":
		return time.UTC, nil
	default:
		return time.LoadLocation(strings.Trim(timezone, "\""))
	}
}

// parseTimestampInLocation parses val, a "YYYY-MM-DD HH:MM:SS" datetime string
// with an optional fractional-second suffix, as wall-clock time in loc and
// returns it as fractional seconds since the Unix epoch.
//
// On failure it returns 0 and an error that names val and wraps the underlying
// *time.ParseError, so callers can inspect the cause with errors.As.
func parseTimestampInLocation(val string, loc *time.Location) (float64, error) {
	// A single layout is enough: when parsing, the time package accepts a
	// fractional-second field right after the seconds even if the layout does
	// not mention one.
	const layout = "2006-01-02 15:04:05"

	t, err := time.ParseInLocation(layout, val, loc)
	if err != nil {
		return 0, fmt.Errorf("failed to parse timestamp: %s: %w", val, err)
	}

	// Combine whole seconds and the sub-second part separately. Going through
	// UnixNano would be undefined for instants that do not fit in an int64
	// nanosecond count and would round the value twice on the way to float64.
	return float64(t.Unix()) + float64(t.Nanosecond())/float64(time.Second), nil
}

// matchFailpointValue decides whether a failpoint carrying val applies to
// ddlQuery.
//
//   - a bool value is returned as is;
//   - a non-empty string applies only when ddlQuery contains it, compared
//     case-insensitively;
//   - nil, the empty string and values of any other type always apply.
func matchFailpointValue(val failpoint.Value, ddlQuery string) bool {
	if pattern, isString := val.(string); isString {
		if pattern == "" {
			return true
		}
		return strings.Contains(strings.ToLower(ddlQuery), strings.ToLower(pattern))
	}
	if enabled, isBool := val.(bool); isBool {
		return enabled
	}
	// nil and unrecognised value types mean "match every DDL".
	return true
}
44 changes: 44 additions & 0 deletions cdc/sink/ddlsink/mysql/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMatchFailpointValue(t *testing.T) {
ddl := "ALTER TABLE t ADD COLUMN c2 int"
tests := []struct {
name string
val any
want bool
}{
{name: "nil", val: nil, want: true},
{name: "bool-true", val: true, want: true},
{name: "bool-false", val: false, want: false},
{name: "empty-string", val: "", want: true},
{name: "match-string", val: "c2", want: true},
{name: "match-string-case", val: "C2", want: true},
{name: "no-match", val: "d2", want: false},
{name: "unknown-type", val: 123, want: true},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.want, matchFailpointValue(tc.val, ddl))
})
}
}
97 changes: 93 additions & 4 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func NewDDLSink(
return nil, err
}

failpoint.Inject("MySQLSinkForceSingleConnection", func() {
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
})

cfg.IsTiDB = pmysql.CheckIsTiDB(ctx, db)

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
Expand Down Expand Up @@ -211,7 +216,65 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
return err
}

// Reset session timestamp before DDL to avoid leaking from pooled connections.
if err := resetSessionTimestamp(ctx, tx); err != nil {
log.Error("Failed to reset session timestamp before DDL execution",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("changefeed", m.id.ID), zap.Error(rbErr))
}
return err
}

ddlTimestamp, useSessionTimestamp := ddlSessionTimestampFromOriginDefault(ddl, m.cfg.Timezone)
skipSetTimestamp := false
failpoint.Inject("MySQLSinkSkipSetSessionTimestamp", func(val failpoint.Value) {
skipSetTimestamp = matchFailpointValue(val, ddl.Query)
})
skipResetAfterDDL := false
failpoint.Inject("MySQLSinkSkipResetSessionTimestampAfterDDL", func(val failpoint.Value) {
skipResetAfterDDL = matchFailpointValue(val, ddl.Query)
})

if useSessionTimestamp && skipSetTimestamp {
log.Warn("Skip setting session timestamp due to failpoint",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("query", ddl.Query))
}
if useSessionTimestamp && !skipSetTimestamp {
// set the session timestamp to match upstream DDL execution time
if err := setSessionTimestamp(ctx, tx, ddlTimestamp); err != nil {
log.Error("Fail to set session timestamp for DDL",
zap.Float64("timestamp", ddlTimestamp),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("query", ddl.Query),
zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("changefeed", m.id.ID), zap.Error(rbErr))
}
return err
}
}

if _, err = tx.ExecContext(ctx, ddl.Query); err != nil {
log.Error("Failed to ExecContext", zap.Any("err", err), zap.Any("query", ddl.Query))
if useSessionTimestamp {
if skipResetAfterDDL {
log.Warn("Skip resetting session timestamp after DDL execution failure due to failpoint",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("query", ddl.Query))
} else if tsErr := resetSessionTimestamp(ctx, tx); tsErr != nil {
log.Warn("Failed to reset session timestamp after DDL execution failure",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Error(tsErr))
}
}
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback",
zap.String("namespace", m.id.Namespace),
Expand All @@ -222,13 +285,39 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
return err
}

if useSessionTimestamp {
// reset session timestamp after DDL execution to avoid affecting subsequent operations
if skipResetAfterDDL {
log.Warn("Skip resetting session timestamp after DDL execution due to failpoint",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.String("query", ddl.Query))
} else if err := resetSessionTimestamp(ctx, tx); err != nil {
log.Error("Failed to reset session timestamp after DDL execution", zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", ddl.Query), zap.Error(rbErr))
}
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}
}

if err = tx.Commit(); err != nil {
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query)))
}

log.Info("Exec DDL succeeded",
zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID),
zap.Duration("duration", time.Since(start)), zap.String("sql", ddl.Query))
logFields := []zap.Field{
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
zap.Duration("duration", time.Since(start)),
zap.String("sql", ddl.Query),
}

if useSessionTimestamp {
logFields = append(logFields, zap.Float64("sessionTimestamp", ddlTimestamp))
}

log.Info("Exec DDL succeeded", logFields...)

return nil
}

Expand Down Expand Up @@ -344,7 +433,7 @@ func getDDLCreateTime(ctx context.Context, db *sql.DB) string {
// getDDLStateFromTiDB retrieves the ddl job status of the ddl query from downstream tidb based on the ddl query and the approximate ddl create time.
func getDDLStateFromTiDB(ctx context.Context, db *sql.DB, ddl string, createTime string) (timodel.JobState, error) {
// ddlCreateTime and createTime are both based on UTC timezone of downstream
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`, createTime, ddl)
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs)
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ func TestWriteDDLEvent(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 1").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET SESSION tidb_cdc_write_source = 1").WillReturnResult(sqlmock.NewResult(1, 0))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("ALTER TABLE test.t1 ADD COLUMN a int").
WillReturnError(&dmysql.MySQLError{
Number: uint16(infoschema.ErrColumnExists.Code()),
Expand Down
Loading