-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39392][table] Support conditional traits for PTFs #27886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d62d1e4
ba5b471
1a62621
4acdc14
e8f776f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events into a downstream s | |
|
|
||
| ```sql | ||
| SELECT * FROM TO_CHANGELOG( | ||
| input => TABLE source_table PARTITION BY key_col, | ||
| input => TABLE source_table [PARTITION BY key_col], | ||
| [op => DESCRIPTOR(op_column_name),] | ||
| [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]] | ||
| ) | ||
|
|
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG( | |
|
|
||
| | Parameter | Required | Description | | ||
| |:-------------|:---------|:------------| | ||
| | `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. | | ||
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. | | ||
| | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | | ||
| | `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | | ||
|
|
||
|
|
@@ -99,6 +99,15 @@ SELECT * FROM TO_CHANGELOG( | |
| -- +I[id:2, op:'DELETE', name:'Bob', cnt:1] | ||
| ``` | ||
|
|
||
| #### Without PARTITION BY | ||
|
|
||
| ```sql | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove all PARTITION BY examples for now. A default TO_CHANGELOG example should always be without PARTITION BY. They do not provide any benefit but rather add exchange overhead. |
||
| -- Each row is processed independently, no key co-location | ||
| SELECT * FROM TO_CHANGELOG(input => TABLE my_table) | ||
|
|
||
| -- Output: +I[op:'INSERT', id:1, name:'Alice'] | ||
| ``` | ||
|
|
||
| #### Custom operation column name | ||
|
|
||
| ```sql | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| import org.apache.flink.table.types.inference.InputTypeStrategies; | ||
| import org.apache.flink.table.types.inference.StaticArgument; | ||
| import org.apache.flink.table.types.inference.StaticArgumentTrait; | ||
| import org.apache.flink.table.types.inference.TraitCondition; | ||
| import org.apache.flink.table.types.inference.TypeStrategies; | ||
| import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy; | ||
| import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies; | ||
|
|
@@ -783,14 +784,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) | |
| .kind(PROCESS_TABLE) | ||
| .staticArguments( | ||
| StaticArgument.table( | ||
| "input", | ||
| Row.class, | ||
| false, | ||
| EnumSet.of( | ||
| StaticArgumentTrait.TABLE, | ||
| StaticArgumentTrait.SET_SEMANTIC_TABLE, | ||
| StaticArgumentTrait.SUPPORT_UPDATES, | ||
| StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)), | ||
| "input", | ||
| Row.class, | ||
| false, | ||
| EnumSet.of( | ||
| StaticArgumentTrait.TABLE, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to make things explicit, I would add |
||
| StaticArgumentTrait.SUPPORT_UPDATES, | ||
| StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)) | ||
| .addTraitWhen( | ||
| TraitCondition.hasPartitionBy(), | ||
| StaticArgumentTrait.SET_SEMANTIC_TABLE), | ||
| StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), | ||
| StaticArgument.scalar( | ||
| "op_mapping", | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -31,7 +31,11 @@ | |||||
|
|
||||||
| import javax.annotation.Nullable; | ||||||
|
|
||||||
| import java.io.Serializable; | ||||||
| import java.util.ArrayList; | ||||||
| import java.util.Collections; | ||||||
| import java.util.EnumSet; | ||||||
| import java.util.List; | ||||||
| import java.util.Objects; | ||||||
| import java.util.Optional; | ||||||
| import java.util.stream.Collectors; | ||||||
|
|
@@ -57,18 +61,58 @@ public class StaticArgument { | |||||
| private final @Nullable Class<?> conversionClass; | ||||||
| private final boolean isOptional; | ||||||
| private final EnumSet<StaticArgumentTrait> traits; | ||||||
| private final List<ConditionalTrait> conditionalTraits; | ||||||
|
|
||||||
| /** A trait that is conditionally added based on a {@link TraitCondition}. */ | ||||||
| private static final class ConditionalTrait implements Serializable { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: classes to the bottom of the file
Suggested change
|
||||||
| private final TraitCondition condition; | ||||||
| private final StaticArgumentTrait trait; | ||||||
|
|
||||||
| ConditionalTrait(final TraitCondition condition, final StaticArgumentTrait trait) { | ||||||
| this.condition = condition; | ||||||
| this.trait = trait; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public boolean equals(final Object o) { | ||||||
| if (this == o) { | ||||||
| return true; | ||||||
| } | ||||||
| if (o == null || getClass() != o.getClass()) { | ||||||
| return false; | ||||||
| } | ||||||
| final ConditionalTrait that = (ConditionalTrait) o; | ||||||
| return Objects.equals(condition, that.condition) && trait == that.trait; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public int hashCode() { | ||||||
| return Objects.hash(condition, trait); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private StaticArgument( | ||||||
| String name, | ||||||
| @Nullable DataType dataType, | ||||||
| @Nullable Class<?> conversionClass, | ||||||
| boolean isOptional, | ||||||
| EnumSet<StaticArgumentTrait> traits) { | ||||||
| this(name, dataType, conversionClass, isOptional, traits, List.of()); | ||||||
| } | ||||||
|
|
||||||
| private StaticArgument( | ||||||
| String name, | ||||||
| @Nullable DataType dataType, | ||||||
| @Nullable Class<?> conversionClass, | ||||||
| boolean isOptional, | ||||||
| EnumSet<StaticArgumentTrait> traits, | ||||||
| List<ConditionalTrait> conditionalTraits) { | ||||||
| this.name = Preconditions.checkNotNull(name, "Name must not be null."); | ||||||
| this.dataType = dataType; | ||||||
| this.conversionClass = conversionClass; | ||||||
| this.isOptional = isOptional; | ||||||
| this.traits = Preconditions.checkNotNull(traits, "Traits must not be null."); | ||||||
| this.conditionalTraits = Collections.unmodifiableList(conditionalTraits); | ||||||
| checkName(); | ||||||
| checkTraits(traits); | ||||||
| checkOptionalType(); | ||||||
|
|
@@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) { | |||||
| return traits.contains(trait); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Context-aware trait check. Evaluates conditional trait rules against the given context to | ||||||
| * determine the effective traits. | ||||||
| */ | ||||||
| public boolean is(StaticArgumentTrait trait, TraitContext ctx) { | ||||||
| return resolveTraits(ctx).contains(trait); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is | ||||||
| * added to the effective trait set when the condition evaluates to {@code true} at planning | ||||||
| * time. | ||||||
| * | ||||||
| * <p>Example: | ||||||
| * | ||||||
| * <pre>{@code | ||||||
| * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES)) | ||||||
| * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE) | ||||||
| * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE); | ||||||
| * }</pre> | ||||||
| */ | ||||||
| public StaticArgument addTraitWhen( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would swap the parameter order then: |
||||||
| final TraitCondition condition, final StaticArgumentTrait trait) { | ||||||
| final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should limit the conditional traits. e.g. a scalar should not become a table out of a sudden. root changes are not allowed. this should limit the logic to tables currently, because it is the only trait with subtraits. |
||||||
| newList.add(new ConditionalTrait(condition, trait)); | ||||||
| return new StaticArgument(name, dataType, conversionClass, isOptional, traits, newList); | ||||||
| } | ||||||
|
|
||||||
| /** Whether this argument has conditional trait rules. */ | ||||||
| public boolean hasConditionalTraits() { | ||||||
| return !conditionalTraits.isEmpty(); | ||||||
| } | ||||||
|
|
||||||
| /** Whether any conditional trait rule may add the given trait. */ | ||||||
| public boolean hasConditionalTrait(final StaticArgumentTrait trait) { | ||||||
| return conditionalTraits.stream().anyMatch(ct -> ct.trait == trait); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Resolves effective traits by evaluating conditional rules against the context. Returns the | ||||||
| * base traits combined with any conditional traits whose conditions are met. | ||||||
| */ | ||||||
| public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) { | ||||||
| if (conditionalTraits.isEmpty()) { | ||||||
| return traits; | ||||||
| } | ||||||
| final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits); | ||||||
| for (final ConditionalTrait ct : conditionalTraits) { | ||||||
| if (ct.condition.test(ctx)) { | ||||||
| // ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE are mutually exclusive. | ||||||
| // Adding one removes the other. | ||||||
| if (ct.trait == StaticArgumentTrait.SET_SEMANTIC_TABLE) { | ||||||
| resolved.remove(StaticArgumentTrait.ROW_SEMANTIC_TABLE); | ||||||
| } else if (ct.trait == StaticArgumentTrait.ROW_SEMANTIC_TABLE) { | ||||||
| resolved.remove(StaticArgumentTrait.SET_SEMANTIC_TABLE); | ||||||
| } | ||||||
| resolved.add(ct.trait); | ||||||
| } | ||||||
| } | ||||||
| return resolved; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public String toString() { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. update toString as well. syntax |
||||||
| final StringBuilder s = new StringBuilder(); | ||||||
|
|
@@ -233,12 +339,13 @@ public boolean equals(Object o) { | |||||
| && Objects.equals(name, that.name) | ||||||
| && Objects.equals(dataType, that.dataType) | ||||||
| && Objects.equals(conversionClass, that.conversionClass) | ||||||
| && Objects.equals(traits, that.traits); | ||||||
| && Objects.equals(traits, that.traits) | ||||||
| && Objects.equals(conditionalTraits, that.conditionalTraits); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public int hashCode() { | ||||||
| return Objects.hash(name, dataType, conversionClass, isOptional, traits); | ||||||
| return Objects.hash(name, dataType, conversionClass, isOptional, traits, conditionalTraits); | ||||||
| } | ||||||
|
|
||||||
| private void checkName() { | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) { | |
| } | ||
| } | ||
|
|
||
| static TraitContext buildTraitContext( | ||
| final TableSemantics semantics, | ||
| final CallContext callContext, | ||
| final List<StaticArgument> staticArgs) { | ||
| return new TraitContext() { | ||
| @Override | ||
| public boolean hasPartitionBy() { | ||
| return semantics.partitionByColumns().length > 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The trait context should be available to all args. So |
||
| } | ||
|
|
||
| @Override | ||
| public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) { | ||
| for (int i = 0; i < staticArgs.size(); i++) { | ||
| if (staticArgs.get(i).getName().equals(name)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check for is scalar and then isLiteral. |
||
| return callContext.getArgumentValue(i, clazz); | ||
| } | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| private static void checkMultipleTableArgs(List<StaticArgument> staticArgs) { | ||
| final List<StaticArgument> tableArgs = | ||
| staticArgs.stream() | ||
|
|
@@ -314,13 +336,16 @@ private List<Field> derivePassThroughFields(CallContext callContext) { | |
| if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) { | ||
| return DataType.getFields(argDataTypes.get(pos)).stream(); | ||
| } | ||
| if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) { | ||
| final TableSemantics semantics = | ||
| callContext.getTableSemantics(pos).orElse(null); | ||
| if (semantics == null) { | ||
| return Stream.<Field>empty(); | ||
| } | ||
| final TraitContext traitCtx = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Build the traits as the very first, before |
||
| buildTraitContext(semantics, callContext, staticArgs); | ||
| if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE, traitCtx)) { | ||
| return Stream.<Field>empty(); | ||
| } | ||
| final TableSemantics semantics = | ||
| callContext | ||
| .getTableSemantics(pos) | ||
| .orElseThrow(IllegalStateException::new); | ||
| final DataType rowDataType = | ||
| DataTypes.ROW(DataType.getFields(argDataTypes.get(pos))); | ||
| final DataType projectedRow = | ||
|
|
@@ -601,8 +626,10 @@ private static void checkTableArgs( | |
| "Table expected for argument '%s'.", | ||
| staticArg.getName())); | ||
| } | ||
| checkRowSemantics(staticArg, semantics); | ||
| checkSetSemantics(staticArg, semantics); | ||
| final TraitContext traitCtx = | ||
| buildTraitContext(semantics, callContext, staticArgs); | ||
| checkRowSemantics(staticArg, semantics, traitCtx); | ||
| checkSetSemantics(staticArg, semantics, traitCtx); | ||
| tableSemantics.add(semantics); | ||
| }); | ||
| checkCoPartitioning(tableSemantics); | ||
|
|
@@ -645,8 +672,9 @@ private static void checkCoPartitioning(List<TableSemantics> tableSemantics) { | |
| } | ||
| } | ||
|
|
||
| private static void checkRowSemantics(StaticArgument staticArg, TableSemantics semantics) { | ||
| if (!staticArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) { | ||
| private static void checkRowSemantics( | ||
| StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) { | ||
| if (!staticArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE, traitCtx)) { | ||
| return; | ||
| } | ||
| if (semantics.partitionByColumns().length > 0 | ||
|
|
@@ -656,12 +684,13 @@ private static void checkRowSemantics(StaticArgument staticArg, TableSemantics s | |
| } | ||
| } | ||
|
|
||
| private static void checkSetSemantics(StaticArgument staticArg, TableSemantics semantics) { | ||
| if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) { | ||
| private static void checkSetSemantics( | ||
| StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we copy |
||
| if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE, traitCtx)) { | ||
| return; | ||
| } | ||
| if (semantics.partitionByColumns().length == 0 | ||
| && !staticArg.is(StaticArgumentTrait.OPTIONAL_PARTITION_BY)) { | ||
| && !staticArg.is(StaticArgumentTrait.OPTIONAL_PARTITION_BY, traitCtx)) { | ||
| throw new ValidationException( | ||
| String.format( | ||
| "Table argument '%s' requires a PARTITION BY clause for parallel processing.", | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,69 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||
| * or more contributor license agreements. See the NOTICE file | ||||||
| * distributed with this work for additional information | ||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||
| * to you under the Apache License, Version 2.0 (the | ||||||
| * "License"); you may not use this file except in compliance | ||||||
| * with the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| * See the License for the specific language governing permissions and | ||||||
| * limitations under the License. | ||||||
| */ | ||||||
|
|
||||||
| package org.apache.flink.table.types.inference; | ||||||
|
|
||||||
| import org.apache.flink.annotation.PublicEvolving; | ||||||
|
|
||||||
| import java.io.Serializable; | ||||||
|
|
||||||
| /** | ||||||
| * A condition that determines whether a conditional trait on a {@link StaticArgument} should be | ||||||
| * active for a given call. | ||||||
| * | ||||||
| * <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access | ||||||
| * to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.). | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| * | ||||||
| * <p>Use the static factory methods for common conditions: | ||||||
| * | ||||||
| * <pre>{@code | ||||||
| * import static org.apache.flink.table.types.inference.TraitCondition.*; | ||||||
| * | ||||||
| * StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES)) | ||||||
| * .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE) | ||||||
| * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE) | ||||||
| * .addTraitWhen(argIsTrue("produces_full_deletes"), REQUIRE_UPDATE_BEFORE); | ||||||
| * }</pre> | ||||||
| */ | ||||||
| @PublicEvolving | ||||||
| @FunctionalInterface | ||||||
| public interface TraitCondition extends Serializable { | ||||||
|
|
||||||
| /** Evaluates this condition against the given context. */ | ||||||
| boolean test(TraitContext ctx); | ||||||
|
|
||||||
| /** True when PARTITION BY is provided on the table argument. */ | ||||||
| static TraitCondition hasPartitionBy() { | ||||||
| return TraitContext::hasPartitionBy; | ||||||
| } | ||||||
|
|
||||||
| /** True when the named boolean argument is provided and its value is {@code true}. */ | ||||||
| static TraitCondition argIsTrue(final String name) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. generialize the is true and is false to: |
||||||
| return ctx -> ctx.getScalarArgument(name, Boolean.class).orElse(false); | ||||||
| } | ||||||
|
|
||||||
| /** True when the named boolean argument is not provided or its value is {@code false}. */ | ||||||
| static TraitCondition argIsFalse(final String name) { | ||||||
| return ctx -> !ctx.getScalarArgument(name, Boolean.class).orElse(false); | ||||||
| } | ||||||
|
|
||||||
| /** Negates the given condition. */ | ||||||
| static TraitCondition not(final TraitCondition condition) { | ||||||
| return ctx -> !condition.test(ctx); | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.