Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func SanitizeName(name string) string {

sanitizedName := sb.String()
if changed {
log.Warn(
log.Debug(
"Name is potentially not safe for serialization, replace it",
zap.String("name", name),
zap.String("replacedName", sanitizedName),
Expand Down
99 changes: 91 additions & 8 deletions pkg/sink/codec/debezium/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,57 @@ func (c *dbzCodec) writeDebeziumFieldValues(
continue
}
colx := model.GetColumnDataX(col, tableInfo)
err = c.writeDebeziumFieldValue(writer, colx, colInfos[i].Ft)
colFt := colInfos[i].Ft
err = c.writeDebeziumFieldValue(writer, colx, colFt)
if err != nil {
log.Error("write Debezium field value meet error", zap.Error(err))
colFlag := colx.GetFlag()
mysqlTypeName := types.TypeStr(colx.GetType())
if mysqlTypeName == "" {
mysqlTypeName = fmt.Sprintf("type-%d", colx.GetType())
}
var tidbTableID int64
if tableInfo.TableInfo != nil {
tidbTableID = tableInfo.TableInfo.ID
}
err = cerror.Annotatef(err,
"table=%s, tableID=%d, column=%s(%d), mysqlType=%s, columnValueType=%T, fieldName=%s, columnIndex=%d",
tableInfo.TableName.String(),
tableInfo.TableName.TableID,
colx.GetName(),
colx.ColumnID,
mysqlTypeName,
colx.Value,
fieldName,
i,
)
log.Error("write Debezium field value meet error",
zap.Error(err),
zap.String("qualifiedTable", tableInfo.TableName.String()),
zap.String("schema", tableInfo.GetSchemaName()),
zap.String("table", tableInfo.GetTableName()),
zap.Int64("tableID", tableInfo.TableName.TableID),
zap.Int64("tidbTableID", tidbTableID),
zap.Uint64("tableVersion", tableInfo.Version),
zap.Bool("isPartitionTable", tableInfo.IsPartitionTable()),
zap.String("fieldName", fieldName),
zap.Int("columnIndex", i),
zap.String("columnName", colx.GetName()),
zap.Int64("columnID", colx.ColumnID),
zap.String("columnMySQLType", mysqlTypeName),
zap.Uint8("columnTypeCode", colx.GetType()),
zap.Any("columnValue", colx.Value),
zap.String("columnValueType", fmt.Sprintf("%T", colx.Value)),
zap.Int("columnApproximateBytes", colx.ApproximateBytes),
zap.Bool("columnIsHandleKey", colFlag.IsHandleKey()),
zap.Bool("columnIsPrimaryKey", colFlag.IsPrimaryKey()),
zap.Bool("columnIsUniqueKey", colFlag.IsUniqueKey()),
zap.Bool("columnIsNullable", colFlag.IsNullable()),
zap.Bool("columnIsUnsigned", colFlag.IsUnsigned()),
zap.Bool("columnIsBinary", colFlag.IsBinary()),
zap.Any("columnInfo", colx.GetColumnInfo()),
zap.Any("columnFieldType", colFt),
zap.Any("rowColumnInfo", colInfos[i]),
Comment on lines +102 to +105
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While the additional context is very helpful for debugging, logging the full columnInfo, columnFieldType, and rowColumnInfo objects introduces significant verbosity and redundancy.

  • rowColumnInfo is redundant as its fields (ID and Ft) are already logged as columnID and columnFieldType.
  • columnInfo is a large struct that also contains FieldType, which is logged as columnFieldType.

To keep the log entries more manageable and focused, I suggest removing these zap.Any fields for the complex structs.

					zap.Bool("columnIsBinary", colFlag.IsBinary())

)
break
}
}
Expand Down Expand Up @@ -567,14 +615,33 @@ func (c *dbzCodec) writeDebeziumFieldValue(
return nil

case mysql.TypeEnum:
v, ok := value.(uint64)
if !ok {
var (
enumVar types.Enum
err error
)
switch v := value.(type) {
case uint64:
log.Info("debezium enum column value is uint64",
zap.String("column", col.GetName()),
zap.Uint64("value", v))
enumVar, err = types.ParseEnumValue(ft.GetElems(), v)
case string:
log.Info("debezium enum column value is string",
zap.String("column", col.GetName()),
zap.String("value", v))
enumVar, err = types.ParseEnumName(ft.GetElems(), v, ft.GetCollate())
case []byte:
stringVal := string(v)
log.Info("debezium enum column value is string bytes",
zap.String("column", col.GetName()),
zap.String("value", stringVal))
enumVar, err = types.ParseEnumName(ft.GetElems(), stringVal, ft.GetCollate())
default:
return cerror.ErrDebeziumEncodeFailed.GenWithStack(
"unexpected column value type %T for enum column %s",
value,
col.GetName())
}
enumVar, err := types.ParseEnumValue(ft.GetElems(), v)
if err != nil {
// Invalid enum value inserted in non-strict mode.
writer.WriteStringField(col.GetName(), "")
Expand All @@ -584,14 +651,30 @@ func (c *dbzCodec) writeDebeziumFieldValue(
return nil

case mysql.TypeSet:
v, ok := value.(uint64)
if !ok {
var (
setVar types.Set
err error
)
switch v := value.(type) {
case uint64:
setVar, err = types.ParseSetValue(ft.GetElems(), v)
case string:
log.Info("debezium set column value is string",
zap.String("column", col.GetName()),
zap.String("value", v))
setVar, err = types.ParseSetName(ft.GetElems(), v, ft.GetCollate())
case []byte:
stringVal := string(v)
log.Info("debezium set column value is string bytes",
zap.String("column", col.GetName()),
zap.String("value", stringVal))
setVar, err = types.ParseSetName(ft.GetElems(), stringVal, ft.GetCollate())
default:
return cerror.ErrDebeziumEncodeFailed.GenWithStack(
"unexpected column value type %T for set column %s",
value,
col.GetName())
}
setVar, err := types.ParseSetValue(ft.GetElems(), v)
if err != nil {
// Invalid enum value inserted in non-strict mode.
writer.WriteStringField(col.GetName(), "")
Expand Down
Loading