sink(ticdc): retry later when meet unexpected DDL state (#12483)#12515
sink(ticdc): retry later when meet unexpected DDL state (#12483)#12515ti-chi-bot wants to merge 1 commit intopingcap:release-8.5from
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
@wk989898 This PR has conflicts, I have hold it. |
|
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Summary of ChangesHello @ti-chi-bot, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the robustness of DDL replication in TiCDC's MySQL sink. By implementing a retry and polling mechanism for DDL operations that encounter unexpected states, the system can now gracefully handle transient issues or long-running DDLs without resorting to full CDC restarts. This change improves the overall stability and resilience of the data synchronization process, particularly in environments with complex or slow DDL executions. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request aims to improve TiCDC's resilience to slow DDLs by introducing a retry mechanism for unexpected DDL states, preventing changefeed failures. However, it introduces a critical SQL injection vulnerability in the getDDLStateFromTiDB function due to constructing a SQL query with untrusted input (ddl.Query) via string formatting. This must be fixed by using parameterized queries. Additionally, the file cdc/sink/ddlsink/mysql/mysql_ddl_sink.go contains unresolved merge conflicts and duplicated code, which will prevent compilation.
| <<<<<<< HEAD | ||
| ======= | ||
| func (m *DDLSink) waitDDLDone(ctx context.Context, ddl *model.DDLEvent, ddlCreateTime string) error { | ||
| ticker := time.NewTicker(5 * time.Second) | ||
| ticker1 := time.NewTicker(10 * time.Minute) | ||
| defer ticker.Stop() | ||
| defer ticker1.Stop() | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| case <-ticker.C: | ||
| case <-ticker1.C: | ||
| log.Info("DDL is still running downstream, it blocks other DDL or DML events", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime)) | ||
| } | ||
|
|
||
| state, err := getDDLStateFromTiDB(ctx, m.db, ddl.Query, ddlCreateTime) | ||
| if err != nil { | ||
| log.Error("Error when getting DDL state from TiDB", zap.Error(err)) | ||
| } | ||
| switch state { | ||
| case timodel.JobStateDone, timodel.JobStateSynced: | ||
| log.Info("DDL replicate success", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime)) | ||
| return nil | ||
| case timodel.JobStateCancelled, timodel.JobStateRollingback, timodel.JobStateRollbackDone, timodel.JobStateCancelling: | ||
| return errors.ErrExecDDLFailed.GenWithStackByArgs(ddl.Query) | ||
| case timodel.JobStateRunning, timodel.JobStateQueueing: | ||
| switch ddl.Type { | ||
| // returned immediately if not block dml | ||
| case timodel.ActionAddIndex: | ||
| log.Info("DDL is running downstream", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state)) | ||
| return nil | ||
| } | ||
| default: | ||
| log.Warn("Unexpected DDL state, may not be found downstream, retry later", zap.String("ddl", ddl.Query), zap.String("ddlCreateTime", ddlCreateTime), zap.Any("ddlState", state)) | ||
| return errors.ErrDDLStateNotFound.GenWithStackByArgs(state) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // WriteCheckpointTs does nothing. | ||
| func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error { | ||
| // Only for RowSink for now. | ||
| return nil | ||
| } | ||
|
|
||
| // Close closes the database connection. | ||
| func (m *DDLSink) Close() { | ||
| if m.statistics != nil { | ||
| m.statistics.Close() | ||
| } | ||
| if m.db != nil { | ||
| if err := m.db.Close(); err != nil { | ||
| log.Warn("MySQL ddl sink close db wit error", | ||
| zap.String("namespace", m.id.Namespace), | ||
| zap.String("changefeed", m.id.ID), | ||
| zap.Error(err)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| >>>>>>> 1da8b3e23a (sink(ticdc): retry later when meet unexpected DDL state (#12483)) |
There was a problem hiding this comment.
This section of the code contains unresolved merge conflict markers (<<<<<<< HEAD, =======, >>>>>>>) and also introduces duplicated definitions for WriteCheckpointTs and Close functions, which are already defined elsewhere in the file. This will cause a compilation failure. Please resolve the merge conflict and remove the duplicated code.
| showJobs := fmt.Sprintf(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs | ||
| WHERE CREATE_TIME >= "%s" AND QUERY = "%s";`, createTime, ddl) |
There was a problem hiding this comment.
The function getDDLStateFromTiDB constructs a SQL query by directly embedding the ddl and createTime strings into a query string using fmt.Sprintf with double quotes. This leads to a SQL injection vulnerability, as ddl can contain special characters or be manipulated, potentially allowing arbitrary SQL execution, especially since MultiStatements is enabled. To remediate this, use parameterized queries instead of string formatting. After changing the query string as suggested, you'll also need to update the db.QueryContext call on line 403 to pass createTime and ddl as arguments: db.QueryContext(ctx, showJobs, createTime, ddl).
showJobs := `SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY FROM information_schema.ddl_jobs WHERE CREATE_TIME >= ? AND QUERY = ?;`| var jobsResults [][]string | ||
| err := retry.Do(ctx, func() error { | ||
| //nolint:rowserrcheck | ||
| jobsRows, err := db.QueryContext(ctx, showJobs) |
| if err != nil { | ||
| log.Error("Error when getting DDL state from TiDB", zap.Error(err)) | ||
| } |
There was a problem hiding this comment.
The error returned from getDDLStateFromTiDB is logged as an Error but then effectively ignored. The function proceeds to return ErrDDLStateNotFound, which is always retryable. This hides the original error and could lead to excessive retries for what might be a non-retryable issue. It would be better to return the original error and let the caller's retry logic, which uses IsRetryableDDLError, decide whether to retry. This would also provide better error propagation. Consider also logging the error at a Warn level before returning.
if err != nil {
return err
}|
@ti-chi-bot: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
This is an automated cherry-pick of #12483
What problem does this PR solve?
Issue Number: close #12482
What is changed and how it works?
When meeting the unexpected DDL state, instead of immediately failing, CDC now attempts to retry the DDL. This change aims to avoid CDC restart.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note