diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index 9a6463af0..b161de4fa 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -49,6 +49,14 @@ const ( // go-geos library when a LinearRing's points do not close. Used to give a more specific code // once we already know the error came from MySQL geometry parsing. mysqlGeometryLinearRingNotClosedError = "Points of LinearRing do not form a closed linestring" + + // clickHouseCannotConvertNullToNonNullable is the message ClickHouse raises (error code 349, + // CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN) when a NULL value is inserted into a non-Nullable + // column. By the time this surfaces from a CDC failure the underlying *clickhouse.Exception + // type is usually stripped (Temporal serializes it to a plain string), so we match on the + // message. PeerDB always creates Nullable destination columns, so this only happens through a + // user-defined MV/view that casts a nullable source column to a non-Nullable type. + clickHouseCannotConvertNullToNonNullable = "Cannot convert NULL value to non-Nullable type" ) var ( @@ -468,6 +476,17 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { } } + // A NULL value being inserted into a non-Nullable ClickHouse column (error code 349). This + // reaches us as a serialized string with the *clickhouse.Exception type stripped, so the typed + // switch below never sees it; match on the message instead. See the constant for why this is + // always an MV/view problem. 
+ if strings.Contains(err.Error(), clickHouseCannotConvertNullToNonNullable) { + return ErrorNotifyMVOrView, ErrorInfo{ + Source: ErrorSourceClickHouse, + Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)), + } + } + if temporalErr, ok := errors.AsType[*temporal.ApplicationError](err); ok { switch exceptions.ApplicationErrorType(temporalErr.Type()) { case exceptions.ApplicationErrorTypeIrrecoverableInvalidSnapshot: @@ -944,6 +963,8 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { if _, ok := errors.AsType[*peerdb_clickhouse.ViewError](err); ok { return ErrorNotifyMVOrView, chErrorInfo } + case chproto.ErrCannotInsertNullInOrdinaryColumn: + return ErrorNotifyMVOrView, chErrorInfo case chproto.ErrMemoryLimitExceeded: return ErrorNotifyOOM, chErrorInfo case chproto.ErrUnknownDatabase, diff --git a/flow/alerting/classifier_test.go b/flow/alerting/classifier_test.go index db9781f25..1d95ce093 100644 --- a/flow/alerting/classifier_test.go +++ b/flow/alerting/classifier_test.go @@ -689,6 +689,38 @@ func TestNonClassifiedNonNormalizeErrorShouldBeOtherWithSourceClickHouse(t *test }, errInfo, "Unexpected error info") } +func TestCannotInsertNullInOrdinaryColumnShouldBeNotifyMV(t *testing.T) { + // A nullable source column cast to a non-Nullable type by a user MV/view, e.g. + // "code: 349, Cannot convert NULL value to non-Nullable type ... while pushing to view ...". + // Typed *clickhouse.Exception path (cause still typed at the activity level): classify by code. + // NOTE(review): Message also contains the message-matched substring, so that check may fire first. 
+ err := &clickhouse.Exception{ + Code: int32(chproto.ErrCannotInsertNullInOrdinaryColumn), + //nolint:lll + Message: "Cannot convert NULL value to non-Nullable type: while converting source column street to destination column street: while pushing to view cdc_user_api.stg_cdc_user_api__customer_address_mv (some-uuid-here)", + } + errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("failed to normalize records: %w", err)) + assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") + assert.Equal(t, ErrorInfo{ + Source: ErrorSourceClickHouse, + Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)), + }, errInfo, "Unexpected error info") +} + +func TestCannotInsertNullInOrdinaryColumnSerializedShouldBeNotifyMV(t *testing.T) { + // The production case: by the time the error reaches the classifier the *clickhouse.Exception + // type has been stripped (Temporal serialized it to a plain string), so it must be classified + // by message. Without the message match this surfaces as ErrorOther / "UNKNOWN". + //nolint:lll + serialized := errors.New("failed to normalize records: ClickHouse view error: code: 349, message: Cannot convert NULL value to non-Nullable type: while pushing to view cdc_otc_api.stg_cdc_otc_api__otc_otc_trade_mv (some-uuid-here)") + errorClass, errInfo := GetErrorClass(t.Context(), serialized) + assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") + assert.Equal(t, ErrorInfo{ + Source: ErrorSourceClickHouse, + Code: strconv.Itoa(int(chproto.ErrCannotInsertNullInOrdinaryColumn)), + }, errInfo, "Unexpected error info") +} + func TestNumericTruncateOrOutOfRangeWarningShouldBeLossyConversion(t *testing.T) { for code, err := range map[string]error{ "NUMERIC_TRUNCATED": exceptions.NewNumericTruncatedError(errors.New("testing numeric truncated warning"), "tableA1", "columnB2"),