Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
Expand All @@ -680,36 +680,32 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}

expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])

hasFurtherRange = expectedRowCount > 0
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, expectedRowCount, nil
return hasFurtherRange, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, expectedRowCount, nil
return hasFurtherRange, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
Expand Down
65 changes: 63 additions & 2 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
suite.Require().NoError(err)

migrationContext.SetNextIterationRangeMinValues()
hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)
suite.Require().Equal(int64(1), expectedRangeSize)

_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)
Expand Down Expand Up @@ -598,6 +597,68 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
Equal(int64(0), migrationContext.RowsDeltaEstimate)
}

func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFailsWithTruncationWarning() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id int not null, name varchar(20), primary key(id))")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, name varchar(20), primary key(id));")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, name) VALUES (1, 'this string is long')")
suite.Require().NoError(err)

connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.SkipPortValidation = true
migrationContext.OriginalTableName = "testing"
migrationContext.AlterStatementOptions = "modify column name varchar(10)"
migrationContext.PanicOnWarnings = true
migrationContext.SetConnectionConfig("innodb")

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
NameInGhostTable: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}
applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
suite.Require().NoError(err)

err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

err = applier.AlterGhost()
suite.Require().NoError(err)

migrationContext.SetNextIterationRangeMinValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)

_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Equal(int64(1), rowsAffected)
suite.Require().NoError(err)

// Verify the warning was recorded and will cause the migrator to panic
suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings)
suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1")
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
8 changes: 3 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ func (this *Migrator) iterateChunks() error {
}

// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever
hasFurtherRange, expectedRangeSize, err := this.applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
if err != nil {
return err // wrapping call will retry
}
Expand Down Expand Up @@ -1373,10 +1373,8 @@ func (this *Migrator) iterateChunks() error {
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
}
if expectedRangeSize != rowsAffected {
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
}
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
}
}

Expand Down
63 changes: 12 additions & 51 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,46 +283,25 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
}
}
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk)
from (
select
%s
from
%s.%s
where
%s and %s

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

order by should be added here if we were to keep the offset variant

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, we're dropping the expectedRowCount entirely after discovering more issues with the count approach: github#1557 (comment)

limit
%d
) select_osc_chunk
%s
from
%s.%s
where
%s and %s
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
joinedColumnNames, joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
joinedColumnNames,
strings.Join(uniqueKeyColumnNames, ", "),
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
)
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
return result, explodedArgs, nil
}

func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down Expand Up @@ -360,22 +339,8 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}

joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
order by
%s
limit %d
) select_osc_chunk)
select /* gh-ost %s.%s %s */ %s
from (
select
%s
Expand All @@ -390,17 +355,13 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
order by
%s
limit 1`,
databaseName, tableName, hint, joinedColumnNames,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
joinedColumnNames, databaseName, tableName,
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),
)
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
return result, explodedArgs, nil
}

func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {
Expand Down
43 changes: 30 additions & 13 deletions go/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,34 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
}
}

func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
func TestBuildUniqueKeyRangeEndPreparedQueryViaOffset(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
var chunkSize int64 = 500
{
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}

query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
require.NoError(t, err)
expected := `
select /* gh-ost mydb.tbl test */
name, position
from
mydb.tbl
where
((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc
limit 1
offset 499`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
}
}

func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
var chunkSize int64 = 500
Expand All @@ -338,17 +365,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
require.NoError(t, err)
expected := `
select /* gh-ost mydb.tbl test */
name, position,
(select count(*) from (
select
name, position
from
mydb.tbl
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc
limit 500
) select_osc_chunk)
name, position
from (
select
name, position
Expand All @@ -363,7 +380,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
name desc, position desc
limit 1`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
}
}

Expand Down
Loading