Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func qValueKindToBigQueryType(columnDescription *protos.FieldDescription, nullab
// integer types
case types.QValueKindInt8, types.QValueKindInt16, types.QValueKindInt32, types.QValueKindInt64,
types.QValueKindUInt8, types.QValueKindUInt16, types.QValueKindUInt32, types.QValueKindUInt64,
types.QValueKindUint16Enum:
types.QValueKindUint16Enum, types.QValueKindUint64Set:
bqField.Type = bigquery.IntegerFieldType
// decimal types
case types.QValueKindFloat32, types.QValueKindFloat64:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/mysql/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func buildSelectedColumns(cols []*protos.FieldDescription, exclude []string) str
}

converted := common.QuoteMySQLIdentifier(col.Name)
if col.Type == string(types.QValueKindUint16Enum) {
if col.Type == string(types.QValueKindUint16Enum) || col.Type == string(types.QValueKindUint64Set) {
converted = fmt.Sprintf("CAST(%s AS UNSIGNED) AS %s", converted, converted)
selectAsterisk = false
}
Expand Down
9 changes: 9 additions & 0 deletions flow/connectors/mysql/qrep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ func TestBuildSelectedColumns(t *testing.T) {
exclude: []string{},
expectedSelectedColumns: "`id`, CAST(`status` AS UNSIGNED) AS `status`",
},
{
name: "uint64set column is cast to unsigned",
cols: []*protos.FieldDescription{
{Name: "id", Type: string(types.QValueKindInt32)},
{Name: "perms", Type: string(types.QValueKindUint64Set)},
},
exclude: []string{},
expectedSelectedColumns: "`id`, CAST(`perms` AS UNSIGNED) AS `perms`",
},
{
name: "string enum column is not cast",
cols: []*protos.FieldDescription{
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/mysql/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
switch qkind {
case types.QValueKindUint16Enum:
return types.QValueUint16Enum{Val: uint16(v)}, nil
case types.QValueKindUint64Set:
return types.QValueUint64Set{Val: v}, nil
case types.QValueKindBoolean:
return types.QValueBoolean{Val: v != 0}, nil
case types.QValueKindInt8:
Expand All @@ -245,6 +247,8 @@ func QValueFromMysqlFieldValue(qkind types.QValueKind, mytype byte, fv mysql.Fie
switch qkind {
case types.QValueKindBoolean:
return types.QValueBoolean{Val: v != 0}, nil
case types.QValueKindUint64Set:
return types.QValueUint64Set{Val: uint64(v)}, nil
case types.QValueKindInt8:
return types.QValueInt8{Val: int8(v)}, nil
case types.QValueKindInt16:
Expand Down Expand Up @@ -443,6 +447,8 @@ func QValueFromMysqlRowEvent(
switch qkind {
case types.QValueKindUInt64:
return types.QValueUInt64{Val: uint64(val)}, nil
case types.QValueKindUint64Set:
return types.QValueUint64Set{Val: uint64(val)}, nil
case types.QValueKindInt64:
return types.QValueInt64{Val: val}, nil
case types.QValueKindString: // set
Expand Down Expand Up @@ -477,6 +483,8 @@ func QValueFromMysqlRowEvent(
switch qkind {
case types.QValueKindInt64:
return types.QValueInt64{Val: int64(val)}, nil
case types.QValueKindUint64Set:
return types.QValueUint64Set{Val: val}, nil
case types.QValueKindString:
return types.QValueString{Val: strconv.FormatUint(val, 10)}, nil
default:
Expand Down
75 changes: 75 additions & 0 deletions flow/connectors/mysql/qvalue_convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peerdb/flow/shared"
Expand Down Expand Up @@ -38,6 +39,80 @@ func TestQkindFromMysqlType_Bit(t *testing.T) {
}
}

func TestQkindFromMysqlColumnType_Set(t *testing.T) {
for _, tc := range []struct {
name string
binlogMetadataSupported bool
version uint32
want types.QValueKind
}{
{
name: "old version without metadata maps SET to string",
binlogMetadataSupported: false,
version: shared.InternalVersion_MySQL5ConvertSetsToInts - 1,
want: types.QValueKindString,
},
{
name: "gating version without metadata maps SET to Uint64Set",
binlogMetadataSupported: false,
version: shared.InternalVersion_MySQL5ConvertSetsToInts,
want: types.QValueKindUint64Set,
},
{
name: "latest version without metadata maps SET to Uint64Set",
binlogMetadataSupported: false,
version: shared.InternalVersion_Latest,
want: types.QValueKindUint64Set,
},
{
name: "metadata-supported server keeps SET as string labels",
binlogMetadataSupported: true,
version: shared.InternalVersion_Latest,
want: types.QValueKindString,
},
} {
t.Run(tc.name, func(t *testing.T) {
qkind, err := QkindFromMysqlColumnType("set('a','b','c')", tc.binlogMetadataSupported, tc.version)
require.NoError(t, err)
require.Equal(t, tc.want, qkind)
})
}
}

func TestQValueFromMysqlFieldValue_Uint64Set(t *testing.T) {
const maxUint64 = ^uint64(0)

for _, value := range []uint64{5, maxUint64} {
fieldValue := mysql.NewFieldValue(mysql.FieldValueTypeUnsigned, value, nil)

qv, err := QValueFromMysqlFieldValue(types.QValueKindUint64Set, mysql.MYSQL_TYPE_SET, fieldValue)
require.NoError(t, err)
require.Equal(t, types.QValueUint64Set{Val: value}, qv)
}
}

func TestQValueFromMysqlRowEvent_Uint64Set(t *testing.T) {
tableMap := &replication.TableMapEvent{ColumnType: []byte{mysql.MYSQL_TYPE_SET}}
coercionReported := false

for _, tc := range []struct {
name string
input any
want uint64
}{
{name: "small signed bitmask", input: int64(5), want: 5},
{name: "max signed bitmask", input: int64(-1), want: ^uint64(0)},
{name: "max unsigned bitmask", input: ^uint64(0), want: ^uint64(0)},
} {
t.Run(tc.name, func(t *testing.T) {
qv, err := QValueFromMysqlRowEvent(
tableMap, 0, nil, nil, types.QValueKindUint64Set, tc.input, nil, &coercionReported)
require.NoError(t, err)
require.Equal(t, types.QValueUint64Set{Val: tc.want}, qv)
})
}
}

func TestProcessTime(t *testing.T) {
epoch := time.Unix(0, 0).UTC()
for _, ts := range []struct {
Expand Down
7 changes: 6 additions & 1 deletion flow/connectors/mysql/type_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ func QkindFromMysqlColumnType(ct string, binlogRowMetadataSupported bool, versio
switch strings.ToLower(ct) {
case "json":
return types.QValueKindJSON, nil
case "char", "varchar", "text", "set", "tinytext", "mediumtext", "longtext":
case "char", "varchar", "text", "tinytext", "mediumtext", "longtext":
return types.QValueKindString, nil
case "set":
if !binlogRowMetadataSupported && version >= shared.InternalVersion_MySQL5ConvertSetsToInts {
return types.QValueKindUint64Set, nil
}
return types.QValueKindString, nil
case "enum":
if !binlogRowMetadataSupported && version >= shared.InternalVersion_MySQL5ConvertEnumsToInts {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "SMALLINT"
case types.QValueKindInt32, types.QValueKindUInt32:
return "INTEGER"
case types.QValueKindInt64, types.QValueKindUInt64:
case types.QValueKindInt64, types.QValueKindUInt64, types.QValueKindUint64Set:
return "BIGINT"
case types.QValueKindUint16Enum:
return "INTEGER"
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/utils/cdc_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func init() {
gob.Register(types.QValueString{})
gob.Register(types.QValueEnum{})
gob.Register(types.QValueUint16Enum{})
gob.Register(types.QValueUint64Set{})
gob.Register(types.QValueTimestamp{})
gob.Register(types.QValueTimestampTZ{})
gob.Register(types.QValueDate{})
Expand Down
109 changes: 109 additions & 0 deletions flow/e2e/clickhouse_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,115 @@ func (s ClickHouseSuite) Test_MySQL_Enum_Consistency_Version0() {
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_MySQL_Set_Consistency() {
if _, ok := s.source.(*MySqlSource); !ok {
s.t.Skip("only applies to mysql")
}

srcTableName := "test_my_set_consistency"
srcFullName := s.attachSchemaSuffix(srcTableName)
dstTableName := "test_my_set_consistency_dst"

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
perms SET('a', 'b', 'c') NOT NULL
)
`, srcFullName)))

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName)))

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix(srcTableName),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

EnvWaitForCount(env, s, "waiting on snapshot", dstTableName, "id,perms", 1)

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName)))

EnvWaitForCount(env, s, "waiting on cdc", dstTableName, "id,perms", 2)

rows, err := s.GetRows(dstTableName, "id,perms")
require.NoError(s.t, err)
require.Len(s.t, rows.Records, 2)
require.Equal(s.t, rows.Records[0][1].Value(), rows.Records[1][1].Value(),
"snapshot and CDC SET values should be consistent")
if mysqlEnumUsesOrdinals() {
require.EqualValues(s.t, 5, rows.Records[0][1].Value())
} else {
require.Equal(s.t, "a,c", rows.Records[0][1].Value())
}

env.Cancel(s.t.Context())
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_MySQL_Set_Consistency_Version0() {
if _, ok := s.source.(*MySqlSource); !ok {
s.t.Skip("only applies to mysql")
}

srcTableName := "test_my_set_consistency_v0"
srcFullName := s.attachSchemaSuffix(srcTableName)
dstTableName := "test_my_set_consistency_v0_dst"

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
perms SET('a', 'b', 'c') NOT NULL
)
`, srcFullName)))

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName)))

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix(srcTableName),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.Env = map[string]string{"PEERDB_FORCE_INTERNAL_VERSION": strconv.FormatUint(uint64(shared.InternalVersion_First), 10)}
flowConnConfig.Version = shared.InternalVersion_First

tc := NewTemporalClient(s.t)
env := ExecutePeerflow(s.t, tc, flowConnConfig)
SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

EnvWaitForCount(env, s, "waiting on snapshot", dstTableName, "id,perms", 1)

require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(
`INSERT INTO %s (perms) VALUES ('a,c')`, srcFullName)))

EnvWaitForCount(env, s, "waiting on cdc", dstTableName, "id,perms", 2)

rows, err := s.GetRows(dstTableName, "id,perms")
require.NoError(s.t, err)
require.Len(s.t, rows.Records, 2)
require.EqualValues(s.t, 1, rows.Records[0][0].Value())
require.EqualValues(s.t, 2, rows.Records[1][0].Value())
require.Equal(s.t, "a,c", rows.Records[0][1].Value())
if mysqlEnumUsesOrdinals() {
require.Equal(s.t, "5", rows.Records[1][1].Value())
} else {
require.Equal(s.t, "a,c", rows.Records[1][1].Value())
}

env.Cancel(s.t.Context())
RequireEnvCanceled(s.t, env)
}

func (s ClickHouseSuite) Test_MySQL_Vector() {
mysource, ok := s.source.(*MySqlSource)
if !ok || mysource.Config.Flavor != protos.MySqlFlavor_MYSQL_MYSQL {
Expand Down
12 changes: 12 additions & 0 deletions flow/model/qrecord_avro_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ func TestAvroSizeComputation(t *testing.T) {
return types.QValueUint16Enum{Val: uint16(rand.IntN(65536))}
},
},
{
name: "uint64set",
kind: types.QValueKindUint64Set,
numRecords: 10_000,
env: map[string]string{
"PEERDB_CLICKHOUSE_BINARY_FORMAT": "raw",
"PEERDB_CLICKHOUSE_UNBOUNDED_NUMERIC_AS_STRING": "false",
},
genValue: func() types.QValue {
return types.QValueUint64Set{Val: rand.Uint64()}
},
},
{
name: "timestamp",
kind: types.QValueKindTimestamp,
Expand Down
2 changes: 2 additions & 0 deletions flow/model/qrecord_copy_from_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (src *QRecordCopyFromSource) Values() ([]any, error) {
values[i] = v.Val
case types.QValueUint16Enum:
values[i] = v.Val
case types.QValueUint64Set:
values[i] = v.Val
case types.QValueCIDR:
values[i] = v.Val
case types.QValueINET:
Expand Down
4 changes: 3 additions & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func GetAvroSchemaFromQValueKind(
return avro.NewPrimitiveSchema(avro.String, nil), nil
case types.QValueKindInt8, types.QValueKindInt16, types.QValueKindInt32, types.QValueKindInt64,
types.QValueKindUInt8, types.QValueKindUInt16, types.QValueKindUInt32, types.QValueKindUInt64,
types.QValueKindUint16Enum:
types.QValueKindUint16Enum, types.QValueKindUint64Set:
return avro.NewPrimitiveSchema(avro.Long, nil), nil
case types.QValueKindFloat32:
if targetDWH == protos.DBType_BIGQUERY {
Expand Down Expand Up @@ -294,6 +294,8 @@ func QValueToAvro(
return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil
case types.QValueUint16Enum:
return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil
case types.QValueUint64Set:
return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil
case types.QValueUInt32:
return c.processNullableUnion(int64(v.Val)), varIntSize(int64(v.Val), sizeOpt), nil
case types.QValueUInt64:
Expand Down
1 change: 1 addition & 0 deletions flow/model/qvalue/avro_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func TestAvroQValueSize(t *testing.T) {
types.QValueUInt16{Val: 5000},
types.QValueUInt32{Val: 100000},
types.QValueUInt64{Val: 1000000000},
types.QValueUint64Set{Val: 5},

// String types (varint length + bytes)
types.QValueString{Val: ""},
Expand Down
5 changes: 5 additions & 0 deletions flow/model/qvalue/equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func Equals(qv types.QValue, other types.QValue) bool {
return q.Val == otherVal.Val
}
return false
case types.QValueUint64Set:
if otherVal, ok := other.(types.QValueUint64Set); ok {
return q.Val == otherVal.Val
}
return false
case types.QValueINET:
return compareString(q.Val, otherValue)
case types.QValueCIDR:
Expand Down
15 changes: 15 additions & 0 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,21 @@ func LuaRowNewIndex(ls *lua.LState) int {
newqv = types.QValueEnum{Val: lua.LVAsString(val)}
case types.QValueKindUint16Enum:
newqv = types.QValueUint16Enum{Val: uint16(lua.LVAsNumber(val))}
case types.QValueKindUint64Set:
switch v := val.(type) {
case lua.LNumber:
newqv = types.QValueUint64Set{Val: uint64(v)}
case *lua.LUserData:
switch i64 := v.Value.(type) {
case int64:
newqv = types.QValueUint64Set{Val: uint64(i64)}
case uint64:
newqv = types.QValueUint64Set{Val: i64}
}
}
if newqv == nil {
ls.RaiseError("invalid uint64set")
}
case types.QValueKindTimestamp:
newqv = types.QValueTimestamp{Val: LVAsTime(ls, val)}
case types.QValueKindTimestampTZ:
Expand Down
Loading
Loading