Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,30 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
}
}

if partialJSONError, ok := errors.AsType[*exceptions.MySQLPartialJSONUnsupportedError](err); ok {
attrs := map[AdditionalErrorAttributeKey]string{
ErrorAttributeKeyTable: partialJSONError.SchemaName + "." + partialJSONError.TableName,
}
if partialJSONError.ColumnName != "" {
attrs[ErrorAttributeKeyColumn] = partialJSONError.ColumnName
}
return ErrorNotifyBinlogInvalid, ErrorInfo{
Source: ErrorSourceMySQL,
Code: "UNSUPPORTED_BINLOG_ROW_VALUE_OPTIONS",
AdditionalAttributes: attrs,
}
}

if unhandledRowsEventError, ok := errors.AsType[*exceptions.MySQLUnhandledRowsEventError](err); ok {
return ErrorNotifyBinlogInvalid, ErrorInfo{
Source: ErrorSourceMySQL,
Code: "UNHANDLED_ROWS_EVENT",
AdditionalAttributes: map[AdditionalErrorAttributeKey]string{
ErrorAttributeKeyTable: unhandledRowsEventError.SchemaName + "." + unhandledRowsEventError.TableName,
},
}
}

if unsupportedDDLError, ok := errors.AsType[*exceptions.MySQLUnsupportedDDLError](err); ok {
return ErrorNotifyBinlogRowMetadataInvalid, ErrorInfo{
Source: ErrorSourceMySQL,
Expand Down
31 changes: 31 additions & 0 deletions flow/alerting/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,3 +1214,34 @@ func TestMySQLBinlogIncidentErrorShouldBeNotifyBinlogInvalid(t *testing.T) {
Code: "BINLOG_INCIDENT",
}, errInfo)
}

func TestMySQLPartialJSONUnsupportedShouldBeNotifyBinlogInvalid(t *testing.T) {
t.Parallel()

err := exceptions.NewMySQLPartialJSONUnsupportedError("test_db", "test_table", "doc")
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("pulling records failed: %w", err))
assert.Equal(t, ErrorNotifyBinlogInvalid, errorClass)
assert.Equal(t, ErrorInfo{
Source: ErrorSourceMySQL,
Code: "UNSUPPORTED_BINLOG_ROW_VALUE_OPTIONS",
AdditionalAttributes: map[AdditionalErrorAttributeKey]string{
ErrorAttributeKeyTable: "test_db.test_table",
ErrorAttributeKeyColumn: "doc",
},
}, errInfo)
}

