Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events into a downstream s

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table PARTITION BY key_col,
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
)
Expand All @@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. |
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution (set semantics). Without `PARTITION BY`, each row is processed independently (row semantics). Accepts insert-only, retract, and upsert tables. |

| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |

Expand Down Expand Up @@ -99,6 +99,15 @@ SELECT * FROM TO_CHANGELOG(
-- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
```

#### Without PARTITION BY

```sql
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove all PARTITION BY examples for now. A default TO_CHANGELOG example should always be without PARTITION BY. They do not provide any benefit but rather add exchange overhead.

-- Each row is processed independently, no key co-location
SELECT * FROM TO_CHANGELOG(input => TABLE my_table)

-- Output: +I[op:'INSERT', id:1, name:'Alice']
```

#### Custom operation column name

```sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TraitCondition;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
Expand Down Expand Up @@ -783,14 +784,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.SET_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make things explicit, I would add StateicArgumentTrait.ROW_SEMANTIC here as well.

StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE))
.addTraitWhen(
TraitCondition.hasPartitionBy(),
StaticArgumentTrait.SET_SEMANTIC_TABLE),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -57,18 +61,58 @@ public class StaticArgument {
private final @Nullable Class<?> conversionClass;
private final boolean isOptional;
private final EnumSet<StaticArgumentTrait> traits;
private final List<ConditionalTrait> conditionalTraits;

/** A trait that is conditionally added based on a {@link TraitCondition}. */
private static final class ConditionalTrait implements Serializable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: classes to the bottom of the file

Suggested change
private static final class ConditionalTrait implements Serializable {
private static final class ConditionalTrait {

private final TraitCondition condition;
private final StaticArgumentTrait trait;

ConditionalTrait(final TraitCondition condition, final StaticArgumentTrait trait) {
this.condition = condition;
this.trait = trait;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ConditionalTrait that = (ConditionalTrait) o;
return Objects.equals(condition, that.condition) && trait == that.trait;
}

@Override
public int hashCode() {
return Objects.hash(condition, trait);
}
}

private StaticArgument(
String name,
@Nullable DataType dataType,
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits) {
this(name, dataType, conversionClass, isOptional, traits, List.of());
}

private StaticArgument(
String name,
@Nullable DataType dataType,
@Nullable Class<?> conversionClass,
boolean isOptional,
EnumSet<StaticArgumentTrait> traits,
List<ConditionalTrait> conditionalTraits) {
this.name = Preconditions.checkNotNull(name, "Name must not be null.");
this.dataType = dataType;
this.conversionClass = conversionClass;
this.isOptional = isOptional;
this.traits = Preconditions.checkNotNull(traits, "Traits must not be null.");
this.conditionalTraits = Collections.unmodifiableList(conditionalTraits);
checkName();
checkTraits(traits);
checkOptionalType();
Expand Down Expand Up @@ -196,6 +240,68 @@ public boolean is(StaticArgumentTrait trait) {
return traits.contains(trait);
}

/**
* Context-aware trait check. Evaluates conditional trait rules against the given context to
* determine the effective traits.
*/
public boolean is(StaticArgumentTrait trait, TraitContext ctx) {
return resolveTraits(ctx).contains(trait);
}

/**
* Returns a new {@link StaticArgument} with an additional conditional trait rule. The trait is
* added to the effective trait set when the condition evaluates to {@code true} at planning
* time.
*
* <p>Example:
*
* <pre>{@code
* StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
* .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
* .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE);
* }</pre>
*/
public StaticArgument addTraitWhen(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public StaticArgument addTraitWhen(
public StaticArgument withConditionalTrait(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would swap the parameter order then: withConditionalTrait(trait, condition)

final TraitCondition condition, final StaticArgumentTrait trait) {
final List<ConditionalTrait> newList = new ArrayList<>(this.conditionalTraits);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should limit the conditional traits. e.g. a scalar should not become a table out of a sudden. root changes are not allowed. this should limit the logic to tables currently, because it is the only trait with subtraits.

newList.add(new ConditionalTrait(condition, trait));
return new StaticArgument(name, dataType, conversionClass, isOptional, traits, newList);
}

/** Whether this argument has conditional trait rules. */
public boolean hasConditionalTraits() {
return !conditionalTraits.isEmpty();
}

/** Whether any conditional trait rule may add the given trait. */
public boolean hasConditionalTrait(final StaticArgumentTrait trait) {
return conditionalTraits.stream().anyMatch(ct -> ct.trait == trait);
}

/**
* Resolves effective traits by evaluating conditional rules against the context. Returns the
* base traits combined with any conditional traits whose conditions are met.
*/
public EnumSet<StaticArgumentTrait> resolveTraits(final TraitContext ctx) {
if (conditionalTraits.isEmpty()) {
return traits;
}
final EnumSet<StaticArgumentTrait> resolved = EnumSet.copyOf(traits);
for (final ConditionalTrait ct : conditionalTraits) {
if (ct.condition.test(ctx)) {
// ROW_SEMANTIC_TABLE and SET_SEMANTIC_TABLE are mutually exclusive.
// Adding one removes the other.
if (ct.trait == StaticArgumentTrait.SET_SEMANTIC_TABLE) {
resolved.remove(StaticArgumentTrait.ROW_SEMANTIC_TABLE);
} else if (ct.trait == StaticArgumentTrait.ROW_SEMANTIC_TABLE) {
resolved.remove(StaticArgumentTrait.SET_SEMANTIC_TABLE);
}
resolved.add(ct.trait);
}
}
return resolved;
}

@Override
public String toString() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update toString as well. syntax myUntypedTable {TABLE BY ROW, TABLE BY SET} we add all potential conditional traits to the list but without any special syntax, the toString format is complex enough.

final StringBuilder s = new StringBuilder();
Expand Down Expand Up @@ -233,12 +339,13 @@ public boolean equals(Object o) {
&& Objects.equals(name, that.name)
&& Objects.equals(dataType, that.dataType)
&& Objects.equals(conversionClass, that.conversionClass)
&& Objects.equals(traits, that.traits);
&& Objects.equals(traits, that.traits)
&& Objects.equals(conditionalTraits, that.conditionalTraits);
}

@Override
public int hashCode() {
return Objects.hash(name, dataType, conversionClass, isOptional, traits);
return Objects.hash(name, dataType, conversionClass, isOptional, traits, conditionalTraits);
}

private void checkName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ private static void checkReservedArgs(List<StaticArgument> staticArgs) {
}
}

static TraitContext buildTraitContext(
final TableSemantics semantics,
final CallContext callContext,
final List<StaticArgument> staticArgs) {
return new TraitContext() {
@Override
public boolean hasPartitionBy() {
return semantics.partitionByColumns().length > 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait context should be available to all args. So TableSemantics should be nullable.

}

@Override
public <T> Optional<T> getScalarArgument(final String name, final Class<T> clazz) {
for (int i = 0; i < staticArgs.size(); i++) {
if (staticArgs.get(i).getName().equals(name)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for is scalar and then isLiteral.

return callContext.getArgumentValue(i, clazz);
}
}
return Optional.empty();
}
};
}

private static void checkMultipleTableArgs(List<StaticArgument> staticArgs) {
final List<StaticArgument> tableArgs =
staticArgs.stream()
Expand Down Expand Up @@ -314,13 +336,16 @@ private List<Field> derivePassThroughFields(CallContext callContext) {
if (arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)) {
return DataType.getFields(argDataTypes.get(pos)).stream();
}
if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
final TableSemantics semantics =
callContext.getTableSemantics(pos).orElse(null);
if (semantics == null) {
return Stream.<Field>empty();
}
final TraitContext traitCtx =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build the traits as the very first, before arg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH)

buildTraitContext(semantics, callContext, staticArgs);
if (!arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE, traitCtx)) {
return Stream.<Field>empty();
}
final TableSemantics semantics =
callContext
.getTableSemantics(pos)
.orElseThrow(IllegalStateException::new);
final DataType rowDataType =
DataTypes.ROW(DataType.getFields(argDataTypes.get(pos)));
final DataType projectedRow =
Expand Down Expand Up @@ -601,8 +626,10 @@ private static void checkTableArgs(
"Table expected for argument '%s'.",
staticArg.getName()));
}
checkRowSemantics(staticArg, semantics);
checkSetSemantics(staticArg, semantics);
final TraitContext traitCtx =
buildTraitContext(semantics, callContext, staticArgs);
checkRowSemantics(staticArg, semantics, traitCtx);
checkSetSemantics(staticArg, semantics, traitCtx);
tableSemantics.add(semantics);
});
checkCoPartitioning(tableSemantics);
Expand Down Expand Up @@ -645,8 +672,9 @@ private static void checkCoPartitioning(List<TableSemantics> tableSemantics) {
}
}

