Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ const (
// go-geos library when a LinearRing's points do not close. Used to give a more specific code
// once we already know the error came from MySQL geometry parsing.
mysqlGeometryLinearRingNotClosedError = "Points of LinearRing do not form a closed linestring"

// clickHouseCannotConvertNullToNonNullable is the message ClickHouse raises (error code 349,
// CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN) when a NULL value is inserted into a non-Nullable
// column. By the time this surfaces from a CDC failure the underlying *clickhouse.Exception
// type is usually stripped (Temporal serializes it to a plain string), so we match on the
// message. PeerDB always creates Nullable destination columns, so this only happens through a
// user-defined MV/view that casts a nullable source column to a non-Nullable type.
clickHouseCannotConvertNullToNonNullable = "Cannot convert NULL value to non-Nullable type"
)

var (
Expand Down Expand Up @@ -468,6 +476,17 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
}
}

// A NULL value being inserted into a non-Nullable ClickHouse column (error code 349). This
// reaches us as a serialized string with the *clickhouse.Exception type stripped, so the typed
// switch below never sees it; match on the message instead. See the constant for why this is
// always an MV/view problem.
if strings.Contains(err.Error(), clickHouseCannotConvertNullToNonNullable) {
return ErrorNotifyMVOrView, ErrorInfo{
Source: ErrorSourceClickHouse,
Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)),
}
}

if temporalErr, ok := errors.AsType[*temporal.ApplicationError](err); ok {
switch exceptions.ApplicationErrorType(temporalErr.Type()) {
case exceptions.ApplicationErrorTypeIrrecoverableInvalidSnapshot:
Expand Down Expand Up @@ -944,6 +963,8 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
if _, ok := errors.AsType[*peerdb_clickhouse.ViewError](err); ok {
return ErrorNotifyMVOrView, chErrorInfo
}
case chproto.ErrCannotInsertNullInOrdinaryColumn:
return ErrorNotifyMVOrView, chErrorInfo
case chproto.ErrMemoryLimitExceeded:
return ErrorNotifyOOM, chErrorInfo
case chproto.ErrUnknownDatabase,
Expand Down
32 changes: 32 additions & 0 deletions flow/alerting/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,38 @@ func TestNonClassifiedNonNormalizeErrorShouldBeOtherWithSourceClickHouse(t *test
}, errInfo, "Unexpected error info")
}

func TestCannotInsertNullInOrdinaryColumnShouldBeNotifyMV(t *testing.T) {
// A nullable source column cast to a non-Nullable type by a user MV/view, e.g.
// "code: 349, Cannot convert NULL value to non-Nullable type ... while pushing to view ...".
// The typed *clickhouse.Exception path (e.g. when the cause is still typed at the activity
// level) must be classified by code.
err := &clickhouse.Exception{
Code: int32(chproto.ErrCannotInsertNullInOrdinaryColumn),
//nolint:lll
Message: "Cannot convert NULL value to non-Nullable type: while converting source column street to destination column street: while pushing to view cdc_user_api.stg_cdc_user_api__customer_address_mv (some-uuid-here)",
}
errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("failed to normalize records: %w", err))
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)),
}, errInfo, "Unexpected error info")
}

func TestCannotInsertNullInOrdinaryColumnSerializedShouldBeNotifyMV(t *testing.T) {
// The production case: by the time the error reaches the classifier the *clickhouse.Exception
// type has been stripped (Temporal serialized it to a plain string), so it must be classified
// by message. Without the message match this surfaces as ErrorOther / "UNKNOWN".
//nolint:lll
serialized := errors.New("failed to normalize records: ClickHouse view error: code: 349, message: Cannot convert NULL value to non-Nullable type: while pushing to view cdc_otc_api.stg_cdc_otc_api__otc_otc_trade_mv (some-uuid-here)")
errorClass, errInfo := GetErrorClass(t.Context(), serialized)
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)),
}, errInfo, "Unexpected error info")
}

func TestNumericTruncateOrOutOfRangeWarningShouldBeLossyConversion(t *testing.T) {
for code, err := range map[string]error{
"NUMERIC_TRUNCATED": exceptions.NewNumericTruncatedError(errors.New("testing numeric truncated warning"), "tableA1", "columnB2"),
Expand Down
Loading