From 8644f1e09abb7e71bb59657fc14c1560dad6e493 Mon Sep 17 00:00:00 2001 From: suetin Date: Mon, 2 Feb 2026 05:15:17 +0300 Subject: [PATCH 01/23] allow different source hosts for external replication --- internal/app/replication.go | 61 ++++++++++++++++++++++++++---- internal/mysql/data.go | 5 +++ internal/mysql/node.go | 17 ++++++++- internal/mysql/queries.go | 13 ++++++- internal/mysql/replication.go | 70 ++++++++++++++++++++++++++++++++--- 5 files changed, 151 insertions(+), 15 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index d1e54a78..8fb7d174 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -20,6 +20,7 @@ type ReplicationRepairAlgorithmType int const ( StartSlave ReplicationRepairAlgorithmType = iota ResetSlave + ChangeSource ) type ReplicationRepairState struct { @@ -82,7 +83,7 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string, channel st return } - algorithmType, count, err := app.getSuitableAlgorithmType(replState) + algorithmType, count, err := app.getSuitableAlgorithmType(replState, channel) if err != nil { app.logger.Errorf("repair error: host %s, %v", node.Host(), err) return @@ -160,8 +161,45 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri return nil } -func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState) (ReplicationRepairAlgorithmType, int, error) { - for i := range app.getAlgorithmOrder() { +func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) error { + app.logger.Infof("repair: trying to repair replication using ChangeSourceAlgorithm...") + if channel != app.config.ExternalReplicationChannel { + app.logger.Infof("ChangeSourceAlgorithm works only for external replication") + return nil + } + replicationSources, err := node.GetExternalReplicationSources() + if err != nil { + return err + } + if len(replicationSources) == 0 { + return nil + } + replicaStatus, err := app.externalReplication.GetReplicaStatus(node) + if err != nil { + return err + } + // mark current source as error and then trying to change it + app.externalReplication.SetSourcesStatus(replicaStatus.GetMasterHost(), mysql.ErrorStatus) + for _, source := range replicationSources { + value := app.externalReplication.GetSourcesStatus(source.SourceHost) + if value == mysql.ErrorStatus { + continue + } + app.logger.Infof("ChangeSourceAlgorithm repair: trying change source to %s", source.SourceHost) + err := app.externalReplication.ChangeSourceHost(node, source.SourceHost) + if err != nil { + return err + } + return nil + } + // if there was no return from the loop above then we assume that all hosts are now marked as error + // so we shall reset sources status and try again on next iteration + app.externalReplication.ResetSourcesStatus() + return nil +} + +func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState, channel string) (ReplicationRepairAlgorithmType, int, error) { + for i := range app.getAlgorithmOrder(channel) { algorithmType := ReplicationRepairAlgorithmType(i) count := state.History[algorithmType] if count < app.config.ReplicationRepairMaxAttempts { @@ -207,7 +245,7 @@ func (app *App) createRepairState(hostname, channel string) (*ReplicationRepairS LastGTIDExecuted: status.GetExecutedGtidSet(), } - for i := range app.getAlgorithmOrder() { + for i := range app.getAlgorithmOrder(channel) { result.History[ReplicationRepairAlgorithmType(i)] = 0 } @@ -223,7 +261,15 @@ var aggressiveOrder = []ReplicationRepairAlgorithmType{ ResetSlave, } -func (app *App) getAlgorithmOrder() []ReplicationRepairAlgorithmType { +var externalReplicationOrder = []ReplicationRepairAlgorithmType{ + StartSlave, + ChangeSource, +} + +func (app *App) getAlgorithmOrder(channel string) []ReplicationRepairAlgorithmType { + if channel == app.config.ExternalReplicationChannel { + return externalReplicationOrder + } if app.config.ReplicationRepairAggressiveMode { return aggressiveOrder } else { @@ -232,8 +278,9 @@ func (app *App) getAlgorithmOrder() []ReplicationRepairAlgorithmType { } var mapping = map[ReplicationRepairAlgorithmType]RepairReplicationAlgorithm{ - StartSlave: StartSlaveAlgorithm, - ResetSlave: ResetSlaveAlgorithm, + StartSlave: StartSlaveAlgorithm, + ResetSlave: ResetSlaveAlgorithm, + ChangeSource: ChangeSourceAlgorithm, } func getRepairAlgorithm(algoType ReplicationRepairAlgorithmType) RepairReplicationAlgorithm { diff --git a/internal/mysql/data.go b/internal/mysql/data.go index cfe15b68..1eadd499 100644 --- a/internal/mysql/data.go +++ b/internal/mysql/data.go @@ -61,6 +61,11 @@ type replicationSettings struct { ReplicationFilter sql.NullString `db:"ReplicationFilter"` } +type ReplicationSource struct { + SourceHost string `db:"SourceHost"` + Priority int `db:"Priority"` +} + // SlaveStatusStruct contains SHOW SLAVE STATUS response type SlaveStatusStruct struct { MasterHost string `db:"Master_Host"` diff --git a/internal/mysql/node.go b/internal/mysql/node.go index aacc581e..2b177c57 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -1050,7 +1050,10 @@ func (n *Node) GetStartupTime() (time.Time, error) { func (n *Node) UpdateExternalCAFile() error { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) + err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + "channel": n.config.ExternalReplicationChannel, + }, + &replSettings) if err != nil { return nil } @@ -1350,3 +1353,15 @@ func (n *Node) GetListSlaveSideDisabledEventsQuery() (string, error) { return queryListSlavesideDisabledEvents, nil } } + +func (n *Node) GetExternalReplicationSources() ([]ReplicationSource, error) { + var replicationSources []ReplicationSource + err := n.queryRow(queryGetExternalReplicationSources, map[string]any{ + "channel": n.config.ExternalReplicationChannel, + }, + &replicationSources) + if IsErrorTableDoesNotExists(err) || err == sql.ErrNoRows { + return replicationSources, nil + } + return replicationSources, err +} diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index 5d6ca27b..36c2a892 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -58,6 +58,8 @@ const ( queryCalcReplMonTSDelay = "calc_repl_mon_ts_delay" queryCreateReplMonTable = "create_repl_mon_table" queryUpdateReplMon = "update_repl_mon" + queryGetExternalReplicationSources = "get_external_replication_sources" + queryChangeSourceHost = "change_source_host" ) var DefaultQueries = map[string]string{ @@ -135,7 +137,7 @@ var DefaultQueries = map[string]string{ replication_status AS ReplicationStatus, replication_filter AS ReplicationFilter FROM mysql.replication_settings - WHERE channel_name = 'external'`, + WHERE channel_name = :channel`, queryChangeSource: `CHANGE REPLICATION SOURCE TO SOURCE_HOST = :host, SOURCE_PORT = :port, @@ -181,4 +183,13 @@ var DefaultQueries = map[string]string{ WHERE @@read_only = 0 ) ON DUPLICATE KEY UPDATE ts = CURRENT_TIMESTAMP(3)`, + queryGetExternalReplicationSources: `SELECT + source_host AS SourceHost, + priority AS Priority + FROM mysql.replication_sources + WHERE channel_name = :channel + ORDER BY priority DESC`, + queryChangeSourceHost: `CHANGE REPLICATION SOURCE TO + SOURCE_HOST = :host, + FOR CHANNEL :channel`, } diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index 1bf20ebb..2b23ccaf 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -15,6 +15,10 @@ type IExternalReplication interface { GetReplicaStatus(*Node) (ReplicaStatus, error) Stop(*Node) error IsRunningByUser(*Node) bool + ChangeSourceHost(*Node, string) error + GetSourcesStatus(string) ExternalSourceStatus + SetSourcesStatus(string, ExternalSourceStatus) + ResetSourcesStatus() } type UnimplementedExternalReplication struct{} @@ -47,8 +51,33 @@ func (d *UnimplementedExternalReplication) Stop(*Node) error { return nil } +func (d *UnimplementedExternalReplication) ChangeSourceHost(*Node, string) error { + return nil +} + +func (d *UnimplementedExternalReplication) GetSourcesStatus(string) ExternalSourceStatus { + return UnknownStatus +} + +func (d *UnimplementedExternalReplication) SetSourcesStatus(string, ExternalSourceStatus) { + +} + +func (d *UnimplementedExternalReplication) ResetSourcesStatus() { + +} + +type ExternalSourceStatus int + +const ( + UnknownStatus ExternalSourceStatus = iota + OkStatus + ErrorStatus +) + type ExternalReplication struct { - logger *log.Logger + logger *log.Logger + sourcesStatus map[string]ExternalSourceStatus } func NewExternalReplication(replicationType util.ExternalReplicationType, logger *log.Logger) (IExternalReplication, error) { @@ -74,7 +103,10 @@ func (er *ExternalReplication) IsSupported(n *Node) (bool, error) { func (er *ExternalReplication) Set(n *Node) error { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) + err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + "channel": n.config.ExternalReplicationChannel, + }, + &replSettings) if err != nil { // If no table in scheme then we consider external replication not existing so we do nothing if IsErrorTableDoesNotExists(err) { @@ -112,14 +144,14 @@ func (er *ExternalReplication) Set(n *Node) error { "retryCount": n.config.MySQL.ReplicationRetryCount, "connectRetry": n.config.MySQL.ReplicationConnectRetry, "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, - "channel": "external", + "channel": n.config.ExternalReplicationChannel, }) if err != nil { return err } err = n.execMogrify(queryIgnoreDB, map[string]any{ "ignoreList": schemaname("mysql"), - "channel": "external", + "channel": n.config.ExternalReplicationChannel, }) if err != nil { return err @@ -128,7 +160,7 @@ func (er *ExternalReplication) Set(n *Node) error { if filter.Valid && filter.String != "" { err = n.execMogrify(querySetReplFilter, map[string]any{ "filter": inlinestr(filter.String), - "channel": "external", + "channel": n.config.ExternalReplicationChannel, }) if err != nil { return err @@ -142,7 +174,10 @@ func (er *ExternalReplication) Set(n *Node) error { func (er *ExternalReplication) IsRunningByUser(n *Node) bool { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) + err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + "channel": n.config.ExternalReplicationChannel, + }, + &replSettings) if err != nil { return false } @@ -215,3 +250,26 @@ func (er *ExternalReplication) Reset(n *Node) error { } return nil } + +func (er *ExternalReplication) ChangeSourceHost(n *Node, host string) error { + return n.execMogrify(queryChangeSourceHost, map[string]any{ + "host": host, + "channel": n.config.ExternalReplicationChannel, + }) +} + +func (er *ExternalReplication) GetSourcesStatus(host string) ExternalSourceStatus { + value, ok := er.sourcesStatus[host] + if ok { + return value + } + return UnknownStatus +} + +func (er *ExternalReplication) SetSourcesStatus(host string, status ExternalSourceStatus) { + er.sourcesStatus[host] = status +} + +func (er *ExternalReplication) ResetSourcesStatus() { + er.sourcesStatus = make(map[string]ExternalSourceStatus) +} From c7eb85a4ba20b8d6245a1f6f9ea820371352c94a Mon Sep 17 00:00:00 2001 From: suetin Date: Mon, 2 Feb 2026 09:12:25 +0300 Subject: [PATCH 02/23] fix getAlgorithmOrder call --- internal/app/replication.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index 8fb7d174..ab6c6f9b 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -75,7 +75,7 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) { replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host(), channel) if err != nil { - app.logger.Errorf("repair error: host %s, %v", node.Host(), err) + app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err) return } @@ -85,14 +85,14 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string, channel st algorithmType, count, err := app.getSuitableAlgorithmType(replState, channel) if err != nil { - app.logger.Errorf("repair error: host %s, %v", node.Host(), err) + app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err) return } algorithm := getRepairAlgorithm(algorithmType) err = algorithm(app, node, master, channel) if err != nil { - app.logger.Errorf("repair error: %v", err) + app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err) } replState.History[algorithmType] = count + 1 @@ -199,7 +199,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) } func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState, channel string) (ReplicationRepairAlgorithmType, int, error) { - for i := range app.getAlgorithmOrder(channel) { + for _, i := range app.getAlgorithmOrder(channel) { algorithmType := ReplicationRepairAlgorithmType(i) count := state.History[algorithmType] if count < app.config.ReplicationRepairMaxAttempts { From 319e28d1809045710dee2bf9e84cf83584f704fd Mon Sep 17 00:00:00 2001 From: suetin Date: Mon, 2 Feb 2026 09:23:44 +0300 Subject: [PATCH 03/23] fix getAlgorithmOrder call --- internal/app/replication.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index ab6c6f9b..617af17c 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -199,8 +199,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) } func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState, channel string) (ReplicationRepairAlgorithmType, int, error) { - for _, i := range app.getAlgorithmOrder(channel) { - algorithmType := ReplicationRepairAlgorithmType(i) + for _, algorithmType := range app.getAlgorithmOrder(channel) { count := state.History[algorithmType] if count < app.config.ReplicationRepairMaxAttempts { return algorithmType, count, nil From 38c3b523482bf42bd11e370d4a59509baaa07f9a Mon Sep 17 00:00:00 2001 From: suetin Date: Mon, 2 Feb 2026 10:17:38 +0300 Subject: [PATCH 04/23] fixes --- internal/app/app.go | 1 + internal/app/replication.go | 5 ++++- internal/mysql/node.go | 4 ++-- internal/mysql/queries.go | 2 +- internal/mysql/replication.go | 4 ++-- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 41f586db..b2cbee24 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2059,6 +2059,7 @@ func (app *App) repairExternalReplication(masterNode *mysql.Node) { } if app.externalReplication.IsRunningByUser(masterNode) && !extReplStatus.ReplicationRunning() { + app.logger.Info("repair (external): calling TryRepairReplication") // TODO: remove "". Master is not needed for external replication now app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel) } diff --git a/internal/app/replication.go b/internal/app/replication.go index 617af17c..173e1119 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -95,7 +95,9 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string, channel st app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err) } - replState.History[algorithmType] = count + 1 + if algorithmType != ChangeSource { + replState.History[algorithmType] = count + 1 + } replState.LastAttempt = time.Now() } @@ -172,6 +174,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) return err } if len(replicationSources) == 0 { + app.logger.Infof("No available sources in external replication sources table found for channel %s", channel) return nil } replicaStatus, err := app.externalReplication.GetReplicaStatus(node) diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 2b177c57..46d74186 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -1050,7 +1050,7 @@ func (n *Node) GetStartupTime() (time.Time, error) { func (n *Node) UpdateExternalCAFile() error { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, &replSettings) @@ -1356,7 +1356,7 @@ func (n *Node) GetListSlaveSideDisabledEventsQuery() (string, error) { func (n *Node) GetExternalReplicationSources() ([]ReplicationSource, error) { var replicationSources []ReplicationSource - err := n.queryRow(queryGetExternalReplicationSources, map[string]any{ + err := n.queryRowMogrify(queryGetExternalReplicationSources, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, &replicationSources) diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index 36c2a892..914a0a4a 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -188,7 +188,7 @@ var DefaultQueries = map[string]string{ priority AS Priority FROM mysql.replication_sources WHERE channel_name = :channel - ORDER BY priority DESC`, + ORDER BY Priority DESC`, queryChangeSourceHost: `CHANGE REPLICATION SOURCE TO SOURCE_HOST = :host, FOR CHANNEL :channel`, diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index 2b23ccaf..4e5d782d 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -103,7 +103,7 @@ func (er *ExternalReplication) IsSupported(n *Node) (bool, error) { func (er *ExternalReplication) Set(n *Node) error { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, &replSettings) @@ -174,7 +174,7 @@ func (er *ExternalReplication) Set(n *Node) error { func (er *ExternalReplication) IsRunningByUser(n *Node) bool { var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, map[string]any{ + err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, &replSettings) From 47f2011f6bcaa5af259e30f9217e15e046417470 Mon Sep 17 00:00:00 2001 From: suetin Date: Mon, 2 Feb 2026 10:38:45 +0300 Subject: [PATCH 05/23] fixes --- internal/app/replication.go | 4 ++-- internal/mysql/node.go | 12 ++++++------ internal/mysql/replication.go | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index 173e1119..b4976317 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -173,7 +173,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) if err != nil { return err } - if len(replicationSources) == 0 { + if replicationSources == nil { app.logger.Infof("No available sources in external replication sources table found for channel %s", channel) return nil } @@ -183,7 +183,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) } // mark current source as error and then trying to change it app.externalReplication.SetSourcesStatus(replicaStatus.GetMasterHost(), mysql.ErrorStatus) - for _, source := range replicationSources { + for _, source := range *replicationSources { value := app.externalReplication.GetSourcesStatus(source.SourceHost) if value == mysql.ErrorStatus { continue diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 46d74186..8eae601a 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -1049,11 +1049,11 @@ func (n *Node) GetStartupTime() (time.Time, error) { } func (n *Node) UpdateExternalCAFile() error { - var replSettings replicationSettings + replSettings := new(replicationSettings) err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, - &replSettings) + replSettings) if err != nil { return nil } @@ -1354,14 +1354,14 @@ func (n *Node) GetListSlaveSideDisabledEventsQuery() (string, error) { } } -func (n *Node) GetExternalReplicationSources() ([]ReplicationSource, error) { - var replicationSources []ReplicationSource +func (n *Node) GetExternalReplicationSources() (*[]ReplicationSource, error) { + replicationSources := new([]ReplicationSource) err := n.queryRowMogrify(queryGetExternalReplicationSources, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, - &replicationSources) + replicationSources) if IsErrorTableDoesNotExists(err) || err == sql.ErrNoRows { - return replicationSources, nil + return nil, nil } return replicationSources, err } diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index 4e5d782d..d8f7ad41 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -102,11 +102,11 @@ func (er *ExternalReplication) IsSupported(n *Node) (bool, error) { } func (er *ExternalReplication) Set(n *Node) error { - var replSettings replicationSettings + replSettings := new(replicationSettings) err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, - &replSettings) + replSettings) if err != nil { // If no table in scheme then we consider external replication not existing so we do nothing if IsErrorTableDoesNotExists(err) { @@ -173,11 +173,11 @@ func (er *ExternalReplication) Set(n *Node) error { } func (er *ExternalReplication) IsRunningByUser(n *Node) bool { - var replSettings replicationSettings + replSettings := new(replicationSettings) err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ "channel": n.config.ExternalReplicationChannel, }, - &replSettings) + replSettings) if err != nil { return false } From a46dbbcd1caa91a25518bba2db2ff94deab53143 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 03:01:16 +0300 Subject: [PATCH 06/23] moar fixes&config 4 tests --- internal/app/replication.go | 20 ++++++++++++++----- internal/mysql/node.go | 33 ++++++++++++++++++++++++++++---- internal/mysql/queries.go | 2 +- internal/mysql/replication.go | 3 ++- tests/images/docker-compose.yaml | 3 +++ tests/images/mysql/mysync.yaml | 2 +- 6 files changed, 51 insertions(+), 12 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index b4976317..7f192bd6 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -109,7 +109,7 @@ func (app *App) makeReplStateKey(node *mysql.Node, channel string) string { } func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error { - app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...") + app.logger.Infof("repair %s: trying to repair replication using StartSlaveAlgorithm...", channel) if channel == app.config.ExternalReplicationChannel { return app.externalReplication.Start(node) } @@ -123,7 +123,7 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving") return nil } - app.logger.Infof("repair: trying to repair replication using ResetSlaveAlgorithm...") + app.logger.Infof("repair %: trying to repair replication using ResetSlaveAlgorithm...", channel) app.logger.Infof("repair: executing set slave offline") err := node.SetOffline() if err != nil { @@ -164,7 +164,7 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri } func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) error { - app.logger.Infof("repair: trying to repair replication using ChangeSourceAlgorithm...") + app.logger.Infof("repair %s: trying to repair replication using ChangeSourceAlgorithm...", channel) if channel != app.config.ExternalReplicationChannel { app.logger.Infof("ChangeSourceAlgorithm works only for external replication") return nil @@ -186,13 +186,23 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) for _, source := range *replicationSources { value := app.externalReplication.GetSourcesStatus(source.SourceHost) if value == mysql.ErrorStatus { + app.logger.Infof("repair (external): ignoring source host %s due to error status in the past", source.SourceHost) continue } - app.logger.Infof("ChangeSourceAlgorithm repair: trying change source to %s", source.SourceHost) - err := app.externalReplication.ChangeSourceHost(node, source.SourceHost) + app.logger.Infof("repair (external): trying change source to %s", source.SourceHost) + err := app.externalReplication.Stop(node) if err != nil { return err } + err = app.externalReplication.ChangeSourceHost(node, source.SourceHost) + if err != nil { + return err + } + err = app.externalReplication.Start(node) + if err != nil { + return err + } + app.logger.Infof("repair (external): source changed to %s", source.SourceHost) return nil } // if there was no return from the loop above then we assume that all hosts are now marked as error diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 8eae601a..e489e15d 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -242,7 +242,7 @@ func (n *Node) queryRowWithTimeout(queryName string, arg any, result any, timeou } // nolint: unparam -func (n *Node) queryRows(queryName string, arg any, scanner func(*sqlx.Rows) error) error { +func (n *Node) queryRows(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error { // TODO we need to rewrite processQuery, to make traceQuery work properly // traceQuery should be called with result, not *Rows return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { @@ -259,6 +259,24 @@ func (n *Node) queryRows(queryName string, arg any, scanner func(*sqlx.Rows) err }, n.config.DBTimeout) } +// nolint: unparam +func (n *Node) queryRowsMogrify(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error { + query := n.getQuery(queryName) + query = Mogrify(query, arg) + return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + var err error + + for rows.Next() { + err = scanner(rows) + if err != nil { + break + } + } + + return err + }, n.config.DBTimeout) +} + func (n *Node) processQuery(queryName string, arg any, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error { if arg == nil { arg = struct{}{} @@ -1356,10 +1374,17 @@ func (n *Node) GetListSlaveSideDisabledEventsQuery() (string, error) { func (n *Node) GetExternalReplicationSources() (*[]ReplicationSource, error) { replicationSources := new([]ReplicationSource) - err := n.queryRowMogrify(queryGetExternalReplicationSources, map[string]any{ + err := n.queryRowsMogrify(queryGetExternalReplicationSources, map[string]any{ "channel": n.config.ExternalReplicationChannel, - }, - replicationSources) + }, func(rows *sqlx.Rows) error { + var source ReplicationSource + err := rows.StructScan(&source) + if err != nil { + return err + } + *replicationSources = append(*replicationSources, source) + return nil + }) if IsErrorTableDoesNotExists(err) || err == sql.ErrNoRows { return nil, nil } diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index 914a0a4a..0ae4af29 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -190,6 +190,6 @@ var DefaultQueries = map[string]string{ WHERE channel_name = :channel ORDER BY Priority DESC`, queryChangeSourceHost: `CHANGE REPLICATION SOURCE TO - SOURCE_HOST = :host, + SOURCE_HOST = :host FOR CHANNEL :channel`, } diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index d8f7ad41..34909e4a 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -85,7 +85,8 @@ func NewExternalReplication(replicationType util.ExternalReplicationType, logger case util.MyExternalReplication: logger.Info("external replication is enabled") return &ExternalReplication{ - logger: logger, + logger: logger, + sourcesStatus: make(map[string]ExternalSourceStatus), }, nil default: logger.Info("external replication is disabled") diff --git a/tests/images/docker-compose.yaml b/tests/images/docker-compose.yaml index 759adb43..f888bc54 100644 --- a/tests/images/docker-compose.yaml +++ b/tests/images/docker-compose.yaml @@ -93,6 +93,7 @@ services: MYSYNC_WAIT_FOR_SLAVE_COUNT: MYSYNC_STREAM_FROM_REASONABLE_LAG: MYSYNC_RESETUP_CRASHED_HOSTS: + MYSYNC_REPLICATION_REPAIR_COOLDOWN: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: MYSYNC_REPLICATION_LAG_QUERY: @@ -146,6 +147,7 @@ services: MYSYNC_WAIT_FOR_SLAVE_COUNT: MYSYNC_STREAM_FROM_REASONABLE_LAG: MYSYNC_RESETUP_CRASHED_HOSTS: + MYSYNC_REPLICATION_REPAIR_COOLDOWN: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: MYSYNC_REPLICATION_LAG_QUERY: @@ -193,6 +195,7 @@ services: MYSYNC_WAIT_FOR_SLAVE_COUNT: MYSYNC_STREAM_FROM_REASONABLE_LAG: MYSYNC_RESETUP_CRASHED_HOSTS: + MYSYNC_REPLICATION_REPAIR_COOLDOWN: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: MYSYNC_REPLICATION_LAG_QUERY: diff --git a/tests/images/mysql/mysync.yaml b/tests/images/mysql/mysync.yaml index 435795c4..f29ac7c8 100644 --- a/tests/images/mysql/mysync.yaml +++ b/tests/images/mysql/mysync.yaml @@ -55,7 +55,7 @@ exclude_users: - 'admin' - 'monitor' - 'event_scheduler' -replication_repair_cooldown: 10s +replication_repair_cooldown: ${MYSYNC_REPLICATION_REPAIR_COOLDOWN:-10s} replication_repair_aggressive_mode: ${MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE:-false} test_filesystem_readonly_file: /tmp/readonly replication_channel: '' From fe811d645b6069cf9bd8169c362cac0d5bf738e8 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 03:16:11 +0300 Subject: [PATCH 07/23] fixes --- internal/app/replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index 7f192bd6..0bd6a5f7 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -123,7 +123,7 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving") return nil } - app.logger.Infof("repair %: trying to repair replication using ResetSlaveAlgorithm...", channel) + app.logger.Infof("repair %s: trying to repair replication using ResetSlaveAlgorithm...", channel) app.logger.Infof("repair: executing set slave offline") err := node.SetOffline() if err != nil { From b17a8d7ec0fc1f9a1b5c12a877f45941977224f0 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 03:36:37 +0300 Subject: [PATCH 08/23] fixes --- internal/mysql/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mysql/node.go b/internal/mysql/node.go index e489e15d..702b1a5c 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -263,7 +263,7 @@ func (n *Node) queryRows(queryName string, arg map[string]any, scanner func(*sql func (n *Node) queryRowsMogrify(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error { query := n.getQuery(queryName) query = Mogrify(query, arg) - return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error { + return n.processQuery(query, arg, func(rows *sqlx.Rows) error { var err error for rows.Next() { From 31db86ee150d024d68bd0231c8aef33b5676b2ad Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 04:13:53 +0300 Subject: [PATCH 09/23] fixes --- internal/mysql/node.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 702b1a5c..9e9278bc 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -261,9 +261,7 @@ func (n *Node) queryRows(queryName string, arg map[string]any, scanner func(*sql // nolint: unparam func (n *Node) queryRowsMogrify(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error { - query := n.getQuery(queryName) - query = Mogrify(query, arg) - return n.processQuery(query, arg, func(rows *sqlx.Rows) error { + return n.processQueryMogrify(queryName, arg, func(rows *sqlx.Rows) error { var err error for rows.Next() { @@ -301,6 +299,27 @@ func (n *Node) processQuery(queryName string, arg any, rowsProcessor func(*sqlx. return rowsProcessor(rows) } +func (n *Node) processQueryMogrify(queryName string, arg map[string]any, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + query := n.getQuery(queryName) + query = Mogrify(query, arg) + db, err := n.GetDB() + if err != nil { + return err + } + rows, err := db.NamedQueryContext(ctx, query, arg) + n.traceQuery(query, arg, rows, err) + if err != nil { + return err + } + + defer func() { _ = rows.Close() }() + + return rowsProcessor(rows) +} + // nolint: unparam func (n *Node) execWithTimeout(queryName string, arg map[string]any, timeout time.Duration) error { if arg == nil { From 4f59f7d2f79cf3b42cfcddc70909752b5d778677 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 04:19:47 +0300 Subject: [PATCH 10/23] add external_replication_multiple.feature tests --- .github/workflows/docker-tests-8.0.yml | 1 + .github/workflows/docker-tests-8.4.yml | 1 + .../external_replication_multiple.feature | 573 ++++++++++++++++++ 3 files changed, 575 insertions(+) create mode 100644 tests/features/external_replication_multiple.feature diff --git a/.github/workflows/docker-tests-8.0.yml b/.github/workflows/docker-tests-8.0.yml index 38bcb990..6b75f377 100644 --- a/.github/workflows/docker-tests-8.0.yml +++ b/.github/workflows/docker-tests-8.0.yml @@ -55,6 +55,7 @@ jobs: - 'VERSION=8.0 GODOG_FEATURE=crash_recovery.feature make test' - 'VERSION=8.0 GODOG_FEATURE=events_reenable.feature make test' - 'VERSION=8.0 GODOG_FEATURE=external_replication.feature make test' + - 'VERSION=8.0 GODOG_FEATURE=external_replication_multiple.feature make test' - 'VERSION=8.0 GODOG_FEATURE=failover.feature make test' - 'VERSION=8.0 GODOG_FEATURE=free_space.feature make test' - 'VERSION=8.0 GODOG_FEATURE=host_discovery.feature make test' diff --git a/.github/workflows/docker-tests-8.4.yml b/.github/workflows/docker-tests-8.4.yml index 2e6fb400..8809136b 100644 --- a/.github/workflows/docker-tests-8.4.yml +++ b/.github/workflows/docker-tests-8.4.yml @@ -55,6 +55,7 @@ jobs: - 'VERSION=8.4 GODOG_FEATURE=crash_recovery.feature make test' - 'VERSION=8.4 GODOG_FEATURE=events_reenable.84.feature make test' - 'VERSION=8.4 GODOG_FEATURE=external_replication.feature make test' + - 'VERSION=8.4 GODOG_FEATURE=external_replication_multiple.feature make test' - 'VERSION=8.4 GODOG_FEATURE=failover.84.feature make test' - 'VERSION=8.4 GODOG_FEATURE=free_space.feature make test' - 'VERSION=8.4 GODOG_FEATURE=host_discovery.feature make test' diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature new file mode 100644 index 00000000..c8d32e92 --- /dev/null +++ b/tests/features/external_replication_multiple.feature @@ -0,0 +1,573 @@ +Feature: external replication switchover + + Scenario: external replication works with multiple sources + Given cluster environment is + """ + MYSYNC_REPLICATION_REPAIR_COOLDOWN=30s + """ + Given cluster is up and running + Then mysql host "mysql1" should be master + And mysql host "mysql2" should be replica of "mysql1" + And mysql host "mysql3" should be replica of "mysql1" + When I run SQL on mysql host "mysql1" + """ + CREATE TABLE mysql.replication_settings( + channel_name VARCHAR(50) NOT NULL, + source_host VARCHAR(50) NOT NULL, + source_user VARCHAR(50) NOT NULL, + source_password VARCHAR(50) NOT NULL, + source_port INT UNSIGNED NOT NULL, + source_ssl_ca VARCHAR(4096) NOT NULL DEFAULT '', + source_delay INT UNSIGNED NOT NULL DEFAULT 0, + source_log_file VARCHAR(150) NOT NULL DEFAULT '', + source_log_pos INT UNSIGNED NOT NULL DEFAULT 0, + replication_status ENUM ('stopped', 'running') NOT NULL DEFAULT 'stopped', + replication_filter VARCHAR(4096) NOT NULL DEFAULT '', + PRIMARY KEY (channel_name) + ) ENGINE=INNODB; + """ + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.replication_settings + (channel_name, source_host, source_user, source_password, source_port, replication_status, replication_filter) + VALUES ('external', 'test_source_2', 'test_user_2', 'test_pass_2', 2222, 'running', 'REPLICATE_DO_DB = (testdb1)'); + """ + And I run SQL on mysql host "mysql1" expecting error on number "3074" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + And I run SQL on mysql host "mysql1" + """ + SELECT source_host, source_user, source_password, source_port, replication_filter + FROM mysql.replication_settings WHERE channel_name = 'external' + """ + Then SQL result should match json + """ + [{ + "source_host": "test_source_2", + "source_password": "test_pass_2", + "source_port": 2222, + "source_user": "test_user_2", + "replication_filter": "REPLICATE_DO_DB = (testdb1)" + }] + """ + When I wait for "5" seconds + And I run SQL on mysql host "mysql2" + """ + SELECT source_host, source_user, source_password, source_port, replication_filter + FROM mysql.replication_settings WHERE channel_name = 'external' + """ + Then SQL result should match json + """ + [{ + "source_host": "test_source_2", + "source_port": 2222, + "source_password": "test_pass_2", + "source_user": "test_user_2", + "replication_filter": "REPLICATE_DO_DB = (testdb1)" + }] + """ + And I run SQL on mysql host "mysql1" expecting error on number "3074" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + When I run SQL on mysql host "mysql1" + """ + CHANGE REPLICATION SOURCE TO SOURCE_HOST = 'test_source', + SOURCE_USER = 'test_user', + SOURCE_PASSWORD = 'test_pass', + SOURCE_PORT = 1111, + SOURCE_AUTO_POSITION = 1 + FOR CHANNEL 'external' + """ + And I run SQL on mysql host "mysql2" expecting error on number "3074" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + And I run SQL on mysql host "mysql1" + """ + START REPLICA FOR CHANNEL 'external' + """ + And I run SQL on mysql host "mysql1" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Exec_Source_Log_Pos": 0, + "Replica_IO_State": "Connecting to source", + "Replica_SQL_Running": "Yes", + "Source_Host": "test_source", + "Source_Port": 1111, + "Source_User": "test_user", + "Replica_IO_Running": "Connecting", + "Relay_Source_Log_File": "", + "Channel_Name": "external", + "Source_SSL_CA_File": "" + }] + """ + When I run SQL on mysql host "mysql1" + """ + UPDATE mysql.replication_settings + SET source_ssl_ca = '-----BEGIN CERTIFICATE----- +MIIDDDCCAfSgAwIBAgIBATANBgkqhkiG9w0BAQsFADA/MT0wOwYDVQQDDDRNeVNR +TF9TZXJ2ZXJfOC4wLjMyLTI0X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRl +MB4XDTIzMDUxNDE2NDA1OFoXDTMzMDUxMTE2NDA1OFowPzE9MDsGA1UEAww0TXlT +UUxfU2VydmVyXzguMC4zMi0yNF9BdXRvX0dlbmVyYXRlZF9DQV9DZXJ0aWZpY2F0 +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOFExOlSI8gd0LtIko+z +SpVP94Kk0mxRALdNWry6Ua1PoLogq+ScE0OMN6JamaLqG268K5gIdydLOaK9kx2h +4XXyPUTTepuivpnpiI4KqMcaWYQzmot5eoSOOQL6E5hb09oRXY+IhlaynFg0l/E7 +t5uMMUopmcfOH6OGMXTCFXebKbWGnzHx83bXkyzMWWc1p4X+aP18dewHsYuwZOdx +1goNZNNz0BaJq2y0RYnfYeNOLV6d+S6BAMAUkWbABdols8Pi8ezsPwZ8x/1vk7uy +tUOmiuMkLsC6LzJnnUaoGR3tflCH+yU3XSPQpnZYzaFaeA3d6mgV93w7y3Jreavx +tHkCAwEAAaMTMBEwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEA +dZ9vGVJaAauomoDp9VY4zOr0G4n7WnEElqMAxOQPzLJwRXe81/GchmUKWvX5Fc6o +6RiEa7Nw4YiXKyFMqoJbQN3j8EkOiHs1FtrwJNsobzmlVmjuqxqCBWmVQPpUfOQh +f6I/gQr2BVxvNsj+IvuI0vIVjP5J3GBxL9ySvFKsfp4xtk1oTHIuA2G3haIv2AJp +j/Hm7nVvoXWrb/zX+fagi0rrf+3hDCsHMXtxaxXk2sGRLKHgkTYTVwEPQ6SKEqrW +qnSOx+SMl4up6AVfEq6kVR8ZIt/CzJBWZ4qYQnOf0eK4KQC6UB22adzsaFMmhzRB +YZQy1bHIhscLf8wjTYbzAg== +-----END CERTIFICATE-----' + """ + Then host "mysql1" should have file "/etc/mysql/ssl/external_CA.pem" within "10" seconds + And host "mysql2" should have no file "/etc/mysql/ssl/external_CA.pem" + And file "/etc/mysql/ssl/external_CA.pem" on host "mysql1" should have content + """ +-----BEGIN CERTIFICATE----- +MIIDDDCCAfSgAwIBAgIBATANBgkqhkiG9w0BAQsFADA/MT0wOwYDVQQDDDRNeVNR +TF9TZXJ2ZXJfOC4wLjMyLTI0X0F1dG9fR2VuZXJhdGVkX0NBX0NlcnRpZmljYXRl +MB4XDTIzMDUxNDE2NDA1OFoXDTMzMDUxMTE2NDA1OFowPzE9MDsGA1UEAww0TXlT +UUxfU2VydmVyXzguMC4zMi0yNF9BdXRvX0dlbmVyYXRlZF9DQV9DZXJ0aWZpY2F0 +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOFExOlSI8gd0LtIko+z +SpVP94Kk0mxRALdNWry6Ua1PoLogq+ScE0OMN6JamaLqG268K5gIdydLOaK9kx2h +4XXyPUTTepuivpnpiI4KqMcaWYQzmot5eoSOOQL6E5hb09oRXY+IhlaynFg0l/E7 +t5uMMUopmcfOH6OGMXTCFXebKbWGnzHx83bXkyzMWWc1p4X+aP18dewHsYuwZOdx +1goNZNNz0BaJq2y0RYnfYeNOLV6d+S6BAMAUkWbABdols8Pi8ezsPwZ8x/1vk7uy +tUOmiuMkLsC6LzJnnUaoGR3tflCH+yU3XSPQpnZYzaFaeA3d6mgV93w7y3Jreavx +tHkCAwEAAaMTMBEwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEA +dZ9vGVJaAauomoDp9VY4zOr0G4n7WnEElqMAxOQPzLJwRXe81/GchmUKWvX5Fc6o +6RiEa7Nw4YiXKyFMqoJbQN3j8EkOiHs1FtrwJNsobzmlVmjuqxqCBWmVQPpUfOQh +f6I/gQr2BVxvNsj+IvuI0vIVjP5J3GBxL9ySvFKsfp4xtk1oTHIuA2G3haIv2AJp +j/Hm7nVvoXWrb/zX+fagi0rrf+3hDCsHMXtxaxXk2sGRLKHgkTYTVwEPQ6SKEqrW +qnSOx+SMl4up6AVfEq6kVR8ZIt/CzJBWZ4qYQnOf0eK4KQC6UB22adzsaFMmhzRB +YZQy1bHIhscLf8wjTYbzAg== +-----END CERTIFICATE----- + """ + When I run command on host "mysql1" + """ + mysync switch --to mysql2 --wait=0s + """ + Then command return code should be "0" + And command output should match regexp + """ + switchover scheduled + """ + And zookeeper node "/test/switch" should match json + """ + { + "from": "", + "to": "mysql2" + } + """ + Then zookeeper node "/test/last_switch" should match json within "60" seconds + """ + { + "from": "", + "to": "mysql2", + "result": { + "ok": true + } + } + """ + Then mysql host "mysql2" should be master + And mysql host "mysql2" should be writable + When I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_2", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + And host "mysql2" should have file "/etc/mysql/ssl/external_CA.pem" within "10" seconds + + When host "mysql1" is started + Then mysql host "mysql1" should become available within "20" seconds + And mysql host "mysql1" should become replica of "mysql2" within "10" seconds + And I run SQL on mysql host "mysql1" expecting error on number "3074" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then I run SQL on mysql host "mysql1" + """ + SELECT source_host, source_user, source_password, source_port FROM mysql.replication_settings WHERE channel_name = 'external' + """ + Then SQL result should match json + """ + [{ + "source_host": "test_source_2", + "source_user": "test_user_2", + "source_password": "test_pass_2", + "source_port": 2222 + }] + """ + When I run SQL on mysql host "mysql2" + """ + CREATE TABLE mysql.replication_sources( + channel_name VARCHAR(50) NOT NULL, + source_host VARCHAR(50) NOT NULL, + priority INT NOT NULL DEFAULT 0, + PRIMARY KEY (channel_name, source_host) + ) ENGINE=INNODB + """ + Then I wait for "60" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_2", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + When I run SQL on mysql host "mysql2" + """ + INSERT INTO mysql.replication_sources (channel_name, source_host, priority) VALUES + ('external', 'test_source', 100), + ('external', 'test_source_2', 50), + ('external', 'test_source_3', 10) + """ + Then I wait for "30" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_2", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_3", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_3", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_2", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_2", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source3", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "16" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source_3", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ + Then I wait for "30" seconds + And I run SQL on mysql host "mysql2" + """ + SHOW REPLICA STATUS FOR CHANNEL 'external' + """ + Then SQL result should match json + """ + [{ + "Replica_IO_State": "Connecting to source", + "Source_Host": "test_source", + "Source_Port": 2222, + "Source_User": "test_user_2", + "Replica_IO_Running": "Connecting", + "Replica_SQL_Running": "Yes", + "Relay_Source_Log_File": "", + "Exec_Source_Log_Pos": 0, + "Channel_Name": "external", + "Replicate_Ignore_DB": "mysql", + "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", + "Replicate_Do_DB": "testdb1" + }] + """ From f689521d19ea7d83f2d11da5b53fe540ab7372d8 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 04:42:09 +0300 Subject: [PATCH 11/23] fixes --- tests/features/external_replication_multiple.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index c8d32e92..1fbdb4a4 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -263,7 +263,7 @@ YZQy1bHIhscLf8wjTYbzAg== ('external', 'test_source_2', 50), ('external', 'test_source_3', 10) """ - Then I wait for "30" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 0a974894c6e7bd3a39f49dcce0d5ec3894ff0ced Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 04:53:46 +0300 Subject: [PATCH 12/23] fixes --- .../external_replication_multiple.feature | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index 1fbdb4a4..ec8d992a 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -285,7 +285,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -307,7 +307,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -329,7 +329,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -351,7 +351,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -373,7 +373,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -395,7 +395,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -417,7 +417,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -439,7 +439,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -461,7 +461,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -483,7 +483,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -505,7 +505,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -527,7 +527,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "15" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -549,7 +549,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "30" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 64e7e0cc2e95e73cdf991c8e08ac8f326eeb8db8 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 05:05:50 +0300 Subject: [PATCH 13/23] fixes --- .../external_replication_multiple.feature | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index ec8d992a..66373e7a 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -285,7 +285,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -307,7 +307,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -329,7 +329,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -351,7 +351,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -395,7 +395,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -417,7 +417,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -439,7 +439,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -483,7 +483,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -505,7 +505,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -527,7 +527,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "15" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From f64c587ebfce182a956638a2720c3ded832515d2 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 05:52:39 +0300 Subject: [PATCH 14/23] fixes --- .../external_replication_multiple.feature | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index 66373e7a..fbaf8342 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -285,7 +285,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -307,7 +307,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -329,7 +329,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -351,7 +351,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -395,7 +395,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -417,7 +417,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -439,7 +439,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -483,7 +483,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -505,7 +505,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -527,7 +527,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "16" seconds + Then I wait for "17" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 92b5e04ee0672956c1e7060942b751a9b33208f2 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 06:16:55 +0300 Subject: [PATCH 15/23] fixes --- .../external_replication_multiple.feature | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index fbaf8342..f7938aa0 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -285,7 +285,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -307,7 +307,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -329,7 +329,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -351,7 +351,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -395,7 +395,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -417,7 +417,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -439,7 +439,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -483,7 +483,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -505,7 +505,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -527,7 +527,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "17" seconds + Then I wait for "18" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 5e33136427b6be12074d647d8986528a10c60241 Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 06:36:59 +0300 Subject: [PATCH 16/23] fixes --- .../external_replication_multiple.feature | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index f7938aa0..82e2e38b 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -285,7 +285,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -307,7 +307,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -329,7 +329,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -351,7 +351,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -395,7 +395,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -417,7 +417,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -439,7 +439,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -483,7 +483,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -505,7 +505,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "20" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -527,7 +527,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "18" seconds + Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 143aa67c8b06acae3ba5b85dc88a4e23a32d65ac Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 07:07:36 +0300 Subject: [PATCH 17/23] fixes --- tests/features/external_replication_multiple.feature | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index 82e2e38b..36803bf3 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -373,7 +373,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "45" seconds + Then I wait for "50" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -461,7 +461,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "45" seconds + Then I wait for "50" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -549,7 +549,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "45" seconds + Then I wait for "50" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 064fc38562b47dc1a537db63ba78d249b160c5ab Mon Sep 17 00:00:00 2001 From: suetin Date: Tue, 3 Feb 2026 07:28:31 +0300 Subject: [PATCH 18/23] fixes --- tests/features/external_replication_multiple.feature | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index 36803bf3..612b6c17 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -373,7 +373,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "50" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -461,7 +461,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "50" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' @@ -514,7 +514,7 @@ YZQy1bHIhscLf8wjTYbzAg== """ [{ "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source3", + "Source_Host": "test_source_3", "Source_Port": 2222, "Source_User": "test_user_2", "Replica_IO_Running": "Connecting", @@ -549,7 +549,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Do_DB": "testdb1" }] """ - Then I wait for "50" seconds + Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' From 57e10af91b56f5454c1688f7f247764e30dcd0a2 Mon Sep 17 00:00:00 2001 From: suetin Date: Sun, 8 Feb 2026 09:11:15 +0300 Subject: [PATCH 19/23] fixes --- internal/app/app.go | 2 +- internal/app/replication.go | 2 +- internal/mysql/replication.go | 38 +-- .../external_replication_multiple.feature | 263 +++--------------- tests/mysync_test.go | 78 +++++- 5 files changed, 123 insertions(+), 260 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index b2cbee24..e06a5723 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -85,7 +85,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { logger.ReOpenOnSignal(syscall.SIGUSR2, sysLog) } log.WriteSysLogInfo(sysLog, "logger initialization completed") - externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger) + externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger, config.ExternalReplicationChannel) if err != nil { logger.Errorf("external replication initialization failed: %s", err) return nil, err diff --git a/internal/app/replication.go b/internal/app/replication.go index 0bd6a5f7..0da07a43 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -184,7 +184,7 @@ func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) // mark current source as error and then trying to change it app.externalReplication.SetSourcesStatus(replicaStatus.GetMasterHost(), mysql.ErrorStatus) for _, source := range *replicationSources { - value := app.externalReplication.GetSourcesStatus(source.SourceHost) + value := app.externalReplication.GetExtSourcesStatus(source.SourceHost) if value == mysql.ErrorStatus { app.logger.Infof("repair (external): ignoring source host %s due to error status in the past", source.SourceHost) continue diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go index 34909e4a..96fa2d36 100644 --- a/internal/mysql/replication.go +++ b/internal/mysql/replication.go @@ -16,7 +16,7 @@ type IExternalReplication interface { Stop(*Node) error IsRunningByUser(*Node) bool ChangeSourceHost(*Node, string) error - GetSourcesStatus(string) ExternalSourceStatus + GetExtSourcesStatus(string) ExternalSourceStatus SetSourcesStatus(string, ExternalSourceStatus) ResetSourcesStatus() } @@ -55,7 +55,7 @@ func (d *UnimplementedExternalReplication) ChangeSourceHost(*Node, string) error return nil } -func (d *UnimplementedExternalReplication) GetSourcesStatus(string) ExternalSourceStatus { +func (d *UnimplementedExternalReplication) GetExtSourcesStatus(string) ExternalSourceStatus { return UnknownStatus } @@ -76,17 +76,19 @@ const ( ) type ExternalReplication struct { - logger *log.Logger - sourcesStatus map[string]ExternalSourceStatus + logger *log.Logger + sourcesStatus map[string]ExternalSourceStatus + replicationChannel string } -func NewExternalReplication(replicationType util.ExternalReplicationType, logger *log.Logger) (IExternalReplication, error) { +func NewExternalReplication(replicationType util.ExternalReplicationType, logger *log.Logger, replicationChannel string) (IExternalReplication, error) { switch replicationType { case util.MyExternalReplication: logger.Info("external replication is enabled") return &ExternalReplication{ - logger: logger, - sourcesStatus: make(map[string]ExternalSourceStatus), + logger: logger, + sourcesStatus: make(map[string]ExternalSourceStatus), + replicationChannel: replicationChannel, }, nil default: logger.Info("external replication is disabled") @@ -105,7 +107,7 @@ func (er *ExternalReplication) IsSupported(n *Node) (bool, error) { func (er *ExternalReplication) Set(n *Node) error { replSettings := new(replicationSettings) err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }, replSettings) if err != nil { @@ -145,14 +147,14 @@ func (er *ExternalReplication) Set(n *Node) error { "retryCount": n.config.MySQL.ReplicationRetryCount, "connectRetry": n.config.MySQL.ReplicationConnectRetry, "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil { return err } err = n.execMogrify(queryIgnoreDB, map[string]any{ "ignoreList": schemaname("mysql"), - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil { return err @@ -161,7 +163,7 @@ func (er *ExternalReplication) Set(n *Node) error { if filter.Valid && filter.String != "" { err = n.execMogrify(querySetReplFilter, map[string]any{ "filter": inlinestr(filter.String), - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil { return err @@ -176,7 +178,7 @@ func (er *ExternalReplication) Set(n *Node) error { func (er *ExternalReplication) IsRunningByUser(n *Node) bool { replSettings := new(replicationSettings) err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{ - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }, replSettings) if err != nil { @@ -198,7 +200,7 @@ func (er *ExternalReplication) GetReplicaStatus(n *Node) (ReplicaStatus, error) return nil, nil } - return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, er.replicationChannel) } // StartExternalReplication starts external replication @@ -209,7 +211,7 @@ func (er *ExternalReplication) Start(n *Node) error { } if checked { err := n.execMogrify(queryStartReplica, map[string]any{ - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil { return err @@ -226,7 +228,7 @@ func (er *ExternalReplication) Stop(n *Node) error { } if checked { err := n.execMogrify(queryStopReplica, map[string]any{ - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil && !IsErrorChannelDoesNotExists(err) { return err @@ -243,7 +245,7 @@ func (er *ExternalReplication) Reset(n *Node) error { } if checked { err := n.execMogrify(queryResetReplicaAll, map[string]any{ - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) if err != nil && !IsErrorChannelDoesNotExists(err) { return err @@ -255,11 +257,11 @@ func (er *ExternalReplication) Reset(n *Node) error { func (er *ExternalReplication) ChangeSourceHost(n *Node, host string) error { return n.execMogrify(queryChangeSourceHost, map[string]any{ "host": host, - "channel": n.config.ExternalReplicationChannel, + "channel": er.replicationChannel, }) } -func (er *ExternalReplication) GetSourcesStatus(host string) ExternalSourceStatus { +func (er *ExternalReplication) GetExtSourcesStatus(host string) ExternalSourceStatus { value, ok := er.sourcesStatus[host] if ok { return value diff --git a/tests/features/external_replication_multiple.feature b/tests/features/external_replication_multiple.feature index 612b6c17..e423ae2f 100644 --- a/tests/features/external_replication_multiple.feature +++ b/tests/features/external_replication_multiple.feature @@ -186,9 +186,9 @@ YZQy1bHIhscLf8wjTYbzAg== """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match json witch I will save as "external_repl_status" """ - [{ + { "Replica_IO_State": "Connecting to source", "Source_Host": "test_source_2", "Source_Port": 2222, @@ -201,7 +201,7 @@ YZQy1bHIhscLf8wjTYbzAg== "Replicate_Ignore_DB": "mysql", "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", "Replicate_Do_DB": "testdb1" - }] + } """ And host "mysql2" should have file "/etc/mysql/ssl/external_CA.pem" within "10" seconds @@ -239,23 +239,7 @@ YZQy1bHIhscLf8wjTYbzAg== """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json - """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_2", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] - """ + Then SQL result should match saved json text on path "external_repl_status" When I run SQL on mysql host "mysql2" """ INSERT INTO mysql.replication_sources (channel_name, source_host, priority) VALUES @@ -268,306 +252,121 @@ YZQy1bHIhscLf8wjTYbzAg== """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json - """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_2", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] - """ + Then SQL result should match saved json text on path "external_repl_status" Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_3", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_3"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_3", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_3"} """ Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_2", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_2"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_2", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_2"} """ Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ Then I wait for "20" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_3", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_3"} """ Then I wait for "16" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source_3", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source_3"} """ Then I wait for "45" seconds And I run SQL on mysql host "mysql2" """ SHOW REPLICA STATUS FOR CHANNEL 'external' """ - Then SQL result should match json + Then SQL result should match saved json text on path "external_repl_status" with following changes """ - [{ - "Replica_IO_State": "Connecting to source", - "Source_Host": "test_source", - "Source_Port": 2222, - "Source_User": "test_user_2", - "Replica_IO_Running": "Connecting", - "Replica_SQL_Running": "Yes", - "Relay_Source_Log_File": "", - "Exec_Source_Log_Pos": 0, - "Channel_Name": "external", - "Replicate_Ignore_DB": "mysql", - "Source_SSL_CA_File": "/etc/mysql/ssl/external_CA.pem", - "Replicate_Do_DB": "testdb1" - }] + {"Source_Host": "test_source"} """ diff --git a/tests/mysync_test.go b/tests/mysync_test.go index ebc7717c..d030806a 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "maps" "os" "path/filepath" "strings" @@ -67,6 +68,8 @@ func (noLogger) Printf(string, ...any) {} func (noLogger) Print(...any) {} +type sqlQueryResultRow map[string]any + type testContext struct { variables map[string]any templateErr error @@ -75,11 +78,12 @@ type testContext struct { zk *zk.Conn dbs map[string]*sqlx.DB zkQueryResult string - sqlQueryResult []map[string]any + sqlQueryResult []sqlQueryResultRow sqlUserQueryError sync.Map // host -> error commandRetcode int commandOutput string acl []zk.ACL + comparisonSaved map[string]string } func newTestContext() (*testContext, error) { @@ -209,10 +213,11 @@ func (tctx *testContext) cleanup() { tctx.variables = make(map[string]any) tctx.composerEnv = make([]string, 0) tctx.zkQueryResult = "" - tctx.sqlQueryResult = make([]map[string]any, 0) + tctx.sqlQueryResult = make([]sqlQueryResultRow, 0) tctx.sqlUserQueryError = sync.Map{} tctx.commandRetcode = 0 tctx.commandOutput = "" + tctx.comparisonSaved = make(map[string]string, 0) } func (tctx *testContext) connectZookeeper(addrs []string, timeout time.Duration) (*zk.Conn, error) { @@ -289,7 +294,7 @@ func (tctx *testContext) getMysqlConnection(host string) (*sqlx.DB, error) { return db, nil } -func (tctx *testContext) queryMysql(host string, query string, args any) ([]map[string]any, error) { +func (tctx *testContext) queryMysql(host string, query string, args any) ([]sqlQueryResultRow, error) { if args == nil { args = struct{}{} } @@ -301,7 +306,7 @@ func (tctx *testContext) queryMysql(host string, query string, args any) ([]map[ // sqlx can't execute requests with semicolon // we will execute them in single connection queries := strings.Split(query, ";") - var result []map[string]any + var result []sqlQueryResultRow for _, q := range queries { tctx.sqlQueryResult = nil @@ -317,7 +322,7 @@ func (tctx *testContext) queryMysql(host string, query string, args any) ([]map[ return result, err } -func (tctx *testContext) queryMysqlViaConnection(db *sqlx.DB, host string, query string, args any, timeout time.Duration) ([]map[string]any, error) { +func (tctx *testContext) queryMysqlViaConnection(db *sqlx.DB, host string, query string, args any, timeout time.Duration) ([]sqlQueryResultRow, error) { if args == nil { args = struct{}{} } @@ -332,7 +337,7 @@ func (tctx *testContext) queryMysqlViaConnection(db *sqlx.DB, host string, query return result, err } -func (tctx *testContext) doMysqlQuery(db *sqlx.DB, query string, args any, timeout time.Duration) ([]map[string]any, error) { +func (tctx *testContext) doMysqlQuery(db *sqlx.DB, query string, args any, timeout time.Duration) ([]sqlQueryResultRow, error) { if args == nil { args = struct{}{} } @@ -347,7 +352,7 @@ func (tctx *testContext) doMysqlQuery(db *sqlx.DB, query string, args any, timeo _ = rows.Close() }() - result := make([]map[string]any, 0) + result := make([]sqlQueryResultRow, 0) for rows.Next() { rowmap := make(map[string]any) err = rows.MapScan(rowmap) @@ -936,7 +941,7 @@ func (tctx *testContext) stepSQLResultShouldMatch(matcher string, body *godog.Do } res, err := json.Marshal(tctx.sqlQueryResult) if err != nil { - panic(err) + return err } return m(string(res), strings.TrimSpace(body.Content)) } @@ -1492,6 +1497,60 @@ func (tctx *testContext) stepInfoFileOnHostMatch(filepath, host, matcher string, return err } +func (tctx *testContext) stepSQLResultShouldMatchWithTextIWillSave(matcher string, name string, body *godog.DocString) error { + m, err := matchers.GetMatcher(matcher) + if err != nil { + return err + } + if len(tctx.sqlQueryResult) != 1 { + err := fmt.Errorf("using saved query result works only for 1 row responces") + return err + } + res, err := json.Marshal(tctx.sqlQueryResult[0]) + tctx.comparisonSaved[name] = strings.TrimSpace(body.Content) + return m(string(res), strings.TrimSpace(body.Content)) +} + +func (tctx *testContext) stepSQLResultShouldMatchSavedWithChanges(matcher string, name string, body *godog.DocString) error { + m, err := matchers.GetMatcher(matcher) + if err != nil { + return err + } + strSaved := tctx.comparisonSaved[name] + extected := string(strSaved) + if body != nil { + var changes, saved sqlQueryResultRow + err = json.Unmarshal([]byte(strings.TrimSpace(body.Content)), &changes) + if err != nil { + return err + } + err = json.Unmarshal([]byte(strSaved), &saved) + if err != nil { + return err + } + maps.Copy(saved, changes) + s, err := json.Marshal(saved) + if err != nil { + return err + } + extected = string(s) + } + + if len(tctx.sqlQueryResult) != 1 { + err := fmt.Errorf("using saved query result works only for 1 row responces") + return err + } + res, err := json.Marshal(tctx.sqlQueryResult[0]) + if err != nil { + return err + } + return m(string(res), string(extected)) +} + +func (tctx *testContext) stepSQLResultShouldMatchSaved(matcher string, name string) error { + return tctx.stepSQLResultShouldMatchSavedWithChanges(matcher, name, nil) +} + // nolint: unused func InitializeScenario(s *godog.ScenarioContext) { tctx, err := newTestContext() @@ -1563,6 +1622,9 @@ func InitializeScenario(s *godog.ScenarioContext) { s.Step(`^I break replication on host "([^"]*)" in repairable way$`, tctx.stepBreakReplicationOnHostInARepairableWay) s.Step(`^I set used space on host "([^"]*)" to (\d+)%$`, tctx.stepSetUsedSpace) s.Step(`^I set readonly file system on host "([^"]*)" to "([^"]*)"$`, tctx.stepSetReadonlyStatus) + s.Step(`^SQL result should match (\w+) witch I will save as "(\w+)"$`, tctx.stepSQLResultShouldMatchWithTextIWillSave) + s.Step(`^SQL result should match saved (\w+) text on path "([^"]*)"`, tctx.stepSQLResultShouldMatchSaved) + s.Step(`^SQL result should match saved (\w+) text on path "([^"]*) with following changes$ (\w+)`, tctx.stepSQLResultShouldMatchSavedWithChanges) // zookeeper manipulation s.Step(`^I get zookeeper node "([^"]*)"$`, tctx.stepIGetZookeeperNode) From 90c05deb87fcc21699434a46643f3b18f2774c4b Mon Sep 17 00:00:00 2001 From: suetin Date: Sun, 8 Feb 2026 09:16:39 +0300 Subject: [PATCH 20/23] fixes --- tests/mysync_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/mysync_test.go b/tests/mysync_test.go index d030806a..65b6a845 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -1503,10 +1503,13 @@ func (tctx *testContext) stepSQLResultShouldMatchWithTextIWillSave(matcher strin return err } if len(tctx.sqlQueryResult) != 1 { - err := fmt.Errorf("using saved query result works only for 1 row responces") + err := fmt.Errorf("using saved query result works only for 1 row responce") return err } res, err := json.Marshal(tctx.sqlQueryResult[0]) + if err != nil { + return err + } tctx.comparisonSaved[name] = strings.TrimSpace(body.Content) return m(string(res), strings.TrimSpace(body.Content)) } @@ -1516,15 +1519,14 @@ func (tctx *testContext) stepSQLResultShouldMatchSavedWithChanges(matcher string if err != nil { return err } - strSaved := tctx.comparisonSaved[name] - extected := string(strSaved) + extected := tctx.comparisonSaved[name] if body != nil { var changes, saved sqlQueryResultRow err = json.Unmarshal([]byte(strings.TrimSpace(body.Content)), &changes) if err != nil { return err } - err = json.Unmarshal([]byte(strSaved), &saved) + err = json.Unmarshal([]byte(extected), &saved) if err != nil { return err } @@ -1537,14 +1539,14 @@ func (tctx *testContext) stepSQLResultShouldMatchSavedWithChanges(matcher string } if len(tctx.sqlQueryResult) != 1 { - err := fmt.Errorf("using saved query result works only for 1 row responces") + err := fmt.Errorf("using saved query result works only for 1 row responce") return err } res, err := json.Marshal(tctx.sqlQueryResult[0]) if err != nil { return err } - return m(string(res), string(extected)) + return m(string(res), extected) } func (tctx *testContext) stepSQLResultShouldMatchSaved(matcher string, name string) error { From 7120f699c22413d13f6842a8d6c276325c87b2d6 Mon Sep 17 00:00:00 2001 From: suetin Date: Sun, 8 Feb 2026 09:18:19 +0300 Subject: [PATCH 21/23] fixes --- tests/mysync_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/mysync_test.go b/tests/mysync_test.go index 65b6a845..c26bc6a6 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -1503,7 +1503,7 @@ func (tctx *testContext) stepSQLResultShouldMatchWithTextIWillSave(matcher strin return err } if len(tctx.sqlQueryResult) != 1 { - err := fmt.Errorf("using saved query result works only for 1 row responce") + err := fmt.Errorf("using saved query result works only for 1 row response") return err } res, err := json.Marshal(tctx.sqlQueryResult[0]) @@ -1539,7 +1539,7 @@ func (tctx *testContext) stepSQLResultShouldMatchSavedWithChanges(matcher string } if len(tctx.sqlQueryResult) != 1 { - err := fmt.Errorf("using saved query result works only for 1 row responce") + err := fmt.Errorf("using saved query result works only for 1 row response") return err } res, err := json.Marshal(tctx.sqlQueryResult[0]) From dbdfaf589a0cfdbf078208883852725547eb1f54 Mon Sep 17 00:00:00 2001 From: suetin Date: Sun, 8 Feb 2026 09:34:44 +0300 Subject: [PATCH 22/23] fixes --- tests/mysync_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/mysync_test.go b/tests/mysync_test.go index c26bc6a6..cef99b6e 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -1625,8 +1625,8 @@ func InitializeScenario(s *godog.ScenarioContext) { s.Step(`^I set used space on host "([^"]*)" to (\d+)%$`, tctx.stepSetUsedSpace) s.Step(`^I set readonly file system on host "([^"]*)" to "([^"]*)"$`, tctx.stepSetReadonlyStatus) s.Step(`^SQL result should match (\w+) witch I will save as "(\w+)"$`, tctx.stepSQLResultShouldMatchWithTextIWillSave) - s.Step(`^SQL result should match saved (\w+) text on path "([^"]*)"`, tctx.stepSQLResultShouldMatchSaved) - s.Step(`^SQL result should match saved (\w+) text on path "([^"]*) with following changes$ (\w+)`, tctx.stepSQLResultShouldMatchSavedWithChanges) + s.Step(`^SQL result should match saved (\w+) text on path "(\w+)"$"`, tctx.stepSQLResultShouldMatchSaved) + s.Step(`^SQL result should match saved (\w+) text on path "(\w+)" with following changes$`, tctx.stepSQLResultShouldMatchSavedWithChanges) // zookeeper manipulation s.Step(`^I get zookeeper node "([^"]*)"$`, tctx.stepIGetZookeeperNode) From 85a744c8775bfd5ff3b70bb4b47771d70bbc441a Mon Sep 17 00:00:00 2001 From: suetin Date: Sun, 8 Feb 2026 09:44:58 +0300 Subject: [PATCH 23/23] fixes --- tests/mysync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/mysync_test.go b/tests/mysync_test.go index cef99b6e..45e37085 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -1625,7 +1625,7 @@ func InitializeScenario(s *godog.ScenarioContext) { s.Step(`^I set used space on host "([^"]*)" to (\d+)%$`, tctx.stepSetUsedSpace) s.Step(`^I set readonly file system on host "([^"]*)" to "([^"]*)"$`, tctx.stepSetReadonlyStatus) s.Step(`^SQL result should match (\w+) witch I will save as "(\w+)"$`, tctx.stepSQLResultShouldMatchWithTextIWillSave) - s.Step(`^SQL result should match saved (\w+) text on path "(\w+)"$"`, tctx.stepSQLResultShouldMatchSaved) + s.Step(`^SQL result should match saved (\w+) text on path "(\w+)"$`, tctx.stepSQLResultShouldMatchSaved) s.Step(`^SQL result should match saved (\w+) text on path "(\w+)" with following changes$`, tctx.stepSQLResultShouldMatchSavedWithChanges) // zookeeper manipulation