From 07caa99bfe8b4033ac27610ce1646ed26de286bb Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 18 Jun 2026 22:10:19 -0700 Subject: [PATCH 1/6] [WIP] 813 PARTIAL_UPDATE_ROWS_EVENT (PARTIAL_JSON enabled post-creation) From 1acc23239cc681299619ef3573ebeb5c53e7c57b Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 18 Jun 2026 22:24:49 -0700 Subject: [PATCH 2/6] 813 fail loud on partial JSON row events --- flow/alerting/classifier.go | 24 ++++ flow/alerting/classifier_test.go | 31 ++++ flow/connectors/mysql/cdc.go | 11 ++ flow/connectors/mysql/qvalue_convert.go | 10 +- flow/connectors/mysql/qvalue_convert_test.go | 30 ++++ flow/e2e/clickhouse_mysql_test.go | 140 +++++++++++++++++++ flow/e2e/mysql.go | 22 ++- flow/go.mod | 30 ++++ flow/go.sum | 56 ++++++++ flow/shared/exceptions/mysql.go | 43 ++++++ 10 files changed, 389 insertions(+), 8 deletions(-) diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index 9a6463af0..c7b1db852 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -1104,6 +1104,30 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { } } + if partialJSONError, ok := errors.AsType[*exceptions.MySQLPartialJSONUnsupportedError](err); ok { + attrs := map[AdditionalErrorAttributeKey]string{ + ErrorAttributeKeyTable: partialJSONError.SchemaName + "." + partialJSONError.TableName, + } + if partialJSONError.ColumnName != "" { + attrs[ErrorAttributeKeyColumn] = partialJSONError.ColumnName + } + return ErrorNotifyBinlogInvalid, ErrorInfo{ + Source: ErrorSourceMySQL, + Code: "UNSUPPORTED_BINLOG_ROW_VALUE_OPTIONS", + AdditionalAttributes: attrs, + } + } + + if unhandledRowsEventError, ok := errors.AsType[*exceptions.MySQLUnhandledRowsEventError](err); ok { + return ErrorNotifyBinlogInvalid, ErrorInfo{ + Source: ErrorSourceMySQL, + Code: "UNHANDLED_ROWS_EVENT", + AdditionalAttributes: map[AdditionalErrorAttributeKey]string{ + ErrorAttributeKeyTable: unhandledRowsEventError.SchemaName + "." + unhandledRowsEventError.TableName, + }, + } + } + if unsupportedDDLError, ok := errors.AsType[*exceptions.MySQLUnsupportedDDLError](err); ok { return ErrorNotifyBinlogRowMetadataInvalid, ErrorInfo{ Source: ErrorSourceMySQL, diff --git a/flow/alerting/classifier_test.go b/flow/alerting/classifier_test.go index db9781f25..2cf435b55 100644 --- a/flow/alerting/classifier_test.go +++ b/flow/alerting/classifier_test.go @@ -1214,3 +1214,34 @@ func TestMySQLBinlogIncidentErrorShouldBeNotifyBinlogInvalid(t *testing.T) { Code: "BINLOG_INCIDENT", }, errInfo) } + +func TestMySQLPartialJSONUnsupportedShouldBeNotifyBinlogInvalid(t *testing.T) { + t.Parallel() + + err := exceptions.NewMySQLPartialJSONUnsupportedError("test_db", "test_table", "doc") + errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("pulling records failed: %w", err)) + assert.Equal(t, ErrorNotifyBinlogInvalid, errorClass) + assert.Equal(t, ErrorInfo{ + Source: ErrorSourceMySQL, + Code: "UNSUPPORTED_BINLOG_ROW_VALUE_OPTIONS", + AdditionalAttributes: map[AdditionalErrorAttributeKey]string{ + ErrorAttributeKeyTable: "test_db.test_table", + ErrorAttributeKeyColumn: "doc", + }, + }, errInfo) +} + +func TestMySQLUnhandledRowsEventShouldBeNotifyBinlogInvalid(t *testing.T) { + t.Parallel() + + err := exceptions.NewMySQLUnhandledRowsEventError("test_db", "test_table", "UnknownEvent", 255) + errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("pulling records failed: %w", err)) + assert.Equal(t, ErrorNotifyBinlogInvalid, errorClass) + assert.Equal(t, ErrorInfo{ + Source: ErrorSourceMySQL, + Code: "UNHANDLED_ROWS_EVENT", + AdditionalAttributes: map[AdditionalErrorAttributeKey]string{ + ErrorAttributeKeyTable: "test_db.test_table", + }, + }, errInfo) +} diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index 77a4cbfc6..02a5d660c 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -635,6 +635,11 @@ func (c *MySqlConnector) PullRecords( return nil } switch event.Header.EventType { + case replication.PARTIAL_UPDATE_ROWS_EVENT: + e := exceptions.NewMySQLPartialJSONUnsupportedError( + string(ev.Table.Schema), string(ev.Table.Table), "") + c.logger.Error(e.Error()) + return e case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: for _, row := range ev.Rows { items := model.NewRecordItems(len(row)) @@ -755,6 +760,12 @@ func (c *MySqlConnector) PullRecords( } case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0: return fmt.Errorf("mysql v0 replication protocol not supported") + default: + e := exceptions.NewMySQLUnhandledRowsEventError( + string(ev.Table.Schema), string(ev.Table.Table), + event.Header.EventType.String(), uint16(event.Header.EventType)) + c.logger.Error(e.Error()) + return e } } if event.Header.Timestamp > 0 { diff --git a/flow/connectors/mysql/qvalue_convert.go b/flow/connectors/mysql/qvalue_convert.go index 601b918cd..9202b1ba1 100644 --- a/flow/connectors/mysql/qvalue_convert.go +++ b/flow/connectors/mysql/qvalue_convert.go @@ -500,8 +500,14 @@ func QValueFromMysqlRowEvent( case time.Time: return types.QValueTimestamp{Val: val}, nil case *replication.JsonDiff: - // TODO support somehow?? - return types.QValueNull(types.QValueKindJSON), nil + columnName := "__peerdb_unknown_" + strconv.Itoa(idx) + if len(ev.ColumnName) > idx { + columnName = string(ev.ColumnName[idx]) + } + err := exceptions.NewMySQLPartialJSONUnsupportedError( + string(ev.Schema), string(ev.Table), columnName) + logger.Warn(err.Error()) + return nil, err case []byte: switch qkind { case types.QValueKindBytes: diff --git a/flow/connectors/mysql/qvalue_convert_test.go b/flow/connectors/mysql/qvalue_convert_test.go index 6d6d2fca9..02b1021b7 100644 --- a/flow/connectors/mysql/qvalue_convert_test.go +++ b/flow/connectors/mysql/qvalue_convert_test.go @@ -1,13 +1,18 @@ package connmysql import ( + "io" + "log/slog" "testing" "time" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/stretchr/testify/require" + temporallog "go.temporal.io/sdk/log" "github.com/PeerDB-io/peerdb/flow/shared" + "github.com/PeerDB-io/peerdb/flow/shared/exceptions" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -38,6 +43,31 @@ func TestQkindFromMysqlType_Bit(t *testing.T) { } } +func TestQValueFromMysqlRowEventJsonDiffErrors(t *testing.T) { + logger := temporallog.NewStructuredLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) + ev := &replication.TableMapEvent{ + Schema: []byte("test_db"), + Table: []byte("test_table"), + ColumnType: []byte{mysql.MYSQL_TYPE_JSON}, + ColumnMeta: []uint16{0}, + ColumnName: [][]byte{[]byte("doc")}, + } + var coercionReported bool + + _, err := QValueFromMysqlRowEvent( + ev, 0, nil, nil, types.QValueKindJSON, + &replication.JsonDiff{Op: replication.JsonDiffOperationReplace, Path: "$.a", Value: `"b"`}, + logger, &coercionReported, + ) + require.Error(t, err) + + var partialJSONErr *exceptions.MySQLPartialJSONUnsupportedError + require.ErrorAs(t, err, &partialJSONErr) + require.Equal(t, "test_db", partialJSONErr.SchemaName) + require.Equal(t, "test_table", partialJSONErr.TableName) + require.Equal(t, "doc", partialJSONErr.ColumnName) +} + func TestProcessTime(t *testing.T) { epoch := time.Unix(0, 0).UTC() for _, ts := range []struct { diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 727d50d29..5a801a330 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -1,6 +1,7 @@ package e2e import ( + "context" "encoding/hex" "fmt" "math" @@ -10,12 +11,16 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" "github.com/PeerDB-io/peerdb/flow/connectors" connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse" + "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/pkg/clickhouse" + "github.com/PeerDB-io/peerdb/flow/pkg/common" mysql_validation "github.com/PeerDB-io/peerdb/flow/pkg/mysql" "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/datatypes" @@ -1468,3 +1473,138 @@ func (s ClickHouseSuite) Test_MySQL_Column_Position_Shifting_DDL_Error() { }) } } + +func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { + if s.cluster { + s.t.Skip("source-side PARTIAL_JSON coverage does not need to run against ClickHouse cluster") + } + if _, ok := s.source.(*MySqlSource); !ok { + s.t.Skip("only applies to mysql") + } + if internal.MySQLTestVersionIsMaria() { + s.t.Skip("PARTIAL_UPDATE_ROWS_EVENT is MySQL-specific") + } + + req := testcontainers.ContainerRequest{ + Image: "ghcr.io/peerdb-io/mysql-debug:8.0.46", + Env: map[string]string{ + "MYSQL_ROOT_PASSWORD": internal.MySQLTestRootPasswordWithFallback("cipass"), + "MYSQL_ROOT_HOST": "%", + }, + Cmd: []string{ + "mysqld", + "--server-id=1", + "--log-bin=mysql-bin", + "--binlog-format=ROW", + "--binlog-row-image=FULL", + "--binlog-row-metadata=FULL", + "--innodb-buffer-pool-size=64M", + "--performance-schema=OFF", + "--mysqlx=0", + "--max-connections=20", + "--table-open-cache=64", + "--table-definition-cache=128", + "--innodb-log-buffer-size=8M", + }, + ExposedPorts: []string{"3306/tcp"}, + WaitingFor: wait.ForListeningPort("3306/tcp").WithStartupTimeout(3 * time.Minute), + } + + ctr, err := testcontainers.GenericContainer(s.t.Context(), testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if ctr != nil { + testcontainers.CleanupContainer(s.t, ctr, testcontainers.StopTimeout(30*time.Second)) + } + require.NoError(s.t, err) + + mapped, err := ctr.MappedPort(s.t.Context(), "3306/tcp") + require.NoError(s.t, err) + port, err := strconv.Atoi(mapped.Port()) + require.NoError(s.t, err) + + replication := protos.MySqlReplicationMechanism_MYSQL_GTID + if internal.MySQLTestVersionIsMySQLPos() { + replication = protos.MySqlReplicationMechanism_MYSQL_FILEPOS + } + + suffix := "mypjson_" + strings.ToLower(common.RandomString(8)) + config := &protos.MySqlConfig{ + Host: internal.MySQLTestHost(), + Port: uint32(port), + User: "root", + Password: internal.MySQLTestRootPasswordWithFallback("cipass"), + DisableTls: true, + Flavor: protos.MySqlFlavor_MYSQL_MYSQL, + ReplicationMechanism: replication, + } + src, err := setupMyConnector(s.t, suffix, config, "mysql_partial_json_"+suffix) + require.NoError(s.t, err) + s.t.Cleanup(func() { + src.Teardown(s.t, context.Background(), suffix) + require.NoError(s.t, src.Close()) + }) + + srcTableName := "partial_json" + srcFullName := fmt.Sprintf("e2e_test_%s.%s", suffix, srcTableName) + dstTableName := "partial_json_dst" + + require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf( + `CREATE TABLE %s (id INT PRIMARY KEY, doc JSON NOT NULL)`, srcFullName))) + require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s VALUES ( + 1, + '{"a":"aaaaaaaaaaaaa","c":"ccccccccccccccc","ab":["abababababababa","babababababab"]}' + )`, srcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: "test_mysql_partial_json_" + suffix, + TableMappings: []*protos.TableMapping{{ + SourceTableIdentifier: srcFullName, + DestinationTableIdentifier: dstTableName, + ShardingKey: "id", + }}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.SourceName = src.GeneratePeer(s.t).Name + flowConnConfig.DoInitialSnapshot = true + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + EnvWaitFor(s.t, env, 3*time.Minute, "snapshot", func() bool { + sourceRows, err := src.GetRows(s.t.Context(), suffix, srcTableName, "id,doc") + if err != nil { + s.t.Log(err) + return false + } + rows, err := s.GetRows(dstTableName, "id,doc") + if err != nil { + s.t.Log(err) + return false + } + return e2eshared.CheckEqualRecordBatches(s.t, sourceRows, rows) + }) + + require.NoError(s.t, src.Exec(s.t.Context(), "SET GLOBAL binlog_row_value_options = 'PARTIAL_JSON'")) + require.NoError(s.t, src.Exec(s.t.Context(), "SET SESSION binlog_row_value_options = 'PARTIAL_JSON'")) + require.NoError(s.t, src.Exec(s.t.Context(), fmt.Sprintf( + `UPDATE %s SET doc = JSON_SET(doc, '$.ab', '["ab_updatedccc"]') WHERE id = 1`, srcFullName))) + + catalogPool, err := internal.GetCatalogConnectionPoolFromEnv(s.t.Context()) + require.NoError(s.t, err) + EnvWaitFor(s.t, env, 3*time.Minute, "waiting for partial JSON error", func() bool { + count, err := GetLogCount(s.t.Context(), catalogPool, flowConnConfig.FlowJobName, "error", + "disable binlog_row_value_options (PARTIAL_JSON)") + if err != nil { + s.t.Log("Error querying flow_errors:", err) + return false + } + return count > 0 + }) + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/mysql.go b/flow/e2e/mysql.go index d054d5a37..aa0a9433e 100644 --- a/flow/e2e/mysql.go +++ b/flow/e2e/mysql.go @@ -20,6 +20,8 @@ import ( type MySqlSource struct { *connmysql.MySqlConnector Config *protos.MySqlConfig + // peer name, defaults to "mysql" + Name string } func SetupMySQL(t *testing.T, suffix string) (*MySqlSource, error) { @@ -60,7 +62,11 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio Flavor: flavor, ReplicationMechanism: replication, } + return setupMyConnector(t, suffix, config, "") +} +func setupMyConnector(t *testing.T, suffix string, config *protos.MySqlConfig, peerName string) (*MySqlSource, error) { + t.Helper() connector, err := connmysql.NewMySqlConnector(t.Context(), config) if err != nil { return nil, fmt.Errorf("failed to create mysql connection: %w", err) @@ -92,7 +98,7 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio setupSql = append(setupSql, "set global binlog_row_metadata=full") } - if flavor != protos.MySqlFlavor_MYSQL_MARIA { + if config.Flavor != protos.MySqlFlavor_MYSQL_MARIA { rs, err := connector.Execute(t.Context(), "select @@gtid_mode") if err != nil { connector.Close() @@ -103,7 +109,7 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio connector.Close() return nil, err } - if replication == protos.MySqlReplicationMechanism_MYSQL_GTID { + if config.ReplicationMechanism == protos.MySqlReplicationMechanism_MYSQL_GTID { if strings.EqualFold(gtidMode, "off") { // The value of @@GLOBAL.GTID_MODE can only be changed one step at a time: // OFF <-> OFF_PERMISSIVE <-> ON_PERMISSIVE <-> ON @@ -116,7 +122,7 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio "do release_lock('settings')", ) } - } else if replication == protos.MySqlReplicationMechanism_MYSQL_FILEPOS { + } else if config.ReplicationMechanism == protos.MySqlReplicationMechanism_MYSQL_FILEPOS { if strings.EqualFold(gtidMode, "on") { // The value of @@GLOBAL.GTID_MODE can only be changed one step at a time: // ON <-> ON_PERMISSIVE <-> OFF_PERMISSIVE <-> OFF @@ -130,7 +136,7 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio ) } } else { - return nil, fmt.Errorf("unexpected replication mechanism: %v", replication) + return nil, fmt.Errorf("unexpected replication mechanism: %v", config.ReplicationMechanism) } } @@ -141,7 +147,7 @@ func SetupMyCore(t *testing.T, suffix string, replication protos.MySqlReplicatio } } - return &MySqlSource{MySqlConnector: connector, Config: config}, nil + return &MySqlSource{MySqlConnector: connector, Config: config, Name: peerName}, nil } func (s *MySqlSource) Connector() connectors.Connector { @@ -161,8 +167,12 @@ func (s *MySqlSource) Teardown(t *testing.T, ctx context.Context, suffix string) func (s *MySqlSource) GeneratePeer(t *testing.T) *protos.Peer { t.Helper() + name := s.Name + if name == "" { + name = "mysql" + } peer := &protos.Peer{ - Name: "mysql", + Name: name, Type: protos.DBType_MYSQL, Config: &protos.Peer_MysqlConfig{ MysqlConfig: s.Config, diff --git a/flow/go.mod b/flow/go.mod index fff38ea24..4eeb57f9c 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -52,6 +52,7 @@ require ( github.com/slack-go/slack v0.24.0 github.com/snowflakedb/gosnowflake/v2 v2.0.2 github.com/stretchr/testify v1.11.1 + github.com/testcontainers/testcontainers-go v0.42.0 github.com/twmb/franz-go v1.21.2 github.com/twmb/franz-go/pkg/kadm v1.18.0 github.com/twmb/franz-go/plugin/kslog v1.0.0 @@ -99,6 +100,35 @@ require ( github.com/quasilyte/go-ruleguard/dsl v0.3.23 ) +require ( + dario.cat/mergo v1.0.2 // indirect + github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/go-connections v0.6.0 // indirect + github.com/ebitengine/purego v0.10.0 // indirect + github.com/magiconair/properties v1.8.10 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/go-archive v0.2.0 // indirect + github.com/moby/moby/api v1.54.1 // indirect + github.com/moby/moby/client v0.4.0 // indirect + github.com/moby/patternmatcher v0.6.1 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.4.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.2 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/shirou/gopsutil/v4 v4.26.3 // indirect + github.com/sirupsen/logrus v1.9.4 // indirect +) + require ( cloud.google.com/go/kms v1.31.0 cloud.google.com/go/longrunning v0.9.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index ca0f7dabc..ff41e5e31 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -29,6 +29,8 @@ cloud.google.com/go/storage v1.62.1 h1:Os0G3XbUbjZumkpDUf2Y0rLoXJTCF1kU2kWUujKYX cloud.google.com/go/storage v1.62.1/go.mod h1:cpYz/kRVZ+UQAF1uHeea10/9ewcRbxGoGNKsS9daSXA= cloud.google.com/go/trace v1.11.7 h1:kDNDX8JkaAG3R2nq1lIdkb7FCSi1rCmsEtKVsty7p+U= cloud.google.com/go/trace v1.11.7/go.mod h1:TNn9d5V3fQVf6s4SCveVMIBS2LJUqo73GACmq/Tky0s= +dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= +dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= @@ -57,6 +59,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 h1:jWQK1GI+LeGGUKBAD github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4/go.mod h1:8mwH4klAm9DUgR2EEHyEEAQlRDvLPyg5fQry3y+cDew= github.com/Azure/go-amqp v1.5.1 h1:WyiPTz2C3zVvDL7RLAqwWdeoYhMtX62MZzQoP09fzsU= github.com/Azure/go-amqp v1.5.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= @@ -83,6 +87,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/PeerDB-io/glua64 v1.0.1 h1:biXLlFF/L5pnJCwDon7hkWkuQPozC8NjKS3J7Wzi69I= github.com/PeerDB-io/glua64 v1.0.1/go.mod h1:UHmAhniv61bJPMhQvxkpC7jXbn353dSbQviu83bgQVg= github.com/PeerDB-io/gluabit32 v1.0.2 h1:AGI1Z7dwDVotakpuOOuyTX4/QGi5HUYsipL/VfodmO4= @@ -170,6 +176,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g= github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -201,6 +209,14 @@ github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb h1:3bCgBvB github.com/cockroachdb/tokenbucket v0.0.0-20250429170803-42689b6311bb/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= +github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= +github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64/go.mod h1:F86k/6c7aDUdwSUevnLpHS/3Q9hzYCE99jGk2xsHnt0= github.com/coocood/freecache v1.2.1 h1:/v1CqMq45NFH9mp/Pt142reundeBM0dVUD3osQBeu/U= @@ -211,6 +227,8 @@ github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd/v22 v22.7.0 h1:LAEzFkke61DFROc7zNLX/WA2i5J8gYqe0rSj9KI28KA= github.com/coreos/go-systemd/v22 v22.7.0/go.mod h1:xNUYtjHu2EDXbsxz1i41wouACIwT7Ybq9o0BQhMwD0w= +github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= +github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= @@ -228,6 +246,10 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pMmjSD94= +github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ= @@ -238,6 +260,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvsekhvalnov/jose2go v1.8.0 h1:LqkkVKAlHFfH9LOEl5fe4p/zL02OhWE7pCufMBG2jLA= github.com/dvsekhvalnov/jose2go v1.8.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= +github.com/ebitengine/purego v0.10.0 h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU= +github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/elastic/elastic-transport-go/v8 v8.10.0 h1:vzpe1BMLdShc7yWNV55U6aGk4UtYEOVsBJ5S4UIeY9Q= github.com/elastic/elastic-transport-go/v8 v8.10.0/go.mod h1:KB6jblnx4NnImxHKULFys7VQ472Av8uzrbkr6OtbOp8= github.com/elastic/go-elasticsearch/v8 v8.19.6 h1:4qa7ecJkr5rLsoHKIVGbaqcFt2o57CnOHQJi9Pts/rk= @@ -479,8 +503,28 @@ github.com/lestrrat-go/option/v2 v2.0.0/go.mod h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuh github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20260330125221-c963978e514e h1:Q6MvJtQK/iRcRtzAscm/zF23XxJlbECiGPyRicsX+Ak= github.com/lufia/plan9stats v0.0.0-20260330125221-c963978e514e/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= +github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= +github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/minio/minlz v1.1.0 h1:rUOGu3EP4EqJC5k3qCsIwEnZiJULKqtRyDdqbhlvMmQ= github.com/minio/minlz v1.1.0/go.mod h1:qT0aEB35q79LLornSzeDH75LBf3aH1MV+jB5w9Wasec= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/go-archive v0.2.0 h1:zg5QDUM2mi0JIM9fdQZWC7U8+2ZfixfTYoHL7rWUcP8= +github.com/moby/go-archive v0.2.0/go.mod h1:mNeivT14o8xU+5q1YnNrkQVpK+dnNe/K6fHqnTg4qPU= +github.com/moby/moby/api v1.54.1 h1:TqVzuJkOLsgLDDwNLmYqACUuTehOHRGKiPhvH8V3Nn4= +github.com/moby/moby/api v1.54.1/go.mod h1:+RQ6wluLwtYaTd1WnPLykIDPekkuyD/ROWQClE83pzs= +github.com/moby/moby/client v0.4.0 h1:S+2XegzHQrrvTCvF6s5HFzcrywWQmuVnhOXe2kiWjIw= +github.com/moby/moby/client v0.4.0/go.mod h1:QWPbvWchQbxBNdaLSpoKpCdf5E+WxFAgNHogCWDoa7g= +github.com/moby/patternmatcher v0.6.1 h1:qlhtafmr6kgMIJjKJMDmMWq7WLkKIo23hsrpR3x084U= +github.com/moby/patternmatcher v0.6.1/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/sys/user v0.4.0 h1:jhcMKit7SA80hivmFJcbB1vqmw//wU61Zdui2eQXuMs= +github.com/moby/sys/user v0.4.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= +github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ= +github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -507,6 +551,10 @@ github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0= github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -593,9 +641,12 @@ github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil/v3 v3.21.12/go.mod h1:BToYZVTlSVlfazpDDYFnsVZLaoRG+g8ufT6fPQLdJzA= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= +github.com/shirou/gopsutil/v4 v4.26.3 h1:2ESdQt90yU3oXF/CdOlRCJxrP+Am1aBYubTMTfxJ1qc= +github.com/shirou/gopsutil/v4 v4.26.3/go.mod h1:LZ6ewCSkBqUpvSOf+LsTGnRinC6iaNUNMGBtDkJBaLQ= github.com/shoenig/go-m1cpu v0.2.1 h1:yqRB4fvOge2+FyRXFkXqsyMoqPazv14Yyy+iyccT2E4= github.com/shoenig/go-m1cpu v0.2.1/go.mod h1:KkDOw6m3ZJQAPHbrzkZki4hnx+pDRR1Lo+ldA56wD5w= github.com/shoenig/test v1.7.0 h1:eWcHtTXa6QLnBvm0jgEabMRN/uJ4DMV3M8xUGgRkZmk= @@ -603,6 +654,8 @@ github.com/shoenig/test v1.7.0/go.mod h1:UxJ6u/x2v/TNs/LoLxBNJRV9DiwBBKYxXSyczsB github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w= +github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= github.com/slack-go/slack v0.24.0 h1:oMz5WcCBVTkxpBxeA8QVxxK70R+XV/a0qNka6TRNGHQ= github.com/slack-go/slack v0.24.0/go.mod h1:H0yR/YBuRJ39RkE+JpV/d/oEsbanzTRowR82bCN0cEs= github.com/snowflakedb/gosnowflake/v2 v2.0.2 h1:8UZo+v1T2Y9sgoPk3JYT3RatAUd9o6q6yjL40TyHluA= @@ -627,6 +680,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/testcontainers/testcontainers-go v0.42.0 h1:He3IhTzTZOygSXLJPMX7n44XtK+qhjat1nI9cneBbUY= +github.com/testcontainers/testcontainers-go v0.42.0/go.mod h1:vZjdY1YmUA1qEForxOIOazfsrdyORJAbhi0bp8plN30= github.com/tiancaiamao/gp v0.0.0-20230126082955-4f9e4f1ed9b5 h1:4bvGDLXwsP4edNa9igJz+oU1kmZ6S3PSjrnOFgh5Xwk= github.com/tiancaiamao/gp v0.0.0-20230126082955-4f9e4f1ed9b5/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tikv/pd/client v0.0.0-20251229071808-6173d50c004c h1:lmLyZv394nprmI5OQoioFlE/vvpKZs37PcPL/Wu34ug= @@ -822,6 +877,7 @@ golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/flow/shared/exceptions/mysql.go b/flow/shared/exceptions/mysql.go index 907b6ee31..3a38957b9 100644 --- a/flow/shared/exceptions/mysql.go +++ b/flow/shared/exceptions/mysql.go @@ -55,6 +55,49 @@ func (e *MySQLUnsupportedBinlogRowMetadataError) Error() string { e.SchemaName, e.TableName) } +type MySQLPartialJSONUnsupportedError struct { + SchemaName string + TableName string + ColumnName string +} + +func NewMySQLPartialJSONUnsupportedError(schema string, table string, column string) *MySQLPartialJSONUnsupportedError { + return &MySQLPartialJSONUnsupportedError{SchemaName: schema, TableName: table, ColumnName: column} +} + +func (e *MySQLPartialJSONUnsupportedError) Error() string { + tableName := fmt.Sprintf("%s.%s", e.SchemaName, e.TableName) + if e.ColumnName != "" { + tableName = fmt.Sprintf("%s.%s", tableName, e.ColumnName) + } + return fmt.Sprintf( + "PARTIAL_UPDATE_ROWS_EVENT received for %s: disable binlog_row_value_options (PARTIAL_JSON) "+ + "on the source; PeerDB cannot apply partial JSON diffs", tableName) +} + +type MySQLUnhandledRowsEventError struct { + SchemaName string + TableName string + EventName string + EventType uint16 +} + +func NewMySQLUnhandledRowsEventError( + schema string, table string, eventName string, eventType uint16, +) *MySQLUnhandledRowsEventError { + return &MySQLUnhandledRowsEventError{ + SchemaName: schema, + TableName: table, + EventName: eventName, + EventType: eventType, + } +} + +func (e *MySQLUnhandledRowsEventError) Error() string { + return fmt.Sprintf("Unhandled MySQL rows event %s (%d) while processing %s.%s", + e.EventName, e.EventType, e.SchemaName, e.TableName) +} + type MySQLUnsupportedDDLError struct { TableName string } From 632aa339de2b1f6280310bf28222911621faa50d Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 18 Jun 2026 22:34:07 -0700 Subject: [PATCH 3/6] 813 satisfy slog lint --- flow/connectors/mysql/qvalue_convert_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flow/connectors/mysql/qvalue_convert_test.go b/flow/connectors/mysql/qvalue_convert_test.go index 02b1021b7..b097257b2 100644 --- a/flow/connectors/mysql/qvalue_convert_test.go +++ b/flow/connectors/mysql/qvalue_convert_test.go @@ -1,7 +1,6 @@ package connmysql import ( - "io" "log/slog" "testing" "time" @@ -44,7 +43,7 @@ func TestQkindFromMysqlType_Bit(t *testing.T) { } func TestQValueFromMysqlRowEventJsonDiffErrors(t *testing.T) { - logger := temporallog.NewStructuredLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) + logger := temporallog.NewStructuredLogger(slog.New(slog.DiscardHandler)) ev := &replication.TableMapEvent{ Schema: []byte("test_db"), Table: []byte("test_table"), From 0dba93b352a57a20b82563bea74bc6c9e1afdafd Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 19 Jun 2026 09:10:49 -0700 Subject: [PATCH 4/6] 813 fix partial JSON test container host --- flow/e2e/clickhouse_mysql_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 5a801a330..82212aa23 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -1521,6 +1521,8 @@ func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { mapped, err := ctr.MappedPort(s.t.Context(), "3306/tcp") require.NoError(s.t, err) + host, err := ctr.Host(s.t.Context()) + require.NoError(s.t, err) port, err := strconv.Atoi(mapped.Port()) require.NoError(s.t, err) @@ -1531,7 +1533,7 @@ func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { suffix := "mypjson_" + strings.ToLower(common.RandomString(8)) config := &protos.MySqlConfig{ - Host: internal.MySQLTestHost(), + Host: host, Port: uint32(port), User: "root", Password: internal.MySQLTestRootPasswordWithFallback("cipass"), From ebbb31b1b7cfb0cf444c2622ff806210ab1cceb7 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 19 Jun 2026 11:39:06 -0700 Subject: [PATCH 5/6] 813 wait for mysql debug readiness --- flow/e2e/clickhouse_mysql_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 82212aa23..7604e8d5e 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -1507,7 +1507,10 @@ func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { "--innodb-log-buffer-size=8M", }, ExposedPorts: []string{"3306/tcp"}, - WaitingFor: wait.ForListeningPort("3306/tcp").WithStartupTimeout(3 * time.Minute), + WaitingFor: wait.ForAll( + wait.ForLog("ready for connections").WithOccurrence(2).WithStartupTimeout(3*time.Minute), + wait.ForListeningPort("3306/tcp").WithStartupTimeout(3*time.Minute), + ), } ctr, err := testcontainers.GenericContainer(s.t.Context(), testcontainers.GenericContainerRequest{ From 747d2f2ee15b0e976ccbe8181d4a5b246f13ac86 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 19 Jun 2026 22:02:13 -0700 Subject: [PATCH 6/6] 813 use reachable host for partial JSON test --- flow/e2e/clickhouse_mysql_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 7604e8d5e..90b0b90ad 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -1524,8 +1524,6 @@ func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { mapped, err := ctr.MappedPort(s.t.Context(), "3306/tcp") require.NoError(s.t, err) - host, err := ctr.Host(s.t.Context()) - require.NoError(s.t, err) port, err := strconv.Atoi(mapped.Port()) require.NoError(s.t, err) @@ -1536,7 +1534,7 @@ func (s ClickHouseSuite) Test_MySQL_PartialJSONUnsupported() { suffix := "mypjson_" + strings.ToLower(common.RandomString(8)) config := &protos.MySqlConfig{ - Host: host, + Host: internal.MySQLTestHost(), Port: uint32(port), User: "root", Password: internal.MySQLTestRootPasswordWithFallback("cipass"),