Skip to content
Merged
1 change: 1 addition & 0 deletions .github/workflows/docker-tests-8.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
- 'VERSION=8.0 GODOG_FEATURE=crash_recovery.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=events_reenable.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=external_replication.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=external_replication_multiple.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=failover.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=free_space.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=host_discovery.feature make test'
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/docker-tests-8.4.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
- 'VERSION=8.4 GODOG_FEATURE=crash_recovery.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=events_reenable.84.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=external_replication.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=external_replication_multiple.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=failover.84.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=free_space.feature make test'
- 'VERSION=8.4 GODOG_FEATURE=host_discovery.feature make test'
Expand Down
3 changes: 2 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
logger.ReOpenOnSignal(syscall.SIGUSR2, sysLog)
}
log.WriteSysLogInfo(sysLog, "logger initialization completed")
externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger)
externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger, config.ExternalReplicationChannel)
if err != nil {
logger.Errorf("external replication initialization failed: %s", err)
return nil, err
Expand Down Expand Up @@ -2059,6 +2059,7 @@ func (app *App) repairExternalReplication(masterNode *mysql.Node) {
}

if app.externalReplication.IsRunningByUser(masterNode) && !extReplStatus.ReplicationRunning() {
app.logger.Info("repair (external): calling TryRepairReplication")
// TODO: remove "". Master is not needed for external replication now
app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel)
}
Expand Down
87 changes: 73 additions & 14 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type ReplicationRepairAlgorithmType int
const (
StartSlave ReplicationRepairAlgorithmType = iota
ResetSlave
ChangeSource
)

type ReplicationRepairState struct {
Expand Down Expand Up @@ -74,27 +75,29 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host(), channel)
if err != nil {
app.logger.Errorf("repair error: host %s, %v", node.Host(), err)
app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err)
return
}

if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
return
}

algorithmType, count, err := app.getSuitableAlgorithmType(replState)
algorithmType, count, err := app.getSuitableAlgorithmType(replState, channel)
if err != nil {
app.logger.Errorf("repair error: host %s, %v", node.Host(), err)
app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err)
return
}

algorithm := getRepairAlgorithm(algorithmType)
err = algorithm(app, node, master, channel)
if err != nil {
app.logger.Errorf("repair error: %v", err)
app.logger.Errorf("repair error on channel %s: host %s, %v", channel, node.Host(), err)
}

replState.History[algorithmType] = count + 1
if algorithmType != ChangeSource {
replState.History[algorithmType] = count + 1
}
replState.LastAttempt = time.Now()
}

Expand All @@ -106,7 +109,7 @@ func (app *App) makeReplStateKey(node *mysql.Node, channel string) string {
}

func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...")
app.logger.Infof("repair %s: trying to repair replication using StartSlaveAlgorithm...", channel)
if channel == app.config.ExternalReplicationChannel {
return app.externalReplication.Start(node)
}
Expand All @@ -120,7 +123,7 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri
app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving")
return nil
}
app.logger.Infof("repair: trying to repair replication using ResetSlaveAlgorithm...")
app.logger.Infof("repair %s: trying to repair replication using ResetSlaveAlgorithm...", channel)
app.logger.Infof("repair: executing set slave offline")
err := node.SetOffline()
if err != nil {
Expand Down Expand Up @@ -160,9 +163,56 @@ func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel stri
return nil
}