private static void checkRowSemantics(StaticArgument staticArg, TableSemantics semantics) {
if (!staticArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
private static void checkRowSemantics(
StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) {
if (!staticArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE, traitCtx)) {
return;
}
if (semantics.partitionByColumns().length > 0
Expand All @@ -656,12 +684,13 @@ private static void checkRowSemantics(StaticArgument staticArg, TableSemantics s
}
}

private static void checkSetSemantics(StaticArgument staticArg, TableSemantics semantics) {
if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE)) {
private static void checkSetSemantics(
StaticArgument staticArg, TableSemantics semantics, TraitContext traitCtx) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we copy StaticArgument with the effective traits. this would simplify this tool and you don't have to pass TraitContext everywhere. we can provide StaticArgument.applyConditionalTraits(ctx): StaticArgument

if (!staticArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE, traitCtx)) {
return;
}
if (semantics.partitionByColumns().length == 0
&& !staticArg.is(StaticArgumentTrait.OPTIONAL_PARTITION_BY)) {
&& !staticArg.is(StaticArgumentTrait.OPTIONAL_PARTITION_BY, traitCtx)) {
throw new ValidationException(
String.format(
"Table argument '%s' requires a PARTITION BY clause for parallel processing.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
* A condition that determines whether a conditional trait on a {@link StaticArgument} should be
* active for a given call.
*
* <p>Conditions are evaluated at planning time using the {@link TraitContext} which provides access
* to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* to the SQL call's properties (PARTITION BY presence, scalar argument values, etc.).
* to the SQL call's properties (PARTITION BY presence, scalar literal values, etc.).

*
* <p>Use the static factory methods for common conditions:
*
* <pre>{@code
* import static org.apache.flink.table.types.inference.TraitCondition.*;
*
* StaticArgument.table("input", Row.class, false, EnumSet.of(TABLE, SUPPORT_UPDATES))
* .addTraitWhen(hasPartitionBy(), SET_SEMANTIC_TABLE)
* .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE)
* .addTraitWhen(argIsTrue("produces_full_deletes"), REQUIRE_UPDATE_BEFORE);
* }</pre>
*/
@PublicEvolving
@FunctionalInterface
public interface TraitCondition extends Serializable {

/** Evaluates this condition against the given context. */
boolean test(TraitContext ctx);

/** True when PARTITION BY is provided on the table argument. */
static TraitCondition hasPartitionBy() {
return TraitContext::hasPartitionBy;
}

/** True when the named boolean argument is provided and its value is {@code true}. */
static TraitCondition argIsTrue(final String name) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generialize the is true and is false to:

static <T> TraitCondition argIsEqualTo(T obj) {ctx.getScalarArgument(name, obj.getClass) == obj}

return ctx -> ctx.getScalarArgument(name, Boolean.class).orElse(false);
}

/** True when the named boolean argument is not provided or its value is {@code false}. */
static TraitCondition argIsFalse(final String name) {
return ctx -> !ctx.getScalarArgument(name, Boolean.class).orElse(false);
}

/** Negates the given condition. */
static TraitCondition not(final TraitCondition condition) {
return ctx -> !condition.test(ctx);
}
}
Loading