Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ func (m *DDLSink) waitDDLDone(ctx context.Context, ddl *model.DDLEvent, ddlCreat
defer ticker.Stop()
defer ticker1.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-ticker1.C:
log.Info("DDL is still running downstream, it blocks other DDL or DML events", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime))
}

state, err := getDDLStateFromTiDB(ctx, m.db, ddl.Query, ddlCreateTime)
if err != nil {
log.Error("Error when getting DDL state from TiDB", zap.Error(err))
Expand All @@ -256,17 +264,9 @@ func (m *DDLSink) waitDDLDone(ctx context.Context, ddl *model.DDLEvent, ddlCreat
return nil
}
default:
log.Warn("Unexpected DDL state, may not be found downstream", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state))
log.Warn("Unexpected DDL state, may not be found downstream, retry later", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state))
return errors.ErrDDLStateNotFound.GenWithStackByArgs(state)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-ticker1.C:
log.Info("DDL is still running downstream, it blocks other DDL or DML events", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime))
}
}
}

Expand Down Expand Up @@ -346,17 +346,27 @@ func getDDLStateFromTiDB(ctx context.Context, db *sql.DB, ddl string, createTime
// ddlCreateTime and createTime are both based on UTC timezone of downstream
showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs
WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`, createTime, ddl)
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs)
if err != nil {
return timodel.JobStateNone, err
}

var jobsResults [][]string
jobsResults, err = export.GetSpecifiedColumnValuesAndClose(jobsRows, "QUERY", "STATE", "JOB_ID", "JOB_TYPE", "SCHEMA_STATE")
err := retry.Do(ctx, func() error {
//nolint:rowserrcheck
jobsRows, err := db.QueryContext(ctx, showJobs)
if err != nil {
log.Warn("failed to query from downstream to get ddl state", zap.Error(err))
return err
}
jobsResults, err = export.GetSpecifiedColumnValuesAndClose(jobsRows, "QUERY", "STATE", "JOB_ID", "JOB_TYPE", "SCHEMA_STATE")
if err != nil {
log.Warn("get jobs results failed", zap.Error(err))
return err
}
return nil
}, retry.WithBackoffBaseDelay(pmysql.BackoffMaxDelay.Milliseconds()),
retry.WithBackoffMaxDelay(pmysql.BackoffMaxDelay.Milliseconds()),
retry.WithMaxTries(defaultDDLMaxRetry))
if err != nil {
return timodel.JobStateNone, err
}

if len(jobsResults) > 0 {
result := jobsResults[0]
state, jobID, jobType, schemaState := result[1], result[2], result[3], result[4]
Expand Down
4 changes: 4 additions & 0 deletions pkg/errorutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func IsRetryableDDLError(err error) bool {
return true
}

if cerror.Is(err, cerror.ErrDDLStateNotFound) {
return true
}

err = errors.Cause(err)
mysqlErr, ok := err.(*gmysql.MySQLError)
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/errorutil/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/engine/framework/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -112,6 +113,7 @@ func TestIsRetryableDDLError(t *testing.T) {
{newMysqlErr(tmysql.ErrBadDB, "xx"), false},
{newMysqlErr(errno.ErrTableWithoutPrimaryKey, "Unable to create or change a table without a primary key"), false},
{mysql.ErrInvalidConn, true},
{cerror.ErrDDLStateNotFound.GenWithStackByArgs("none"), true},
}

for _, c := range cases {
Expand Down
15 changes: 14 additions & 1 deletion tests/integration_tests/ddl_wait/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,27 @@ function run() {
changefeed_id="ddl-wait"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

run_sql "update test.t set col = 11 where id = 1;"
run_sql "alter table test.t modify column col decimal(30,10);"
run_sql "update test.t set col = 22 where id = 2;"
run_sql "alter table test.t add index (col);"
# The downstream add index DDL may finish quickly with fast reorg enabled,
# so we need a short fixed-interval polling to avoid missing the running window.
for i in $(seq 1 120); do
run_sql 'SELECT JOB_ID FROM information_schema.ddl_jobs WHERE DB_NAME = "test" AND TABLE_NAME = "t" AND JOB_TYPE LIKE "add index%" AND (STATE = "running" OR STATE = "queueing") LIMIT 1;' \
"${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}" >/dev/null 2>&1 || true
if check_contains 'JOB_ID:' >/dev/null 2>&1; then
break
fi
sleep 0.5
done
check_contains 'JOB_ID:'
run_sql "update test.t set col = 33 where id = 3;"
run_sql "create table test.finish_mark (a int primary key);"
check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
# make sure all tables are equal in upstream and downstream
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
check_logs_contains $WORK_DIR "DDL replicate success"
check_logs_contains $WORK_DIR "DDL is running downstream"
cleanup_process $CDC_BINARY
}

Expand Down