func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState) (ReplicationRepairAlgorithmType, int, error) {
for i := range app.getAlgorithmOrder() {
algorithmType := ReplicationRepairAlgorithmType(i)
// ChangeSourceAlgorithm tries to repair external replication by pointing the
// replica at another source host from the external replication sources list.
//
// It acts only on the configured external replication channel; for any other
// channel it logs and returns nil. When the sources list is missing or empty
// it logs and returns nil without touching any source status.
//
// Otherwise the host the replica currently replicates from is marked with
// mysql.ErrorStatus, and the sources are walked in the order returned by
// node.GetExternalReplicationSources. The first source not marked as failed is
// applied with Stop, ChangeSourceHost and Start; any error from those steps is
// returned immediately. If every source is already marked as failed, all marks
// are reset so that the next attempt starts from scratch.
//
// The third argument (master) is unused. A nil return does not guarantee that
// the source was actually changed.
func ChangeSourceAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
	app.logger.Infof("repair %s: trying to repair replication using ChangeSourceAlgorithm...", channel)
	if channel != app.config.ExternalReplicationChannel {
		app.logger.Infof("ChangeSourceAlgorithm works only for external replication")
		return nil
	}
	replicationSources, err := node.GetExternalReplicationSources()
	if err != nil {
		return err
	}
	// A non-nil pointer to an empty slice means "no candidates" as well: bail
	// out before marking the current source, otherwise the mark would be set
	// and wiped again in the same call.
	if replicationSources == nil || len(*replicationSources) == 0 {
		app.logger.Infof("No available sources in external replication sources table found for channel %s", channel)
		return nil
	}
	replicaStatus, err := app.externalReplication.GetReplicaStatus(node)
	if err != nil {
		return err
	}
	// Mark the current source as failed so the loop below skips it.
	app.externalReplication.SetSourcesStatus(replicaStatus.GetMasterHost(), mysql.ErrorStatus)
	for _, source := range *replicationSources {
		if app.externalReplication.GetExtSourcesStatus(source.SourceHost) == mysql.ErrorStatus {
			app.logger.Infof("repair (external): ignoring source host %s due to error status in the past", source.SourceHost)
			continue
		}
		app.logger.Infof("repair (external): trying change source to %s", source.SourceHost)
		if err = app.externalReplication.Stop(node); err != nil {
			return err
		}
		if err = app.externalReplication.ChangeSourceHost(node, source.SourceHost); err != nil {
			return err
		}
		if err = app.externalReplication.Start(node); err != nil {
			return err
		}
		app.logger.Infof("repair (external): source changed to %s", source.SourceHost)
		return nil
	}
	// Reaching this point means every listed host is marked as failed:
	// reset the marks and let the next repair iteration try again.
	app.externalReplication.ResetSourcesStatus()
	return nil
}

func (app *App) getSuitableAlgorithmType(state *ReplicationRepairState, channel string) (ReplicationRepairAlgorithmType, int, error) {
for _, algorithmType := range app.getAlgorithmOrder(channel) {
count := state.History[algorithmType]
if count < app.config.ReplicationRepairMaxAttempts {
return algorithmType, count, nil
Expand Down Expand Up @@ -207,7 +257,7 @@ func (app *App) createRepairState(hostname, channel string) (*ReplicationRepairS
LastGTIDExecuted: status.GetExecutedGtidSet(),
}

for i := range app.getAlgorithmOrder() {
for i := range app.getAlgorithmOrder(channel) {
result.History[ReplicationRepairAlgorithmType(i)] = 0
}

Expand All @@ -223,7 +273,15 @@ var aggressiveOrder = []ReplicationRepairAlgorithmType{
ResetSlave,
}

func (app *App) getAlgorithmOrder() []ReplicationRepairAlgorithmType {
// externalReplicationOrder is the repair algorithm order that getAlgorithmOrder
// returns for the external replication channel: StartSlave first, then
// ChangeSource.
var externalReplicationOrder = []ReplicationRepairAlgorithmType{
StartSlave,
ChangeSource,
}

func (app *App) getAlgorithmOrder(channel string) []ReplicationRepairAlgorithmType {
if channel == app.config.ExternalReplicationChannel {
return externalReplicationOrder
}
if app.config.ReplicationRepairAggressiveMode {
return aggressiveOrder
} else {
Expand All @@ -232,8 +290,9 @@ func (app *App) getAlgorithmOrder() []ReplicationRepairAlgorithmType {
}

var mapping = map[ReplicationRepairAlgorithmType]RepairReplicationAlgorithm{
StartSlave: StartSlaveAlgorithm,
ResetSlave: ResetSlaveAlgorithm,
StartSlave: StartSlaveAlgorithm,
ResetSlave: ResetSlaveAlgorithm,
ChangeSource: ChangeSourceAlgorithm,
}

func getRepairAlgorithm(algoType ReplicationRepairAlgorithmType) RepairReplicationAlgorithm {
Expand Down
5 changes: 5 additions & 0 deletions internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ type replicationSettings struct {
ReplicationFilter sql.NullString `db:"ReplicationFilter"`
}

// ReplicationSource is one candidate source host for external replication,
// scanned from a row of the get_external_replication_sources query result.
type ReplicationSource struct {
// SourceHost is filled from the SourceHost result column.
SourceHost string `db:"SourceHost"`
// Priority is filled from the Priority result column; the default query
// returns rows ordered by it in descending order.
Priority int `db:"Priority"`
}

// SlaveStatusStruct contains SHOW SLAVE STATUS response
type SlaveStatusStruct struct {
MasterHost string `db:"Master_Host"`
Expand Down
65 changes: 62 additions & 3 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (n *Node) queryRowWithTimeout(queryName string, arg any, result any, timeou
}

// nolint: unparam
func (n *Node) queryRows(queryName string, arg any, scanner func(*sqlx.Rows) error) error {
func (n *Node) queryRows(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error {
// TODO we need to rewrite processQuery, to make traceQuery work properly
// traceQuery should be called with result, not *Rows
return n.processQuery(queryName, arg, func(rows *sqlx.Rows) error {
Expand All @@ -259,6 +259,22 @@ func (n *Node) queryRows(queryName string, arg any, scanner func(*sqlx.Rows) err
}, n.config.DBTimeout)
}

// queryRowsMogrify runs the named query through processQueryMogrify with the
// node's configured DBTimeout and calls scanner once for every result row.
//
// Iteration stops at the first error returned by scanner and that error is
// returned. If every row is scanned cleanly, the iteration error reported by
// rows.Err() is returned, so a result set that was cut short (for example
// because the query context expired) is not mistaken for a complete one.
//
// nolint: unparam
func (n *Node) queryRowsMogrify(queryName string, arg map[string]any, scanner func(*sqlx.Rows) error) error {
	return n.processQueryMogrify(queryName, arg, func(rows *sqlx.Rows) error {
		for rows.Next() {
			if err := scanner(rows); err != nil {
				return err
			}
		}
		// Next returns false both at end of data and on failure; Err tells
		// the two cases apart.
		return rows.Err()
	}, n.config.DBTimeout)
}

func (n *Node) processQuery(queryName string, arg any, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error {
if arg == nil {
arg = struct{}{}
Expand All @@ -283,6 +299,27 @@ func (n *Node) processQuery(queryName string, arg any, rowsProcessor func(*sqlx.
return rowsProcessor(rows)
}

// processQueryMogrify looks up the query registered under queryName, passes it
// through Mogrify together with arg, executes the result as a named query
// bounded by timeout, and hands the returned rows to rowsProcessor.
//
// The query and its outcome are reported to traceQuery whether or not the
// execution succeeded. The rows are closed after rowsProcessor returns, and
// its error (if any) is the function's result.
func (n *Node) processQueryMogrify(queryName string, arg map[string]any, rowsProcessor func(*sqlx.Rows) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	mogrified := Mogrify(n.getQuery(queryName), arg)

	conn, err := n.GetDB()
	if err != nil {
		return err
	}

	result, queryErr := conn.NamedQueryContext(ctx, mogrified, arg)
	n.traceQuery(mogrified, arg, result, queryErr)
	if queryErr != nil {
		return queryErr
	}
	// Close errors are deliberately ignored: the rows have been consumed by then.
	defer func() { _ = result.Close() }()

	return rowsProcessor(result)
}

// nolint: unparam
func (n *Node) execWithTimeout(queryName string, arg map[string]any, timeout time.Duration) error {
if arg == nil {
Expand Down Expand Up @@ -1049,8 +1086,11 @@ func (n *Node) GetStartupTime() (time.Time, error) {
}

func (n *Node) UpdateExternalCAFile() error {
var replSettings replicationSettings
err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings)
replSettings := new(replicationSettings)
err := n.queryRowMogrify(queryGetExternalReplicationSettings, map[string]any{
"channel": n.config.ExternalReplicationChannel,
},
replSettings)
if err != nil {
return nil
}
Expand Down Expand Up @@ -1350,3 +1390,22 @@ func (n *Node) GetListSlaveSideDisabledEventsQuery() (string, error) {
return queryListSlavesideDisabledEvents, nil
}
}

// GetExternalReplicationSources loads the candidate source hosts for the
// configured external replication channel using the
// queryGetExternalReplicationSources query.
//
// It returns (nil, nil) when there is nothing to choose from: the sources
// table does not exist, or it holds no rows for the channel. On any other
// error the result is nil and the error is returned, so callers never see a
// partially filled list.
func (n *Node) GetExternalReplicationSources() (*[]ReplicationSource, error) {
	var sources []ReplicationSource
	err := n.queryRowsMogrify(queryGetExternalReplicationSources, map[string]any{
		"channel": n.config.ExternalReplicationChannel,
	}, func(rows *sqlx.Rows) error {
		var source ReplicationSource
		if err := rows.StructScan(&source); err != nil {
			return err
		}
		sources = append(sources, source)
		return nil
	})
	if IsErrorTableDoesNotExists(err) || err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	// An empty result is reported the same way as a missing table so that a
	// plain nil check on the caller side covers both cases.
	if len(sources) == 0 {
		return nil, nil
	}
	return &sources, nil
}
13 changes: 12 additions & 1 deletion internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (
queryCalcReplMonTSDelay = "calc_repl_mon_ts_delay"
queryCreateReplMonTable = "create_repl_mon_table"
queryUpdateReplMon = "update_repl_mon"
queryGetExternalReplicationSources = "get_external_replication_sources"
queryChangeSourceHost = "change_source_host"
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -135,7 +137,7 @@ var DefaultQueries = map[string]string{
replication_status AS ReplicationStatus,
replication_filter AS ReplicationFilter
FROM mysql.replication_settings
WHERE channel_name = 'external'`,
WHERE channel_name = :channel`,
queryChangeSource: `CHANGE REPLICATION SOURCE TO
SOURCE_HOST = :host,
SOURCE_PORT = :port,
Expand Down Expand Up @@ -181,4 +183,13 @@ var DefaultQueries = map[string]string{
WHERE @@read_only = 0
)
ON DUPLICATE KEY UPDATE ts = CURRENT_TIMESTAMP(3)`,
queryGetExternalReplicationSources: `SELECT
source_host AS SourceHost,
priority AS Priority
FROM mysql.replication_sources
WHERE channel_name = :channel
ORDER BY Priority DESC`,
queryChangeSourceHost: `CHANGE REPLICATION SOURCE TO
SOURCE_HOST = :host
FOR CHANNEL :channel`,
}
Loading
Loading