Skip to content

Commit 70968e9

Browse files
committed
[FLINK-39420] Reject temporal joins in batch mode with a clear error
Batch mode only supports lookup joins. General temporal joins (event-time or processing-time on non-lookup sources) previously left an unresolved LogicalCorrelate in the plan, causing a confusing "unexpected correlate variable" error from FlinkDecorrelateProgram. Add RejectTemporalJoinInBatchRule to the batch EXPAND_PLAN_RULES. The lookup join rules fire first and rewrite valid lookup joins; any remaining Correlate+Snapshot pattern is unconditionally rejected with an actionable message pointing users to lookup joins or streaming mode. Generated-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6e3768d commit 70968e9

3 files changed

Lines changed: 142 additions & 4 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.logical;
20+
21+
import org.apache.flink.table.api.TableException;
22+
23+
import org.apache.calcite.plan.RelOptRuleCall;
24+
import org.apache.calcite.plan.RelRule;
25+
import org.apache.calcite.rel.RelNode;
26+
import org.apache.calcite.rel.logical.LogicalCorrelate;
27+
import org.apache.calcite.rel.logical.LogicalFilter;
28+
import org.apache.calcite.rel.logical.LogicalSnapshot;
29+
import org.immutables.value.Value;
30+
31+
/**
32+
* Rules that reject temporal table joins ({@code FOR SYSTEM_TIME AS OF}) in batch mode with a clear
33+
* error message.
34+
*
35+
* <p>In the batch {@code EXPAND_PLAN_RULES}, lookup join rules run first and rewrite valid lookup
36+
* joins (processing-time + {@code LookupTableSource}) into {@code TemporalJoin} nodes. Any
37+
* remaining {@link LogicalCorrelate} + {@link LogicalSnapshot} pattern therefore represents an
38+
* unsupported temporal join. These rules catch it and throw a {@link TableException} rather than
39+
* letting the correlate survive into {@code FlinkDecorrelateProgram}, where it would cause a
40+
* confusing "unexpected correlate variable" internal error.
41+
*/
42+
@Value.Enclosing
43+
public class RejectTemporalJoinInBatchRule
44+
extends RelRule<RejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig> {
45+
46+
private static final String MESSAGE =
47+
"Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in "
48+
+ "batch mode. Use a lookup join or switch to streaming mode.";
49+
50+
/**
51+
* Matches temporal joins where the right side of the Correlate is a Filter wrapping a Snapshot
52+
* (non-trivial join condition).
53+
*/
54+
public static final RejectTemporalJoinInBatchRule WITH_FILTER =
55+
RejectTemporalJoinInBatchRuleConfig.WITH_FILTER.toRule();
56+
57+
/**
58+
* Matches temporal joins where the right side of the Correlate is a Snapshot directly (trivial
59+
* join condition).
60+
*/
61+
public static final RejectTemporalJoinInBatchRule WITHOUT_FILTER =
62+
RejectTemporalJoinInBatchRuleConfig.WITHOUT_FILTER.toRule();
63+
64+
private RejectTemporalJoinInBatchRule(RejectTemporalJoinInBatchRuleConfig config) {
65+
super(config);
66+
}
67+
68+
@Override
69+
public void onMatch(RelOptRuleCall call) {
70+
throw new TableException(MESSAGE);
71+
}
72+
73+
/** Rule configuration. */
74+
@Value.Immutable(singleton = false)
75+
public interface RejectTemporalJoinInBatchRuleConfig extends RelRule.Config {
76+
77+
RejectTemporalJoinInBatchRuleConfig WITH_FILTER =
78+
ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
79+
.operandSupplier(
80+
b0 ->
81+
b0.operand(LogicalCorrelate.class)
82+
.inputs(
83+
b1 -> b1.operand(RelNode.class).anyInputs(),
84+
b2 ->
85+
b2.operand(LogicalFilter.class)
86+
.oneInput(
87+
b3 ->
88+
b3.operand(
89+
LogicalSnapshot
90+
.class)
91+
.anyInputs())))
92+
.description("RejectTemporalJoinInBatchRule_WithFilter")
93+
.build();
94+
95+
RejectTemporalJoinInBatchRuleConfig WITHOUT_FILTER =
96+
ImmutableRejectTemporalJoinInBatchRule.RejectTemporalJoinInBatchRuleConfig.builder()
97+
.operandSupplier(
98+
b0 ->
99+
b0.operand(LogicalCorrelate.class)
100+
.inputs(
101+
b1 -> b1.operand(RelNode.class).anyInputs(),
102+
b2 ->
103+
b2.operand(LogicalSnapshot.class)
104+
.anyInputs()))
105+
.description("RejectTemporalJoinInBatchRule_WithoutFilter")
106+
.build();
107+
108+
@Override
109+
default RejectTemporalJoinInBatchRule toRule() {
110+
return new RejectTemporalJoinInBatchRule(this);
111+
}
112+
}
113+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,16 @@ object FlinkBatchRuleSets {
5050
/**
5151
* Expand plan by replacing references to tables into a proper plan sub trees. Those rules can
5252
* create new plan nodes.
53+
*
54+
* The lookup join rules run first and rewrite supported temporal joins. The rejection rules then
55+
* catch any remaining temporal joins (unsupported in batch) with a clear error message.
5356
*/
5457
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
5558
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITH_FILTER,
56-
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER)
59+
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER,
60+
RejectTemporalJoinInBatchRule.WITH_FILTER,
61+
RejectTemporalJoinInBatchRule.WITHOUT_FILTER
62+
)
5763

