Skip to content

Commit d135c78

Browse files
committed
[FLINK-39420] Reject temporal joins in batch mode with a clear error
Batch mode only supports lookup joins. General temporal joins (event-time or processing-time on non-lookup sources) previously left an unresolved LogicalCorrelate in the plan, causing a confusing "unexpected correlate variable" error from FlinkDecorrelateProgram. Add RejectTemporalJoinInBatchRule to the batch EXPAND_PLAN_RULES. The lookup join rules fire first and rewrite valid lookup joins; any remaining Correlate+Snapshot pattern is unconditionally rejected with an actionable message pointing users to lookup joins or streaming mode. Generated-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6e3768d commit d135c78

3 files changed

Lines changed: 101 additions & 4 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.logical;
20+
21+
import org.apache.flink.table.api.TableException;
22+
23+
import org.apache.calcite.plan.RelOptRule;
24+
import org.apache.calcite.plan.RelOptRuleCall;
25+
import org.apache.calcite.plan.RelOptRuleOperand;
26+
import org.apache.calcite.rel.RelNode;
27+
import org.apache.calcite.rel.logical.LogicalCorrelate;
28+
import org.apache.calcite.rel.logical.LogicalFilter;
29+
import org.apache.calcite.rel.logical.LogicalSnapshot;
30+
31+
/**
32+
* Rules that reject temporal table joins ({@code FOR SYSTEM_TIME AS OF}) in batch mode with a clear
33+
* error message.
34+
*
35+
* <p>In the batch {@code EXPAND_PLAN_RULES}, lookup join rules run first and rewrite valid lookup
36+
* joins (processing-time + {@code LookupTableSource}) into {@code TemporalJoin} nodes. Any
37+
* remaining {@link LogicalCorrelate} + {@link LogicalSnapshot} pattern therefore represents an
38+
* unsupported temporal join. These rules catch it and throw a {@link TableException} rather than
39+
* letting the correlate survive into {@code FlinkDecorrelateProgram}, where it would cause a
40+
* confusing "unexpected correlate variable" internal error.
41+
*/
42+
public class RejectTemporalJoinInBatchRule extends RelOptRule {
43+
44+
private static final String MESSAGE =
45+
"Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in "
46+
+ "batch mode. Use a lookup join or switch to streaming mode.";
47+
48+
/**
49+
* Matches temporal joins where the right side of the Correlate is a Filter wrapping a Snapshot
50+
* (non-trivial join condition).
51+
*/
52+
public static final RelOptRule WITH_FILTER =
53+
new RejectTemporalJoinInBatchRule(
54+
operand(
55+
LogicalCorrelate.class,
56+
operand(RelNode.class, any()),
57+
operand(
58+
LogicalFilter.class,
59+
operand(LogicalSnapshot.class, operand(RelNode.class, any())))),
60+
"RejectTemporalJoinInBatchRule_WithFilter");
61+
62+
/**
63+
* Matches temporal joins where the right side of the Correlate is a Snapshot directly (trivial
64+
* join condition).
65+
*/
66+
public static final RelOptRule WITHOUT_FILTER =
67+
new RejectTemporalJoinInBatchRule(
68+
operand(
69+
LogicalCorrelate.class,
70+
operand(RelNode.class, any()),
71+
operand(LogicalSnapshot.class, operand(RelNode.class, any()))),
72+
"RejectTemporalJoinInBatchRule_WithoutFilter");
73+
74+
private RejectTemporalJoinInBatchRule(RelOptRuleOperand operand, String description) {
75+
super(operand, description);
76+
}
77+
78+
@Override
79+
public void onMatch(RelOptRuleCall call) {
80+
throw new TableException(MESSAGE);
81+
}
82+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,16 @@ object FlinkBatchRuleSets {
5050
/**
5151
* Expand plan by replacing references to tables into a proper plan sub trees. Those rules can
5252
* create new plan nodes.
53+
*
54+
* The lookup join rules run first and rewrite supported temporal joins. The rejection rules then
55+
* catch any remaining temporal joins (unsupported in batch) with a clear error message.
5356
*/
5457
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
5558
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITH_FILTER,
56-
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER)
59+
LogicalCorrelateToJoinFromTemporalTableRule.LOOKUP_JOIN_WITHOUT_FILTER,
60+
RejectTemporalJoinInBatchRule.WITH_FILTER,
61+
RejectTemporalJoinInBatchRule.WITHOUT_FILTER
62+
)
5763

5864
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(EnumerableToLogicalTableScan.INSTANCE)
5965

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/TemporalJoinTest.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
2424
import org.junit.jupiter.api.{BeforeEach, Test}
2525

2626
/**
27-
* Test temporal join in batch mode.
27+
* Tests that temporal joins on non-lookup tables are rejected in batch mode with a clear error.
2828
*
29-
* <p> Flink only supports lookup join in batch mode, the others Temporal join is not supported yet.
29+
* <p>Batch mode only supports lookup joins. General temporal joins (event-time or processing-time
30+
* on non-lookup sources) are caught by {@link RejectTemporalJoinInBatchRule} during the
31+
* TEMPORAL_JOIN_REWRITE phase.
3032
*/
3133
class TemporalJoinTest extends TableTestBase {
3234

@@ -105,6 +107,10 @@ class TemporalJoinTest extends TableTestBase {
105107
"GROUP BY currency ")
106108
}
107109

110+
private val expectedMessage =
111+
"Temporal joins (FOR SYSTEM_TIME AS OF) on regular tables are not supported in " +
112+
"batch mode. Use a lookup join or switch to streaming mode."
113+
108114
@Test
109115
def testSimpleJoin(): Unit = {
110116
val sqlQuery = "SELECT " +
@@ -115,6 +121,7 @@ class TemporalJoinTest extends TableTestBase {
115121

116122
assertThatExceptionOfType(classOf[TableException])
117123
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
124+
.withMessage(expectedMessage)
118125
}
119126

120127
@Test
@@ -128,6 +135,7 @@ class TemporalJoinTest extends TableTestBase {
128135

129136
assertThatExceptionOfType(classOf[TableException])
130137
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
138+
.withMessage(expectedMessage)
131139
}
132140

133141
@Test
@@ -141,11 +149,11 @@ class TemporalJoinTest extends TableTestBase {
141149

142150
assertThatExceptionOfType(classOf[TableException])
143151
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
152+
.withMessage(expectedMessage)
144153
}
145154

146155
@Test
147156
def testSimpleViewProcTimeJoin(): Unit = {
148-
149157
val sqlQuery = "SELECT " +
150158
"o_amount * rate as rate " +
151159
"FROM Orders AS o JOIN " +
@@ -155,5 +163,6 @@ class TemporalJoinTest extends TableTestBase {
155163

156164
assertThatExceptionOfType(classOf[TableException])
157165
.isThrownBy(() => util.verifyExecPlan(sqlQuery))
166+
.withMessage(expectedMessage)
158167
}
159168
}

0 commit comments

Comments
 (0)