Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ var (
"unknown '%s' message protocol for sink",
errors.RFCCodeText("CDC:ErrSinkUnknownProtocol"),
)
ErrExecDDLFailed = errors.Normalize(
"exec DDL failed %s",
errors.RFCCodeText("CDC:ErrExecDDLFailed"),
)
ErrDDLStateNotFound = errors.Normalize(
"DDL state not found %s",
errors.RFCCodeText("CDC:ErrDDLStateNotFound"),
)
ErrMySQLTxnError = errors.Normalize(
"MySQL txn error",
errors.RFCCodeText("CDC:ErrMySQLTxnError"),
Expand Down
72 changes: 72 additions & 0 deletions pkg/sink/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ import (
"go.uber.org/zap"
)

// checkRunningAddIndexSQL looks up in-flight "add index" DDL jobs for one
// table in the downstream TiDB. The single %d placeholder is the downstream
// TABLE_ID; being numeric, it is safe to interpolate with fmt.Sprintf.
const checkRunningAddIndexSQL = `
SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY
FROM information_schema.ddl_jobs
WHERE TABLE_ID = "%d"
AND JOB_TYPE LIKE "add index%%"
AND (STATE = "running" OR STATE = "queueing");
`

// checkRunningSQL looks up a DDL job in the downstream by its exact query
// text and an approximate lower bound on its creation time. The two %s
// placeholders are the creation time and the DDL query text, in that order.
// NOTE(review): the query text is spliced inside double quotes via
// fmt.Sprintf; a DDL statement containing a '"' would break the generated
// SQL — callers should prefer bound parameters over this format string.
const checkRunningSQL = `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`

// CheckIfBDRModeIsSupported checks if the downstream supports BDR mode.
func CheckIfBDRModeIsSupported(ctx context.Context, db *sql.DB) (bool, error) {
isTiDB := CheckIsTiDB(ctx, db)
Expand Down Expand Up @@ -365,6 +376,17 @@ func needSwitchDB(event *commonEvent.DDLEvent) bool {
return true
}

// needWaitAsyncExecDone reports whether a DDL of the given action type must
// first wait for any asynchronously executing DDL to finish. Create-schema,
// create-table and create-tables actions are exempt; every other action
// type waits.
func needWaitAsyncExecDone(t timodel.ActionType) bool {
	switch t {
	case timodel.ActionCreateTable,
		timodel.ActionCreateTables,
		timodel.ActionCreateSchema:
		return false
	}
	return true
}

// SetWriteSource sets write source for the transaction.
// When this variable is set to a value other than 0, data written in this session is considered to be written by TiCDC.
// DDLs executed in a PRIMARY cluster can be replicated to a SECONDARY cluster by TiCDC.
Expand Down Expand Up @@ -436,3 +458,53 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) {

return errors.ErrCode(mysqlErr.Number), true
}

// getDDLCreateTime queries the downstream TiDB for its current timestamp,
// which is used as an approximate lower bound for a DDL job's CREATE_TIME.
// It returns the empty string when the query or the scan fails; callers
// treat "" as "unknown".
func getDDLCreateTime(ctx context.Context, db *sql.DB) string {
	ddlCreateTime := "" // default when the query or scan fails
	// TIDB_PARSE_TSO needs a transaction context to populate
	// @@tidb_current_ts, hence the BEGIN/ROLLBACK wrapper; the trailing SET
	// clears the session variable again. Requires multi-statement support
	// on the connection.
	row, err := db.QueryContext(ctx, "BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;")
	if err != nil {
		log.Warn("selecting tidb current timestamp failed", zap.Error(err))
		return ddlCreateTime
	}
	for row.Next() {
		if err = row.Scan(&ddlCreateTime); err != nil {
			log.Warn("getting ddlCreateTime failed", zap.Error(err))
		}
	}
	// Surface iteration errors instead of silently discarding them; the
	// fallback value "" is still returned in that case.
	if err = row.Err(); err != nil {
		log.Warn("iterating tidb current timestamp result failed", zap.Error(err))
	}
	//nolint:sqlclosecheck
	_ = row.Close()
	return ddlCreateTime
}

// getDDLStateFromTiDB retrieves the DDL job state of the given DDL query
// from the downstream TiDB, matching on the exact query text and a lower
// bound on the job's creation time.
//
// ddl is the original DDL statement text; createTime is the approximate
// creation time (both interpreted in the downstream's UTC timezone). It
// returns JobStateNone when no matching job exists or on query failure.
func getDDLStateFromTiDB(ctx context.Context, db *sql.DB, ddl string, createTime string) (timodel.JobState, error) {
	// Bind the values as parameters instead of splicing them into the SQL
	// text: a DDL statement containing a '"' would break (or inject into) a
	// fmt.Sprintf-built query.
	const showJobs = `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= ? AND QUERY = ?;`
	//nolint:rowserrcheck
	jobsRows, err := db.QueryContext(ctx, showJobs, createTime, ddl)
	if err != nil {
		return timodel.JobStateNone, err
	}

	// GetSpecifiedColumnValuesAndClose also closes jobsRows.
	jobsResults, err := export.GetSpecifiedColumnValuesAndClose(jobsRows, "QUERY", "STATE", "JOB_ID", "JOB_TYPE", "SCHEMA_STATE")
	if err != nil {
		return timodel.JobStateNone, err
	}
	if len(jobsResults) == 0 {
		return timodel.JobStateNone, nil
	}
	result := jobsResults[0]
	state, jobID, jobType, schemaState := result[1], result[2], result[3], result[4]
	log.Debug("Find ddl state in downstream",
		zap.String("jobID", jobID),
		zap.String("jobType", jobType),
		zap.String("schemaState", schemaState),
		zap.String("ddl", ddl),
		zap.String("state", state),
		zap.Any("jobsResults", jobsResults),
	)
	return timodel.StrToJobState(state), nil
}
17 changes: 1 addition & 16 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package mysql
import (
"context"
"database/sql"
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -51,10 +50,6 @@ type Writer struct {
ddlTsTableInit bool
tableSchemaStore *util.TableSchemaStore

// asyncDDLState is used to store the state of async ddl.
// key: tableID, value: state(0: unknown state , 1: executing, 2: no executing ddl)
asyncDDLState sync.Map

// implement stmtCache to improve performance, especially when the downstream is TiDB
stmtCache *lru.Cache
// Indicate if the CachePrepStmts should be enabled or not
Expand Down Expand Up @@ -84,7 +79,6 @@ func NewWriter(
ChangefeedID: changefeedID,
lastCleanSyncPointTime: time.Now(),
ddlTsTableInit: false,
asyncDDLState: sync.Map{},
cachePrepStmts: cfg.CachePrepStmts,
maxAllowedPacket: cfg.MaxAllowedPacket,
stmtCache: cfg.stmtCache,
Expand Down Expand Up @@ -115,15 +109,7 @@ func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error {
// first we check whether there is some async ddl executed now.
w.waitAsyncDDLDone(event)
}

// check the ddl should by async or sync executed.
if needAsyncExecDDL(event.GetDDLType()) && w.cfg.IsTiDB {
// for async exec ddl, we don't flush ddl ts here. Because they don't block checkpointTs.
err := w.asyncExecAddIndexDDLIfTimeout(event)
if err != nil {
return errors.Trace(err)
}
} else if !(event.TiDBOnly && !w.cfg.IsTiDB) {
if !(event.TiDBOnly && !w.cfg.IsTiDB) {
if w.cfg.IsTiDB {
// if downstream is tidb, we write ddl ts before ddl first, and update the ddl ts item after ddl executed,
// to ensure the atomic with ddl writing when server is restarted.
Expand All @@ -134,7 +120,6 @@ func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error {
if err != nil {
return err
}

// We need to record ddl' ts after each ddl for each table in the downstream when sink is mysql-compatible.
// Only in this way, when the node restart, we can continue sync data from the last ddl ts at least.
// Otherwise, after restarting, we may sync old data in new schema, which will leading to data loss.
Expand Down
Loading