5864
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(EnumerableToLogicalTableScan.INSTANCE)
5965

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
2424
import org.junit.jupiter.api.{BeforeEach, Test}
2525

2626
/**
27-
* Test temporal join in batch mode.
27+
* Tests that temporal joins on non-lookup tables are rejected in batch mode with a clear error.
2828
*
29-
* <p> Flink only supports lookup join in batch mode, the others Temporal join is not supported yet.
29+
* <p>Batch mode only supports lookup joins. General temporal joins (event-time or processing-time
30+
* on non-lookup sources) are caught by [[RejectTemporalJoinInBatchRule]] during the
31+
* TEMPORAL_JOIN_REWRITE phase.
3032
*/
3133
class TemporalJoinTest extends TableTestBase {
3234

@@ -105,6 +107,10 @@ class TemporalJoinTest extends TableTestBase {
105107
"GROUP BY currency ")
106108
}
107109

110+
private val expectedMessage =
111+
"Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in " +
112+
"batch mode. Use a lookup join or switch to streaming mode."
113+
108114
@Test
109115
def testSimpleJoin(): Unit = {
110116
val sqlQuery = "SELECT " +
@@ -115,6 +121,7 @@ class TemporalJoinTest extends TableTestBase {
115121

116122
assertThatExceptionOfType(classOf[TableException])
117123
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
124+
.withMessage(expectedMessage)
118125
}
119126

120127
@Test
@@ -128,6 +135,7 @@ class TemporalJoinTest extends TableTestBase {
128135

129136
assertThatExceptionOfType(classOf[TableException])
130137
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
138+
.withMessage(expectedMessage)
131139
}
132140

133141
@Test
@@ -141,11 +149,21 @@ class TemporalJoinTest extends TableTestBase {
141149

142150
assertThatExceptionOfType(classOf[TableException])
143151
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
152+
.withMessage(expectedMessage)
144153
}
145154

146155
@Test
147-
def testSimpleViewProcTimeJoin(): Unit = {
156+
def testSimpleJoinOnTrue(): Unit = {
157+
val sqlQuery = "SELECT o_amount FROM Orders AS o JOIN " +
158+
"RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.o_rowtime AS r ON TRUE"
159+
160+
assertThatExceptionOfType(classOf[TableException])
161+
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
162+
.withMessage(expectedMessage)
163+
}
148164

165+
@Test
166+
def testSimpleViewProcTimeJoin(): Unit = {
149167
val sqlQuery = "SELECT " +
150168
"o_amount * rate as rate " +
151169
"FROM Orders AS o JOIN " +
@@ -155,5 +173,6 @@ class TemporalJoinTest extends TableTestBase {
155173

156174
assertThatExceptionOfType(classOf[TableException])
157175
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
176+
.withMessage(expectedMessage)
158177
}
159178
}

0 commit comments

Comments
 (0)