// TestMySQLUnhandledRowsEventShouldBeNotifyBinlogInvalid verifies that a wrapped
// MySQLUnhandledRowsEventError is classified as ErrorNotifyBinlogInvalid, reported with
// the UNHANDLED_ROWS_EVENT code, and tagged with the "schema.table" it refers to.
func TestMySQLUnhandledRowsEventShouldBeNotifyBinlogInvalid(t *testing.T) {
	t.Parallel()

	// The error is wrapped so the classifier has to unwrap it to find the typed cause.
	wrapped := fmt.Errorf(
		"pulling records failed: %w",
		exceptions.NewMySQLUnhandledRowsEventError("test_db", "test_table", "UnknownEvent", 255),
	)
	want := ErrorInfo{
		Source: ErrorSourceMySQL,
		Code:   "UNHANDLED_ROWS_EVENT",
		AdditionalAttributes: map[AdditionalErrorAttributeKey]string{
			ErrorAttributeKeyTable: "test_db.test_table",
		},
	}

	gotClass, gotInfo := GetErrorClass(t.Context(), wrapped)
	assert.Equal(t, ErrorNotifyBinlogInvalid, gotClass)
	assert.Equal(t, want, gotInfo)
}
11 changes: 11 additions & 0 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,11 @@ func (c *MySqlConnector) PullRecords(
return nil
}
switch event.Header.EventType {
case replication.PARTIAL_UPDATE_ROWS_EVENT:
e := exceptions.NewMySQLPartialJSONUnsupportedError(
string(ev.Table.Schema), string(ev.Table.Table), "")
c.logger.Error(e.Error())
return e
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
for _, row := range ev.Rows {
items := model.NewRecordItems(len(row))
Expand Down Expand Up @@ -755,6 +760,12 @@ func (c *MySqlConnector) PullRecords(
}
case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0:
return fmt.Errorf("mysql v0 replication protocol not supported")
default:
e := exceptions.NewMySQLUnhandledRowsEventError(
string(ev.Table.Schema), string(ev.Table.Table),
event.Header.EventType.String(), uint16(event.Header.EventType))
c.logger.Error(e.Error())
return e
}
}
if event.Header.Timestamp > 0 {
Expand Down
10 changes: 8 additions & 2 deletions flow/connectors/mysql/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,14 @@ func QValueFromMysqlRowEvent(
case time.Time:
return types.QValueTimestamp{Val: val}, nil
case *replication.JsonDiff:
// TODO support somehow??
return types.QValueNull(types.QValueKindJSON), nil
columnName := "__peerdb_unknown_" + strconv.Itoa(idx)
if len(ev.ColumnName) > idx {
columnName = string(ev.ColumnName[idx])
}
err := exceptions.NewMySQLPartialJSONUnsupportedError(
string(ev.Schema), string(ev.Table), columnName)
logger.Warn(err.Error())
return nil, err
case []byte:
switch qkind {
case types.QValueKindBytes:
Expand Down
29 changes: 29 additions & 0 deletions flow/connectors/mysql/qvalue_convert_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package connmysql

import (
"log/slog"
"testing"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"
temporallog "go.temporal.io/sdk/log"

"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/exceptions"
"github.com/PeerDB-io/peerdb/flow/shared/types"
)

Expand Down Expand Up @@ -38,6 +42,31 @@ func TestQkindFromMysqlType_Bit(t *testing.T) {
}
}

func TestQValueFromMysqlRowEventJsonDiffErrors(t *testing.T) {
logger := temporallog.NewStructuredLogger(slog.New(slog.DiscardHandler))
ev := &replication.TableMapEvent{
Schema: []byte("test_db"),
Table: []byte("test_table"),
ColumnType: []byte{mysql.MYSQL_TYPE_JSON},
ColumnMeta: []uint16{0},
ColumnName: [][]byte{[]byte("doc")},
}
var coercionReported bool

_, err := QValueFromMysqlRowEvent(
ev, 0, nil, nil, types.QValueKindJSON,
&replication.JsonDiff{Op: replication.JsonDiffOperationReplace, Path: "$.a", Value: `"b"`},
logger, &coercionReported,
)
require.Error(t, err)

var partialJSONErr *exceptions.MySQLPartialJSONUnsupportedError
require.ErrorAs(t, err, &partialJSONErr)
require.Equal(t, "test_db", partialJSONErr.SchemaName)
require.Equal(t, "test_table", partialJSONErr.TableName)
require.Equal(t, "doc", partialJSONErr.ColumnName)
}

func TestProcessTime(t *testing.T) {
epoch := time.Unix(0, 0).UTC()
for _, ts := range []struct {
Expand Down
143 changes: 143 additions & 0 deletions flow/e2e/clickhouse_mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"context"
"encoding/hex"
"fmt"
"math"
Expand All @@ -10,12 +11,16 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/PeerDB-io/peerdb/flow/connectors"
connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse"
"github.com/PeerDB-io/peerdb/flow/e2eshared"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/pkg/clickhouse"
"github.com/PeerDB-io/peerdb/flow/pkg/common"
mysql_validation "github.com/PeerDB-io/peerdb/flow/pkg/mysql"
"github.com/PeerDB-io/peerdb/flow/shared"
"github.com/PeerDB-io/peerdb/flow/shared/datatypes"
Expand Down Expand Up @@ -1468,3 +1473,141 @@ func (s ClickHouseSuite) Test_MySQL_Column_Position_Shifting_DDL_Error() {
})
}
}

// Test_MySQL_PartialJSONUnsupported is an end-to-end check that a MySQL source with
// binlog_row_value_options=PARTIAL_JSON surfaces an error in the flow logs.
//
// It starts a dedicated MySQL container, mirrors a table with a JSON column into the
// suite's destination, waits for the initial snapshot to match, then enables PARTIAL_JSON
// and issues a JSON_SET update. The test passes once GetLogCount reports at least one
// "error" entry matching "disable binlog_row_value_options (PARTIAL_JSON)".
func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() {
// Skip configurations this scenario does not apply to: ClickHouse cluster runs,
// non-MySQL sources, and MariaDB.
if s.cluster {
s.t.Skip("source-side PARTIAL_JSON coverage does not need to run against ClickHouse cluster")
}
if _, ok := s.source.(*MySqlSource); !ok {
s.t.Skip("only applies to mysql")
}
if internal.MySQLTestVersionIsMaria() {
s.t.Skip("PARTIAL_UPDATE_ROWS_EVENT is MySQL-specific")
}

// Dedicated MySQL container with row-based binary logging (ROW format, FULL row image,
// FULL row metadata). NOTE(review): the remaining mysqld flags look like resource limits
// to keep the container small — confirm.
req := testcontainers.ContainerRequest{
Image: "ghcr.io/peerdb-io/mysql-debug:8.0.46",
Env: map[string]string{
"MYSQL_ROOT_PASSWORD": internal.MySQLTestRootPasswordWithFallback("cipass"),
"MYSQL_ROOT_HOST": "%",
},
Cmd: []string{
"mysqld",
"--server-id=1",
"--log-bin=mysql-bin",
"--binlog-format=ROW",
"--binlog-row-image=FULL",
"--binlog-row-metadata=FULL",
"--innodb-buffer-pool-size=64M",
"--performance-schema=OFF",
"--mysqlx=0",
"--max-connections=20",
"--table-open-cache=64",
"--table-definition-cache=128",
"--innodb-log-buffer-size=8M",
},
ExposedPorts: []string{"3306/tcp"},
// Ready only when the log line has appeared twice and the port is listening.
// NOTE(review): presumably the image logs "ready for connections" once for a temporary
// init server and once for the real one — confirm.
WaitingFor: wait.ForAll(
wait.ForLog("ready for connections").WithOccurrence(2).WithStartupTimeout(3*time.Minute),
wait.ForListeningPort("3306/tcp").WithStartupTimeout(3*time.Minute),
),
}

ctr, err := testcontainers.GenericContainer(s.t.Context(), testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
// Cleanup is registered before the error check so that a non-nil container returned
// alongside an error is still handed to CleanupContainer.
if ctr != nil {
testcontainers.CleanupContainer(s.t, ctr, testcontainers.StopTimeout(30*time.Second))
}
require.NoError(s.t, err)

// Resolve the host port that 3306/tcp was mapped to.
mapped, err := ctr.MappedPort(s.t.Context(), "3306/tcp")
require.NoError(s.t, err)
port, err := strconv.Atoi(mapped.Port())
require.NoError(s.t, err)

// GTID replication unless the test environment asks for file/position mode.
replication := protos.MySqlReplicationMechanism_MYSQL_GTID
if internal.MySQLTestVersionIsMySQLPos() {
replication = protos.MySqlReplicationMechanism_MYSQL_FILEPOS
}

// Random suffix keeps the database, peer and flow names unique to this run.
suffix := "mypjson_" + strings.ToLower(common.RandomString(8))
config := &protos.MySqlConfig{
Host: internal.MySQLTestHost(),
Port: uint32(port),
User: "root",
Password: internal.MySQLTestRootPasswordWithFallback("cipass"),
DisableTls: true,
Flavor: protos.MySqlFlavor_MYSQL_MYSQL,
ReplicationMechanism: replication,
}
src, err := setupMyConnector(s.t, suffix, config, "mysql_partial_json_"+suffix)
require.NoError(s.t, err)
// Teardown uses context.Background() because the test's own context is canceled just
// before Cleanup-registered functions run.
s.t.Cleanup(func() {
src.Teardown(s.t, context.Background(), suffix)
require.NoError(s.t, src.Close())
})

srcTableName := "partial_json"
srcFullName := fmt.Sprintf("e2e_test_%s.%s", suffix, srcTableName)
dstTableName := "partial_json_dst"

// Source table with one row whose JSON document is updated later in the test.
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`CREATE TABLE %s (id INT PRIMARY KEY, doc JSON NOT NULL)`, srcFullName)))
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s VALUES (
1,
'{"a":"aaaaaaaaaaaaa","c":"ccccccccccccccc","ab":["abababababababa","babababababab"]}'
)`, srcFullName)))

// Flow config: one table mapping into the suite's destination peer, with the source
// peer swapped for the container-backed connector and the initial snapshot enabled.
connectionGen := FlowConnectionGenerationConfig{
FlowJobName: "test_mysql_partial_json_" + suffix,
TableMappings: []*protos.TableMapping{{
SourceTableIdentifier: srcFullName,
DestinationTableIdentifier: dstTableName,
ShardingKey: "id",
}},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.SourceName = src.GeneratePeer(s.t).Name
flowConnConfig.DoInitialSnapshot = true

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
// Wait (up to 3 minutes) until source and destination rows compare equal; query errors
// are logged and treated as "not yet".
EnvWaitFor(s.t, env, 3*time.Minute, "snapshot", func() bool {
sourceRows, err := src.GetRows(s.t.Context(), suffix, srcTableName, "id,doc")
if err != nil {
s.t.Log(err)
return false
}
rows, err := s.GetRows(dstTableName, "id,doc")
if err != nil {
s.t.Log(err)
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, sourceRows, rows)
})

// Enable PARTIAL_JSON only after the snapshot, at both GLOBAL and SESSION scope, then
// update one JSON path. NOTE(review): presumably SESSION is set as well because an
// already-open connection does not pick up the new GLOBAL value — confirm.
require.NoError(s.t, src.Exec(s.t.Context(), "SET GLOBAL binlog_row_value_options = 'PARTIAL_JSON'"))
require.NoError(s.t, src.Exec(s.t.Context(), "SET SESSION binlog_row_value_options = 'PARTIAL_JSON'"))
require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf(
`UPDATE %s SET doc = JSON_SET(doc, '$.ab', '["ab_updatedccc"]') WHERE id = 1`, srcFullName)))

catalogPool, err := internal.GetCatalogConnectionPoolFromEnv(s.t.Context())
require.NoError(s.t, err)
// Poll (up to 3 minutes) until at least one matching "error" entry is recorded for this
// flow; query failures are logged and retried.
EnvWaitFor(s.t, env, 3*time.Minute, "waiting for partial JSON error", func() bool {
count, err := GetLogCount(s.t.Context(), catalogPool, flowConnConfig.FlowJobName, "error",
"disable binlog_row_value_options (PARTIAL_JSON)")
if err != nil {
s.t.Log("Error querying flow_errors:", err)
return false
}
return count > 0
})

// Cancel the flow and require that it ended as canceled.
env.Cancel(s.t.Context())
RequireEnvCanceled(s.t, env)
}
Loading
Loading