From f6adeafa23fd65b00488b004fba59d226092dddd Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 18 Jun 2026 22:10:33 -0700 Subject: [PATCH 1/3] [WIP] 817 SET columns on no-metadata servers (bitmask vs labels) From f75af519299d87bf2cf9ebf84a7d8dda82abd608 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Thu, 18 Jun 2026 22:20:50 -0700 Subject: [PATCH 2/3] 817 add numeric SET mapping for old MySQL --- flow/connectors/bigquery/qvalue_convert.go | 2 +- flow/connectors/mysql/qrep.go | 2 +- flow/connectors/mysql/qrep_test.go | 9 ++ flow/connectors/mysql/qvalue_convert.go | 8 ++ flow/connectors/mysql/qvalue_convert_test.go | 59 ++++++++++ flow/connectors/mysql/type_conversion.go | 7 +- flow/connectors/postgres/qvalue_convert.go | 2 +- flow/connectors/utils/cdc_store.go | 1 + flow/e2e/clickhouse_mysql_test.go | 109 +++++++++++++++++++ flow/model/qrecord_avro_size_test.go | 12 ++ flow/model/qrecord_copy_from_source.go | 2 + flow/model/qvalue/avro_converter.go | 4 +- flow/model/qvalue/avro_converter_test.go | 1 + flow/model/qvalue/equals.go | 5 + flow/pua/peerdb.go | 15 +++ flow/shared/constants.go | 2 + flow/shared/types/kind.go | 3 + flow/shared/types/qvalue.go | 16 +++ 18 files changed, 254 insertions(+), 5 deletions(-) diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index 85fe6cc8e..14496cda7 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -22,7 +22,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab // integer types case types.QValueKindInt8, types.QValueKindInt16, types.QValueKindInt32, types.QValueKindInt64, types.QValueKindUInt8, types.QValueKindUInt16, types.QValueKindUInt32, types.QValueKindUInt64, - types.QValueKindUint16Enum: + types.QValueKindUint16Enum, types.QValueKindUint64Set: bqField.Type = bigquery.IntegerFieldType // decimal types case types.QValueKindFloat32, types.QValueKindFloat64: diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index bb841b80f..929fc6085 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -187,7 +187,7 @@ func buildSelectedColumns(cols []*protos.FieldDescription, exclude []string) str } converted := common.QuoteMySQLIdentifier(col.Name) - if col.Type == string(types.QValueKindUint16Enum) { + if col.Type == string(types.QValueKindUint16Enum) || col.Type == string(types.QValueKindUint64Set) { converted = fmt.Sprintf("CAST(%s AS UNSIGNED) AS %s", converted, converted) selectAsterisk = false } diff --git a/flow/connectors/mysql/qrep_test.go b/flow/connectors/mysql/qrep_test.go index 206dea6b7..0b413eb10 100644 --- a/flow/connectors/mysql/qrep_test.go +++ b/flow/connectors/mysql/qrep_test.go @@ -42,6 +42,15 @@ func TestBuildSelectedColumns(t *testing.T) { exclude: []string{}, expectedSelectedColumns: "`id`, CAST(`status` AS UNSIGNED) AS `status`", }, + { + name: "uint64set column is cast to unsigned", + cols: []*protos.FieldDescription{ + {Name: "id", Type: string(types.QValueKindInt32)}, + {Name: "perms", Type: string(types.QValueKindUint64Set)}, + }, + exclude: []string{}, + expectedSelectedColumns: "`id`, CAST(`perms` AS UNSIGNED) AS `perms`", + }, { name: "string enum column is not cast", cols: []*protos.FieldDescription{ diff --git a/flow/connectors/mysql/qvalue_convert.go b/flow/connectors/mysql/qvalue_convert.go index 601b918cd..070282104 100644 --- a/flow/connectors/mysql/qvalue_convert.go +++ b/flow/connectors/mysql/qvalue_convert.go @@ -219,6 +219,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie switch qkind { case types.QValueKindUint16Enum: return types.QValueUint16Enum{Val: uint16(v)}, nil + case types.QValueKindUint64Set: + return types.QValueUint64Set{Val: v}, nil case types.QValueKindBoolean: return types.QValueBoolean{Val: v != 0}, nil case types.QValueKindInt8: @@ -245,6 +247,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie switch qkind { case types.QValueKindBoolean: return types.QValueBoolean{Val: v != 0}, nil + case types.QValueKindUint64Set: + return types.QValueUint64Set{Val: uint64(v)}, nil case types.QValueKindInt8: return types.QValueInt8{Val: int8(v)}, nil case types.QValueKindInt16: @@ -443,6 +447,8 @@ func QValueFromMysqlRowEvent( switch qkind { case types.QValueKindUInt64: return types.QValueUInt64{Val: uint64(val)}, nil + case types.QValueKindUint64Set: + return types.QValueUint64Set{Val: uint64(val)}, nil case types.QValueKindInt64: return types.QValueInt64{Val: val}, nil case types.QValueKindString: // set @@ -477,6 +483,8 @@ func QValueFromMysqlRowEvent( switch qkind { case types.QValueKindInt64: return types.QValueInt64{Val: int64(val)}, nil + case types.QValueKindUint64Set: + return types.QValueUint64Set{Val: val}, nil case types.QValueKindString: return types.QValueString{Val: strconv.FormatUint(val, 10)}, nil default: diff --git a/flow/connectors/mysql/qvalue_convert_test.go b/flow/connectors/mysql/qvalue_convert_test.go index 6d6d2fca9..26067a4d3 100644 --- a/flow/connectors/mysql/qvalue_convert_test.go +++ b/flow/connectors/mysql/qvalue_convert_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peerdb/flow/shared" @@ -38,6 +39,64 @@ func TestQkindFromMysqlType_Bit(t *testing.T) { } } +func TestQkindFromMysqlColumnType_Set(t *testing.T) { + for _, tc := range []struct { + name string + binlogMetadataSupported bool + version uint32 + want types.QValueKind + }{ + { + name: "old version without metadata maps SET to string", + binlogMetadataSupported: false, + version: shared.InternalVersion_MySQL5ConvertSetsToInts - 1, + want: types.QValueKindString, + }, + { + name: "gating version without metadata maps SET to Uint64Set", + binlogMetadataSupported: false, + version: shared.InternalVersion_MySQL5ConvertSetsToInts, + want: types.QValueKindUint64Set, + }, + { + name: "latest version without metadata maps SET to Uint64Set", + binlogMetadataSupported: false, + version: shared.InternalVersion_Latest, + want: types.QValueKindUint64Set, + }, + { + name: "metadata-supported server keeps SET as string labels", + binlogMetadataSupported: true, + version: shared.InternalVersion_Latest, + want: types.QValueKindString, + }, + } { + t.Run(tc.name, func(t *testing.T) { + qkind, err := QkindFromMysqlColumnType("set('a','b','c')", tc.binlogMetadataSupported, tc.version) + require.NoError(t, err) + require.Equal(t, tc.want, qkind) + }) + } +} + +func TestQValueFromMysqlFieldValue_Uint64Set(t *testing.T) { + fieldValue := mysql.NewFieldValue(mysql.FieldValueTypeUnsigned, 5, nil) + + qv, err := QValueFromMysqlFieldValue(types.QValueKindUint64Set, mysql.MYSQL_TYPE_SET, fieldValue) + require.NoError(t, err) + require.Equal(t, types.QValueUint64Set{Val: 5}, qv) +} + +func TestQValueFromMysqlRowEvent_Uint64Set(t *testing.T) { + tableMap := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_SET}} + coercionReported := false + + qv, err := QValueFromMysqlRowEvent( + tableMap, 0, nil, nil, types.QValueKindUint64Set, int64(5), nil, &coercionReported) + require.NoError(t, err) + require.Equal(t, types.QValueUint64Set{Val: 5}, qv) +} + func TestProcessTime(t *testing.T) { epoch := time.Unix(0, 0).UTC() for _, ts := range []struct { diff --git a/flow/connectors/mysql/type_conversion.go b/flow/connectors/mysql/type_conversion.go index 60384fd22..9c458525d 100644 --- a/flow/connectors/mysql/type_conversion.go +++ b/flow/connectors/mysql/type_conversion.go @@ -17,7 +17,12 @@ func QkindFromMysqlColumnType(ct string, binlogRowMetadataSupported bool, versio switch strings.ToLower(ct) { case "json": return types.QValueKindJSON, nil - case "char", "varchar", "text", "set", "tinytext", "mediumtext", "longtext": + case "char", "varchar", "text", "tinytext", "mediumtext", "longtext": + return types.QValueKindString, nil + case "set": + if !binlogRowMetadataSupported && version >= shared.InternalVersion_MySQL5ConvertSetsToInts { + return types.QValueKindUint64Set, nil + } return types.QValueKindString, nil case "enum": if !binlogRowMetadataSupported && version >= shared.InternalVersion_MySQL5ConvertEnumsToInts { diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 9f26a84c3..ca75f4b9c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -51,7 +51,7 @@ func qValueKindToPostgresType(colTypeStr string) string { return "SMALLINT" case types.QValueKindInt32, types.QValueKindUInt32: return "INTEGER" - case types.QValueKindInt64, types.QValueKindUInt64: + case types.QValueKindInt64, types.QValueKindUInt64, types.QValueKindUint64Set: return "BIGINT" case types.QValueKindUint16Enum: return "INTEGER" diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go index 5ea706064..825ca9990 100644 --- a/flow/connectors/utils/cdc_store.go +++ b/flow/connectors/utils/cdc_store.go @@ -99,6 +99,7 @@ func init() { gob.Register(types.QValueString{}) gob.Register(types.QValueEnum{}) gob.Register(types.QValueUint16Enum{}) + gob.Register(types.QValueUint64Set{}) gob.Register(types.QValueTimestamp{}) gob.Register(types.QValueTimestampTZ{}) gob.Register(types.QValueDate{}) diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index 727d50d29..2aec48bb9 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -592,6 +592,115 @@ func (s ClickHouseSuite) Test_MySQL_Enum_Consistency_Version0() { RequireEnvCanceled(s.t, env) } +func (s ClickHouseSuite) Test_MySQL_Set_Consistency() { + if _, ok := s.source.(*MySqlSource); !ok { + s.t.Skip("only applies to mysql") + } + + srcTableName := "test_my_set_consistency" + srcFullName := s.attachSchemaSuffix(srcTableName) + dstTableName := "test_my_set_consistency_dst" + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + perms SET('a', 'b', 'c') NOT NULL + ) + `, srcFullName))) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(srcTableName), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + EnvWaitForCount(env, s, "waiting on snapshot", dstTableName, "id,perms", 1) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName))) + + EnvWaitForCount(env, s, "waiting on cdc", dstTableName, "id,perms", 2) + + rows, err := s.GetRows(dstTableName, "id,perms") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2) + require.Equal(s.t, rows.Records[0][1].Value(), rows.Records[1][1].Value(), + "snapshot and CDC SET values should be consistent") + if mysqlEnumUsesOrdinals() { + require.EqualValues(s.t, 5, rows.Records[0][1].Value()) + } else { + require.Equal(s.t, "a,c", rows.Records[0][1].Value()) + } + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s ClickHouseSuite) Test_MySQL_Set_Consistency_Version0() { + if _, ok := s.source.(*MySqlSource); !ok { + s.t.Skip("only applies to mysql") + } + + srcTableName := "test_my_set_consistency_v0" + srcFullName := s.attachSchemaSuffix(srcTableName) + dstTableName := "test_my_set_consistency_v0_dst" + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + perms SET('a', 'b', 'c') NOT NULL + ) + `, srcFullName))) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(srcTableName), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + flowConnConfig.Env = map[string]string{"PEERDB_FORCE_INTERNAL_VERSION": strconv.FormatUint(uint64(shared.InternalVersion_First), 10)} + flowConnConfig.Version = shared.InternalVersion_First + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + EnvWaitForCount(env, s, "waiting on snapshot", dstTableName, "id,perms", 1) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf( + `INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName))) + + EnvWaitForCount(env, s, "waiting on cdc", dstTableName, "id,perms", 2) + + rows, err := s.GetRows(dstTableName, "id,perms") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2) + require.EqualValues(s.t, 1, rows.Records[0][0].Value()) + require.EqualValues(s.t, 2, rows.Records[1][0].Value()) + require.Equal(s.t, "a,c", rows.Records[0][1].Value()) + if mysqlEnumUsesOrdinals() { + require.Equal(s.t, "5", rows.Records[1][1].Value()) + } else { + require.Equal(s.t, "a,c", rows.Records[1][1].Value()) + } + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + func (s ClickHouseSuite) Test_MySQL_Vector() { mysource, ok := s.source.(*MySqlSource) if !ok || mysource.Config.Flavor != protos.MySqlFlavor_MYSQL_MYSQL { diff --git a/flow/model/qrecord_avro_size_test.go b/flow/model/qrecord_avro_size_test.go index 4a2e61dfc..71209e0a1 100644 --- a/flow/model/qrecord_avro_size_test.go +++ b/flow/model/qrecord_avro_size_test.go @@ -275,6 +275,18 @@ func TestAvroSizeComputation(t *testing.T) { return types.QValueUint16Enum{Val: uint16(rand.IntN(65536))} }, }, + { + name: "uint64set", + kind: types.QValueKindUint64Set, + numRecords: 10_000, + env: map[string]string{ + "PEERDB_CLICKHOUSE_BINARY_FORMAT": "raw", + "PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING": "false", + }, + genValue: func() types.QValue { + return types.QValueUint64Set{Val: rand.Uint64()} + }, + }, { name: "timestamp", kind: types.QValueKindTimestamp, diff --git a/flow/model/qrecord_copy_from_source.go b/flow/model/qrecord_copy_from_source.go index d89b03467..ff05398ac 100644 --- a/flow/model/qrecord_copy_from_source.go +++ b/flow/model/qrecord_copy_from_source.go @@ -94,6 +94,8 @@ func (src *QRecordCopyFromSource) Values() ([]any, error) { values[i] = v.Val case types.QValueUint16Enum: values[i] = v.Val + case types.QValueUint64Set: + values[i] = v.Val case types.QValueCIDR: values[i] = v.Val case types.QValueINET: diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index b3322b14d..bd0d54b27 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -76,7 +76,7 @@ func GetAvroSchemaFromQValueKind( return avro.NewPrimitiveSchema(avro.String, nil), nil case types.QValueKindInt8, types.QValueKindInt16, types.QValueKindInt32, types.QValueKindInt64, types.QValueKindUInt8, types.QValueKindUInt16, types.QValueKindUInt32, types.QValueKindUInt64, - types.QValueKindUint16Enum: + types.QValueKindUint16Enum, types.QValueKindUint64Set: return avro.NewPrimitiveSchema(avro.Long, nil), nil case types.QValueKindFloat32: if targetDWH == protos.DBType_BIGQUERY { @@ -294,6 +294,8 @@ func QValueToAvro( return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil case types.QValueUint16Enum: return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil + case types.QValueUint64Set: + return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil case types.QValueUInt32: return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil case types.QValueUInt64: diff --git a/flow/model/qvalue/avro_converter_test.go b/flow/model/qvalue/avro_converter_test.go index 5e78e6cee..9a50e5222 100644 --- a/flow/model/qvalue/avro_converter_test.go +++ b/flow/model/qvalue/avro_converter_test.go @@ -109,6 +109,7 @@ func TestAvroQValueSize(t *testing.T) { types.QValueUInt16{Val: 5000}, types.QValueUInt32{Val: 100000}, types.QValueUInt64{Val: 1000000000}, + types.QValueUint64Set{Val: 5}, // String types (varint length + bytes) types.QValueString{Val: ""}, diff --git a/flow/model/qvalue/equals.go b/flow/model/qvalue/equals.go index bd1ccd21d..40c98c765 100644 --- a/flow/model/qvalue/equals.go +++ b/flow/model/qvalue/equals.go @@ -82,6 +82,11 @@ func Equals(qv types.QValue, other types.QValue) bool { return q.Val == otherVal.Val } return false + case types.QValueUint64Set: + if otherVal, ok := other.(types.QValueUint64Set); ok { + return q.Val == otherVal.Val + } + return false case types.QValueINET: return compareString(q.Val, otherValue) case types.QValueCIDR: diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 146abdf14..f1097023b 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -267,6 +267,21 @@ func LuaRowNewIndex(ls *lua.LState) int { newqv = types.QValueEnum{Val: lua.LVAsString(val)} case types.QValueKindUint16Enum: newqv = types.QValueUint16Enum{Val: uint16(lua.LVAsNumber(val))} + case types.QValueKindUint64Set: + switch v := val.(type) { + case lua.LNumber: + newqv = types.QValueUint64Set{Val: uint64(v)} + case *lua.LUserData: + switch i64 := v.Value.(type) { + case int64: + newqv = types.QValueUint64Set{Val: uint64(i64)} + case uint64: + newqv = types.QValueUint64Set{Val: i64} + } + } + if newqv == nil { + ls.RaiseError("invalid uint64set") + } case types.QValueKindTimestamp: newqv = types.QValueTimestamp{Val: LVAsTime(ls, val)} case types.QValueKindTimestampTZ: diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 0c4bf8315..bbe85d0fb 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -39,6 +39,8 @@ const ( InternalVersion_MySQL5ConvertEnumsToInts // MySQL: convert BIT to UInt64 InternalVersion_MySQLConvertBitToUInt64 + // MySQL: convert SET to integers for older versions without binlog row metadata support + InternalVersion_MySQL5ConvertSetsToInts TotalNumberOfInternalVersions InternalVersion_Latest = TotalNumberOfInternalVersions - 1 diff --git a/flow/shared/types/kind.go b/flow/shared/types/kind.go index 76a184dad..a82548a73 100644 --- a/flow/shared/types/kind.go +++ b/flow/shared/types/kind.go @@ -25,6 +25,7 @@ const ( QValueKindString QValueKind = "string" QValueKindEnum QValueKind = "enum" QValueKindUint16Enum QValueKind = "uint16enum" + QValueKindUint64Set QValueKind = "uint64set" QValueKindTimestamp QValueKind = "timestamp" QValueKindTimestampTZ QValueKind = "timestamptz" QValueKindDate QValueKind = "date" @@ -85,6 +86,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindString: "STRING", QValueKindEnum: "STRING", QValueKindUint16Enum: "INTEGER", + QValueKindUint64Set: "INTEGER", QValueKindJSON: "VARIANT", QValueKindJSONB: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", @@ -138,6 +140,7 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindString: "String", QValueKindEnum: "LowCardinality(String)", QValueKindUint16Enum: "UInt16", + QValueKindUint64Set: "UInt64", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", QValueKindTimestampTZ: "DateTime64(6)", diff --git a/flow/shared/types/qvalue.go b/flow/shared/types/qvalue.go index 50d5a8e93..93cb034b9 100644 --- a/flow/shared/types/qvalue.go +++ b/flow/shared/types/qvalue.go @@ -321,6 +321,22 @@ func (v QValueUint16Enum) LValue(ls *lua.LState) lua.LValue { return lua.LNumber(v.Val) } +type QValueUint64Set struct { + Val uint64 +} + +func (QValueUint64Set) Kind() QValueKind { + return QValueKindUint64Set +} + +func (v QValueUint64Set) Value() any { + return v.Val +} + +func (v QValueUint64Set) LValue(ls *lua.LState) lua.LValue { + return glua64.U64.New(ls, v.Val) +} + type QValueTimestamp struct { Val time.Time } From 1fb761f889c26cb9894db9629c6913158753a549 Mon Sep 17 00:00:00 2001 From: Ilia Demianenko Date: Fri, 19 Jun 2026 09:11:50 -0700 Subject: [PATCH 3/3] 817 cover full-width SET bitmasks --- flow/connectors/mysql/qvalue_convert_test.go | 32 +++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/flow/connectors/mysql/qvalue_convert_test.go b/flow/connectors/mysql/qvalue_convert_test.go index 26067a4d3..720b08545 100644 --- a/flow/connectors/mysql/qvalue_convert_test.go +++ b/flow/connectors/mysql/qvalue_convert_test.go @@ -80,21 +80,37 @@ func TestQkindFromMysqlColumnType_Set(t *testing.T) { } func TestQValueFromMysqlFieldValue_Uint64Set(t *testing.T) { - fieldValue := mysql.NewFieldValue(mysql.FieldValueTypeUnsigned, 5, nil) + const maxUint64 = ^uint64(0) - qv, err := QValueFromMysqlFieldValue(types.QValueKindUint64Set, mysql.MYSQL_TYPE_SET, fieldValue) - require.NoError(t, err) - require.Equal(t, types.QValueUint64Set{Val: 5}, qv) + for _, value := range []uint64{5, maxUint64} { + fieldValue := mysql.NewFieldValue(mysql.FieldValueTypeUnsigned, value, nil) + + qv, err := QValueFromMysqlFieldValue(types.QValueKindUint64Set, mysql.MYSQL_TYPE_SET, fieldValue) + require.NoError(t, err) + require.Equal(t, types.QValueUint64Set{Val: value}, qv) + } } func TestQValueFromMysqlRowEvent_Uint64Set(t *testing.T) { tableMap := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_SET}} coercionReported := false - qv, err := QValueFromMysqlRowEvent( - tableMap, 0, nil, nil, types.QValueKindUint64Set, int64(5), nil, &coercionReported) - require.NoError(t, err) - require.Equal(t, types.QValueUint64Set{Val: 5}, qv) + for _, tc := range []struct { + name string + input any + want uint64 + }{ + {name: "small signed bitmask", input: int64(5), want: 5}, + {name: "max signed bitmask", input: int64(-1), want: ^uint64(0)}, + {name: "max unsigned bitmask", input: ^uint64(0), want: ^uint64(0)}, + } { + t.Run(tc.name, func(t *testing.T) { + qv, err := QValueFromMysqlRowEvent( + tableMap, 0, nil, nil, types.QValueKindUint64Set, tc.input, nil, &coercionReported) + require.NoError(t, err) + require.Equal(t, types.QValueUint64Set{Val: tc.want}, qv) + }) + } } func TestProcessTime(t *testing.T) {