diff --git a/pkg/errors/error.go b/pkg/errors/error.go index a2b2407c20..b87e290d33 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -165,6 +165,14 @@ var ( "unknown '%s' message protocol for sink", errors.RFCCodeText("CDC:ErrSinkUnknownProtocol"), ) + ErrExecDDLFailed = errors.Normalize( + "exec DDL failed %s", + errors.RFCCodeText("CDC:ErrExecDDLFailed"), + ) + ErrDDLStateNotFound = errors.Normalize( + "DDL state not found %s", + errors.RFCCodeText("CDC:ErrDDLStateNotFound"), + ) ErrMySQLTxnError = errors.Normalize( "MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"), diff --git a/pkg/sink/mysql/helper.go b/pkg/sink/mysql/helper.go index ec7bf202b5..415df2ff4f 100644 --- a/pkg/sink/mysql/helper.go +++ b/pkg/sink/mysql/helper.go @@ -37,6 +37,17 @@ import ( "go.uber.org/zap" ) +const checkRunningAddIndexSQL = ` +SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY +FROM information_schema.ddl_jobs +WHERE TABLE_ID = "%d" + AND JOB_TYPE LIKE "add index%%" + AND (STATE = "running" OR STATE = "queueing"); +` + +const checkRunningSQL = `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs + WHERE CREATE_TIME >= "%s" AND QUERY = "%s";` + // CheckIfBDRModeIsSupported checks if the downstream supports BDR mode. func CheckIfBDRModeIsSupported(ctx context.Context, db *sql.DB) (bool, error) { isTiDB := CheckIsTiDB(ctx, db) @@ -365,6 +376,17 @@ func needSwitchDB(event *commonEvent.DDLEvent) bool { return true } +func needWaitAsyncExecDone(t timodel.ActionType) bool { + switch t { + case timodel.ActionCreateTable, timodel.ActionCreateTables: + return false + case timodel.ActionCreateSchema: + return false + default: + return true + } +} + // SetWriteSource sets write source for the transaction. // When this variable is set to a value other than 0, data written in this session is considered to be written by TiCDC. 
// DDLs executed in a PRIMARY cluster can be replicated to a SECONDARY cluster by TiCDC. @@ -436,3 +458,53 @@ func getSQLErrCode(err error) (errors.ErrCode, bool) { return errors.ErrCode(mysqlErr.Number), true } + +func getDDLCreateTime(ctx context.Context, db *sql.DB) string { + ddlCreateTime := "" // default when scan failed + row, err := db.QueryContext(ctx, "BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;") + if err != nil { + log.Warn("selecting tidb current timestamp failed", zap.Error(err)) + } else { + for row.Next() { + err = row.Scan(&ddlCreateTime) + if err != nil { + log.Warn("getting ddlCreateTime failed", zap.Error(err)) + } + } + //nolint:sqlclosecheck + _ = row.Close() + _ = row.Err() + } + return ddlCreateTime +} + +// getDDLStateFromTiDB retrieves the ddl job status of the ddl query from downstream tidb based on the ddl query and the approximate ddl create time. +func getDDLStateFromTiDB(ctx context.Context, db *sql.DB, ddl string, createTime string) (timodel.JobState, error) { + // ddlCreateTime and createTime are both based on UTC timezone of downstream + showJobs := fmt.Sprintf(checkRunningSQL, createTime, ddl) + //nolint:rowserrcheck + jobsRows, err := db.QueryContext(ctx, showJobs) + if err != nil { + return timodel.JobStateNone, err + } + + var jobsResults [][]string + jobsResults, err = export.GetSpecifiedColumnValuesAndClose(jobsRows, "QUERY", "STATE", "JOB_ID", "JOB_TYPE", "SCHEMA_STATE") + if err != nil { + return timodel.JobStateNone, err + } + if len(jobsResults) > 0 { + result := jobsResults[0] + state, jobID, jobType, schemaState := result[1], result[2], result[3], result[4] + log.Debug("Find ddl state in downstream", + zap.String("jobID", jobID), + zap.String("jobType", jobType), + zap.String("schemaState", schemaState), + zap.String("ddl", ddl), + zap.String("state", state), + zap.Any("jobsResults", jobsResults), + ) + return timodel.StrToJobState(result[1]), nil + } + 
return timodel.JobStateNone, nil +} diff --git a/pkg/sink/mysql/mysql_writer.go b/pkg/sink/mysql/mysql_writer.go index 34657829aa..cdaf8a92f6 100644 --- a/pkg/sink/mysql/mysql_writer.go +++ b/pkg/sink/mysql/mysql_writer.go @@ -16,7 +16,6 @@ package mysql import ( "context" "database/sql" - "sync" "time" lru "github.com/hashicorp/golang-lru" @@ -51,10 +50,6 @@ type Writer struct { ddlTsTableInit bool tableSchemaStore *util.TableSchemaStore - // asyncDDLState is used to store the state of async ddl. - // key: tableID, value: state(0: unknown state , 1: executing, 2: no executing ddl) - asyncDDLState sync.Map - // implement stmtCache to improve performance, especially when the downstream is TiDB stmtCache *lru.Cache // Indicate if the CachePrepStmts should be enabled or not @@ -84,7 +79,6 @@ func NewWriter( ChangefeedID: changefeedID, lastCleanSyncPointTime: time.Now(), ddlTsTableInit: false, - asyncDDLState: sync.Map{}, cachePrepStmts: cfg.CachePrepStmts, maxAllowedPacket: cfg.MaxAllowedPacket, stmtCache: cfg.stmtCache, @@ -115,15 +109,7 @@ func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error { // first we check whether there is some async ddl executed now. w.waitAsyncDDLDone(event) } - - // check the ddl should by async or sync executed. - if needAsyncExecDDL(event.GetDDLType()) && w.cfg.IsTiDB { - // for async exec ddl, we don't flush ddl ts here. Because they don't block checkpointTs. - err := w.asyncExecAddIndexDDLIfTimeout(event) - if err != nil { - return errors.Trace(err) - } - } else if !(event.TiDBOnly && !w.cfg.IsTiDB) { + if !(event.TiDBOnly && !w.cfg.IsTiDB) { if w.cfg.IsTiDB { // if downstream is tidb, we write ddl ts before ddl first, and update the ddl ts item after ddl executed, // to ensure the atomic with ddl writing when server is restarted. 
@@ -134,7 +120,6 @@ func (w *Writer) FlushDDLEvent(event *commonEvent.DDLEvent) error { if err != nil { return err } - // We need to record ddl' ts after each ddl for each table in the downstream when sink is mysql-compatible. // Only in this way, when the node restart, we can continue sync data from the last ddl ts at least. // Otherwise, after restarting, we may sync old data in new schema, which will leading to data loss. diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 6ff1f37bb8..79b017d126 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -18,103 +18,18 @@ import ( "fmt" "time" - "github.com/pingcap/errors" + "github.com/go-sql-driver/mysql" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" timodel "github.com/pingcap/tidb/pkg/meta/model" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "go.uber.org/zap" ) -func (w *Writer) asyncExecAddIndexDDLIfTimeout(event *commonEvent.DDLEvent) error { - var tableIDs []int64 - switch event.GetBlockedTables().InfluenceType { - // only normal type may have ddl need to async exec - case commonEvent.InfluenceTypeNormal: - tableIDs = event.GetBlockedTables().TableIDs - default: - } - - for _, tableID := range tableIDs { - // change the async ddl state to 1, means the tableID have async table ddl executing - w.asyncDDLState.Store(tableID, 1) - } - - done := make(chan error, 1) - - tick := time.NewTimer(2 * time.Second) - defer tick.Stop() - log.Info("async exec add index ddl start", - zap.Uint64("commitTs", event.FinishedTs), - zap.String("ddl", event.GetDDLQuery())) - go func() { - if err := w.execDDLWithMaxRetries(event); err != nil { - log.Error("async exec add index ddl failed", - zap.Uint64("commitTs", 
event.FinishedTs), - zap.String("ddl", event.GetDDLQuery())) - done <- err - return - } - log.Info("async exec add index ddl done", - zap.Uint64("commitTs", event.FinishedTs), - zap.String("ddl", event.GetDDLQuery())) - done <- nil - - for _, tableID := range tableIDs { - // change the async ddl state to 2, means the tableID don't have async table ddl executing - w.asyncDDLState.Store(tableID, 2) - } - }() - - select { - case err := <-done: - // if the ddl is executed within 2 seconds, we just return the result to the caller. - return err - case <-tick.C: - // if the ddl is still running, we just return nil, - // then if the ddl is failed, the downstream ddl is lost. - // because the checkpoint ts is forwarded. - log.Info("async add index ddl is still running", - zap.Uint64("commitTs", event.FinishedTs), - zap.String("ddl", event.GetDDLQuery())) - return nil - } -} - -func needAsyncExecDDL(ddlType timodel.ActionType) bool { - switch ddlType { - case timodel.ActionAddIndex: - return true - default: - return false - } -} - -func needTimeoutCheck(ddlType timodel.ActionType) bool { - if needAsyncExecDDL(ddlType) { - return false - } - switch ddlType { - // partition related - case timodel.ActionAddTablePartition, timodel.ActionExchangeTablePartition, timodel.ActionReorganizePartition: - return false - // reorg related - case timodel.ActionAddPrimaryKey, timodel.ActionAddIndex, timodel.ActionModifyColumn: - return false - // following ddls can be fast when the downstream is TiDB, we must - // still take them into consideration to ensure compatibility with all - // MySQL-compatible databases. 
- case timodel.ActionAddColumn, timodel.ActionAddColumns, timodel.ActionDropColumn, timodel.ActionDropColumns: - return false - default: - return true - } -} - func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { if w.cfg.DryRun { log.Info("Dry run DDL", zap.String("sql", event.GetDDLQuery())) @@ -137,15 +52,6 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { } ctx := w.ctx - // We check the most of the ddl event executed with timeout. - if needTimeoutCheck(event.GetDDLType()) { - writeTimeout, _ := time.ParseDuration(w.cfg.WriteTimeout) - writeTimeout += networkDriftDuration - var cancelFunc func() - ctx, cancelFunc = context.WithTimeout(w.ctx, writeTimeout) - defer cancelFunc() - } - shouldSwitchDB := needSwitchDB(event) // Convert vector type to string type for unsupport database @@ -192,13 +98,18 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { } if err = tx.Commit(); err != nil { - return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery()))) + return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery()))) } return nil } +// execDDLWithMaxRetries will retry executing DDL statements. +// When a DDL execution takes a long time and an invalid connection error occurs. +// If the downstream is TiDB, it will query the DDL and wait until it finishes. +// For 'add index' ddl, it will return immediately without waiting and will query it during the next DDL execution. func (w *Writer) execDDLWithMaxRetries(event *commonEvent.DDLEvent) error { + ddlCreateTime := getDDLCreateTime(w.ctx, w.db) return retry.Do(w.ctx, func() error { err := w.statistics.RecordDDLExecution(func() error { return w.execDDL(event) }) if err != nil { @@ -210,10 +121,15 @@ func (w *Writer) execDDLWithMaxRetries(event *commonEvent.DDLEvent) error { // If the error is ignorable, we will ignore the error directly. 
return nil } + if w.cfg.IsTiDB && ddlCreateTime != "" && errors.Cause(err) == mysql.ErrInvalidConn { + log.Warn("Wait the asynchronous ddl to synchronize", zap.String("ddl", event.Query), zap.String("ddlCreateTime", ddlCreateTime), + zap.String("readTimeout", w.cfg.ReadTimeout), zap.Error(err)) + return w.waitDDLDone(w.ctx, event, ddlCreateTime) + } log.Warn("Execute DDL with error, retry later", zap.String("ddl", event.Query), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Execute DDL failed, Query info: %s; ", event.GetDDLQuery()))) + return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Execute DDL failed, Query info: %s; ", event.GetDDLQuery()))) } log.Info("Execute DDL succeeded", zap.String("changefeed", w.ChangefeedID.String()), @@ -225,6 +141,46 @@ func (w *Writer) execDDLWithMaxRetries(event *commonEvent.DDLEvent) error { retry.WithIsRetryableErr(apperror.IsRetryableDDLError)) } +// waitDDLDone wait current ddl +func (w *Writer) waitDDLDone(ctx context.Context, ddl *commonEvent.DDLEvent, ddlCreateTime string) error { + ticker := time.NewTicker(5 * time.Second) + ticker1 := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + defer ticker1.Stop() + for { + state, err := getDDLStateFromTiDB(ctx, w.db, ddl.Query, ddlCreateTime) + if err != nil { + log.Error("Error when getting DDL state from TiDB", zap.Error(err)) + } + switch state { + case timodel.JobStateDone, timodel.JobStateSynced: + log.Info("DDL replicate success", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime)) + return nil + case timodel.JobStateCancelled, timodel.JobStateRollingback, timodel.JobStateRollbackDone, timodel.JobStateCancelling: + return errors.ErrExecDDLFailed.GenWithStackByArgs(ddl.Query) + case timodel.JobStateRunning, timodel.JobStateQueueing: + switch ddl.GetDDLType() { + // returned immediately if not block dml + case timodel.ActionAddIndex: + log.Info("DDL is 
running downstream", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state)) + return nil + } + default: + log.Warn("Unexpected DDL state, may not be found downstream", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state)) + return errors.ErrDDLStateNotFound.GenWithStackByArgs(state) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + case <-ticker1.C: + log.Info("DDL is still running downstream, it blocks other DDL or DML events", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime)) + } + } +} + +// waitAsyncDDLDone wait previous ddl func (w *Writer) waitAsyncDDLDone(event *commonEvent.DDLEvent) { if !needWaitAsyncExecDone(event.GetDDLType()) { return @@ -244,51 +200,25 @@ func (w *Writer) waitAsyncDDLDone(event *commonEvent.DDLEvent) { if tableID == 0 { continue } - state, ok := w.asyncDDLState.Load(tableID) - if !ok { - // query the downstream, - // if the ddl is still running, we should wait for it. - w.checkAndWaitAsyncDDLDoneDownstream(tableID) - // update async ddl state - w.asyncDDLState.Store(tableID, 2) - // TODO - } else if state.(int) == 1 { - w.waitTableAsyncDDLDone(tableID) - } - } -} - -func (w *Writer) waitTableAsyncDDLDone(tableID int64) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - for { - select { - case <-w.ctx.Done(): - return - case <-ticker.C: - state, ok := w.asyncDDLState.Load(tableID) - if ok && state.(int) == 2 { - return - } + // query the downstream, + // if the ddl is still running, we should wait for it. 
+ err := w.checkAndWaitAsyncDDLDoneDownstream(tableID) + if err != nil { + log.Error("check previous asynchronous ddl failed", + zap.String("namespace", w.ChangefeedID.Namespace()), + zap.Stringer("changefeed", w.ChangefeedID), + zap.Error(err)) } } } -var checkRunningAddIndexSQL = ` -SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY -FROM information_schema.ddl_jobs -WHERE TABLE_ID = "%d" - AND JOB_TYPE LIKE "add index%%" - AND (STATE = "running" OR STATE = "queueing"); -` - // true means the async ddl is still running, false means the async ddl is done. func (w *Writer) doQueryAsyncDDL(tableID int64, query string) (bool, error) { start := time.Now() rows, err := w.db.QueryContext(w.ctx, query) log.Debug("query duration", zap.Any("duration", time.Since(start)), zap.Any("query", query)) if err != nil { - return false, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) + return false, errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) } defer rows.Close() @@ -302,7 +232,7 @@ func (w *Writer) doQueryAsyncDDL(tableID int64, query string) (bool, error) { noRows = false err := rows.Scan(&jobID, &jobType, &schemaState, &state) if err != nil { - return false, cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) + return false, errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("failed to query ddl jobs table; Query is %s", query))) } log.Info("async ddl is still running", @@ -353,14 +283,3 @@ func (w *Writer) checkAndWaitAsyncDDLDoneDownstream(tableID int64) error { } } } - -func needWaitAsyncExecDone(t timodel.ActionType) bool { - switch t { - case timodel.ActionCreateTable, timodel.ActionCreateTables: - return false - case timodel.ActionCreateSchema: - return false - default: - 
return true - } -} diff --git a/pkg/sink/mysql/mysql_writer_test.go b/pkg/sink/mysql/mysql_writer_test.go index 686f35affb..54d74bbec9 100644 --- a/pkg/sink/mysql/mysql_writer_test.go +++ b/pkg/sink/mysql/mysql_writer_test.go @@ -16,10 +16,12 @@ package mysql import ( "context" "database/sql" + "fmt" "testing" "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" @@ -375,6 +377,7 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) { Query: job.Query, SchemaName: job.SchemaName, TableName: job.TableName, + Type: byte(job.Type), FinishedTs: 1, BlockedTables: &commonEvent.InfluencedTables{ InfluenceType: commonEvent.InfluenceTypeNormal, @@ -406,11 +409,11 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) { mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '1', 0, '', '', 0, 0), ('default', 'test/test', '1', 1, '', '', 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() + mock.ExpectQuery("BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;").WillReturnRows(sqlmock.NewRows([]string{"@ticdc_ts"}).AddRow("2021-05-26 11:33:37.776000")) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("create table t (id int primary key, name varchar(32));").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - mock.ExpectBegin() mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, 
finished, is_syncpoint) VALUES ('default', 'test/test', '1', 0, '', '', 1, 0), ('default', 'test/test', '1', 1, '', '', 1, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() @@ -422,11 +425,22 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, 1)).WillReturnError(sqlmock.ErrCancelled) + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '1', 1, '', '', 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + mock.ExpectQuery("BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;").WillReturnRows(sqlmock.NewRows([]string{"@ticdc_ts"}).AddRow("2021-05-26 11:33:37.776000")) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) log.Info("before add index") - mock.ExpectExec("alter table t add index nameIndex(name);").WillDelayFor(10 * time.Second).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("alter table t add index nameIndex(name);").WillDelayFor(10 * time.Second).WillReturnError(mysql.ErrInvalidConn) log.Info("after add index") + mock.ExpectQuery(fmt.Sprintf(checkRunningSQL, "2021-05-26 11:33:37.776000", "alter table t add index nameIndex(name);")). 
+ WillReturnRows(sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}). + AddRow("", "", "", "", "", "running", "alter table t add index nameIndex(name);")) + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '1', 1, '', '', 1, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() // for dml event mock.ExpectBegin() @@ -439,29 +453,25 @@ func TestMysqlWriter_AsyncDDL(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '2', 0, '', '', 0, 0), ('default', 'test/test', '2', 2, '', '', 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - + mock.ExpectQuery("BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;").WillReturnRows(sqlmock.NewRows([]string{"@ticdc_ts"}).AddRow("2021-05-26 11:33:37.776000")) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("create table t1 (id int primary key, name varchar(32));").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - mock.ExpectBegin() mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, 
table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '2', 0, '', '', 1, 0), ('default', 'test/test', '2', 2, '', '', 1, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - // this commit is for add index ddl - mock.ExpectCommit() - // for add column ddl for table t + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, 1)).WillReturnError(sqlmock.ErrCancelled) mock.ExpectBegin() mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '3', 1, '', '', 0, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - + mock.ExpectQuery("BEGIN; SET @ticdc_ts := TIDB_PARSE_TSO(@@tidb_current_ts); ROLLBACK; SELECT @ticdc_ts; SET @ticdc_ts=NULL;").WillReturnRows(sqlmock.NewRows([]string{"@ticdc_ts"}).AddRow("2021-05-26 11:33:37.776000")) mock.ExpectBegin() mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("alter table t add column age int;").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - mock.ExpectBegin() mock.ExpectExec("INSERT INTO tidb_cdc.ddl_ts_v1 (ticdc_cluster_id, changefeed, ddl_ts, table_id, table_name_in_ddl_job, db_name_in_ddl_job, finished, is_syncpoint) VALUES ('default', 'test/test', '3', 1, '', '', 1, 0) ON DUPLICATE KEY UPDATE finished=VALUES(finished), table_name_in_ddl_job=VALUES(table_name_in_ddl_job), 
db_name_in_ddl_job=VALUES(db_name_in_ddl_job), ddl_ts=VALUES(ddl_ts), created_at=NOW(), is_syncpoint=VALUES(is_syncpoint);").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() diff --git a/tests/integration_tests/ddl_wait/conf/diff_config.toml b/tests/integration_tests/ddl_wait/conf/diff_config.toml new file mode 100644 index 0000000000..6435d3b89b --- /dev/null +++ b/tests/integration_tests/ddl_wait/conf/diff_config.toml @@ -0,0 +1,28 @@ +# diff Configuration. +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/ddl_wait/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["test.*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh new file mode 100755 index 0000000000..f9b43e4813 --- /dev/null +++ b/tests/integration_tests/ddl_wait/run.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# This test simulates DDL operations that take a long time. +# TiCDC blocks DDL operations until its state is not running, except for adding indexes. +# TiCDC also checks add index ddl state before execute a new DDL. 
+function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + cd $CUR + GO111MODULE=on go run test.go + + TOPIC_NAME="ticdc-ddl-wait-test-$RANDOM" + SINK_URI="mysql://root@127.0.0.1:3306/?read-timeout=300ms" + + changefeed_id="ddl-wait" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id} + + run_sql "alter table test.t modify column col decimal(30,10);" + run_sql "alter table test.t add index (col);" + run_sql "create table test.finish_mark (a int primary key);" + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + # make sure all tables are equal in upstream and downstream + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180 + check_logs_contains $WORK_DIR "DDL replicate success" + check_logs_contains $WORK_DIR "DDL is running downstream" + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/ddl_wait/test.go b/tests/integration_tests/ddl_wait/test.go new file mode 100644 index 0000000000..204e385a03 --- /dev/null +++ b/tests/integration_tests/ddl_wait/test.go @@ -0,0 +1,97 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "database/sql" + "fmt" + "log" + "os" + "sync" + + _ "github.com/go-sql-driver/mysql" +) + +func main() { + upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1") + upPort := GetEnvDefault("UP_TIDB_PORT", "4000") + downHost := GetEnvDefault("DOWN_TIDB_HOST", "127.0.0.1") + downPort := GetEnvDefault("DOWN_TIDB_PORT", "3306") + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + run(upHost, upPort) + }() + go func() { + defer wg.Done() + run(downHost, downPort) + }() + wg.Wait() +} + +func run(host string, port string) { + dsn := fmt.Sprintf("root@tcp(%s:%s)/", host, port) + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Fatal("open db failed:", dsn, ", err: ", err) + } + defer db.Close() + + if err = db.Ping(); err != nil { + log.Fatal("ping db failed:", dsn, ", err: ", err) + } + log.Println("connect to tidb success, dsn: ", dsn) + + createTable := `create table if not exists test.t ( + id int primary key, + col decimal(65,30) + );` + insertDML := `insert into test.t values (?, ?)` + + concurrency := 200 + maxRowCnt := 1000000 + num := maxRowCnt / concurrency + db.SetMaxOpenConns(concurrency) + + _, err = db.Exec(createTable) + if err != nil { + log.Fatal("create table failed:, err: ", err) + } + + var wg sync.WaitGroup + for k := 0; k < concurrency; k++ { + wg.Add(1) + go func(k int) { + defer wg.Done() + for i := 0; i < num; i++ { + val := k*num + i + _, err := db.Exec(insertDML, val, val) + if err != nil { + log.Fatal("insert value failed:, err: ", err) + } + } + }(k) + } + wg.Wait() +} + +func GetEnvDefault(key, defaultV string) string { + val, ok := os.LookupEnv(key) + if !ok { + return defaultV + } + return val +} diff --git a/tests/integration_tests/run_light_it_in_ci.sh b/tests/integration_tests/run_light_it_in_ci.sh index 7af96f55f4..2bba758c4e 100755 --- a/tests/integration_tests/run_light_it_in_ci.sh +++ b/tests/integration_tests/run_light_it_in_ci.sh @@ -54,7 +54,7 @@ mysql_groups=( # G10 
'changefeed_error bdr_mode fail_over_ddl_K' # G11 - 'multi_tables_ddl ddl_attributes multi_cdc_cluster fail_over_ddl_L' + 'multi_tables_ddl ddl_attributes multi_cdc_cluster ddl_wait fail_over_ddl_L' # G12 'row_format tiflash multi_rocks fail_over_ddl_M' # G13