From 65ba6877e6a351e9ef04feb57963c64413b87a3a Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Thu, 16 Apr 2026 16:47:00 +0800 Subject: [PATCH 1/3] [BugFix] Fix chained streamstats with window causing NPE in Calcite decorrelator (#4800) Replace the correlate-based plan with a self-join plan for the global=true + window + group case in streamstats. Nested correlates (produced when chaining two streamstats with window + group by) caused NPE in Calcite's RelDecorrelator.createValueGenerator during plan preparation. The self-join approach builds a LogicalJoin instead of LogicalCorrelate, which avoids triggering the decorrelation path that fails with nested correlation variable references. Signed-off-by: Heng Qian --- .../sql/calcite/CalciteRelNodeVisitor.java | 262 +++++++++++++++++- .../remote/CalciteStreamstatsCommandIT.java | 35 +++ .../rest-api-spec/test/issues/4800.yml | 57 ++++ .../calcite/CalcitePPLStreamstatsTest.java | 46 ++- 4 files changed, 368 insertions(+), 32 deletions(-) create mode 100644 integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index a4850a34721..c2ff62bed49 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -2093,14 +2093,14 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) context.relBuilder.projectPlus(streamSeq); RelNode left = context.relBuilder.build(); - // 2. Run correlate + aggregate - return buildStreamWindowJoinPlan( + // 2. Use self-join approach to avoid nested correlates (which cause NPE + // in Calcite's RelDecorrelator when chaining multiple streamstats) + return buildStreamWindowSelfJoinPlan( context, left, node, groupList, ROW_NUMBER_COLUMN_FOR_STREAMSTATS, - null, new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS}); } @@ -2231,6 +2231,262 @@ private RelNode buildStreamWindowJoinPlan( return context.relBuilder.peek(); } + /** + * Builds a self-join based plan for streamstats with global=true + window + group. This avoids + * using LogicalCorrelate which causes NPE in Calcite's RelDecorrelator when chaining multiple + * streamstats commands. + * + *

Plan structure: + * + *

    + *
  1. left = input + __stream_seq__ + *
  2. right = trim to only aggregate input + __stream_seq__ + *
  3. Join left and right on window frame + group conditions + *
  4. Group by all left field indices, compute AGG(right.X) + *
  5. Sort by __stream_seq__, then remove it + *
+ */ + private RelNode buildStreamWindowSelfJoinPlan( + CalcitePlanContext context, + RelNode leftWithHelpers, + StreamWindow node, + List groupList, + String seqCol, + String[] helperColsToCleanup) { + + int leftFieldCount = leftWithHelpers.getRowType().getFieldCount(); + + // Build right side: project only the fields needed for aggregation + seq + group columns + // This avoids field name collisions and keeps the right side minimal + context.relBuilder.push(leftWithHelpers); + + // Collect fields needed on right side: seq col + group cols + aggregate input fields + List rightFields = new ArrayList<>(); + List rightFieldNames = new ArrayList<>(); + + // Always include seq col + rightFields.add(context.relBuilder.field(seqCol)); + rightFieldNames.add("__r_seq__"); + + // Include group columns + for (UnresolvedExpression groupExpr : groupList) { + String groupName = extractGroupFieldName(groupExpr); + rightFields.add(context.relBuilder.field(groupName)); + rightFieldNames.add("__r_" + groupName + "__"); + } + + // Include aggregate input fields (extract field names from window functions) + Set aggInputFields = new HashSet<>(); + for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) { + collectFieldNames(wfExpr, aggInputFields); + } + // Remove already-included fields + aggInputFields.remove(seqCol); + for (UnresolvedExpression groupExpr : groupList) { + aggInputFields.remove(extractGroupFieldName(groupExpr)); + } + for (String aggField : aggInputFields) { + rightFields.add(context.relBuilder.field(aggField)); + rightFieldNames.add("__r_" + aggField + "__"); + } + + context.relBuilder.project(rightFields, rightFieldNames); + RelNode rightProjected = context.relBuilder.build(); + + // Push left and right + context.relBuilder.push(leftWithHelpers); + context.relBuilder.push(rightProjected); + + // Build join condition using 2-input references + RexNode leftSeq = context.relBuilder.field(2, 0, seqCol); + RexNode rightSeq = context.relBuilder.field(2, 1, "__r_seq__"); + + // Frame filter + RexNode frameFilter; + if (node.isCurrent()) { + RexNode lower = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow() - 1)); + frameFilter = context.relBuilder.between(rightSeq, lower, leftSeq); + } else { + RexNode lower = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow())); + RexNode upper = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(1)); + frameFilter = context.relBuilder.between(rightSeq, lower, upper); + } + + // Group filter + List groupFilters = new ArrayList<>(); + for (UnresolvedExpression groupExpr : groupList) { + String groupName = extractGroupFieldName(groupExpr); + RexNode leftGroup = context.relBuilder.field(2, 0, groupName); + RexNode rightGroup = context.relBuilder.field(2, 1, "__r_" + groupName + "__"); + RexNode equalCondition = context.relBuilder.equals(leftGroup, rightGroup); + if (node.isBucketNullable()) { + RexNode bothNull = + context.relBuilder.and( + context.relBuilder.isNull(leftGroup), context.relBuilder.isNull(rightGroup)); + groupFilters.add(context.relBuilder.or(equalCondition, bothNull)); + } else { + groupFilters.add(equalCondition); + } + } + + RexNode joinCondition = + groupFilters.isEmpty() + ? frameFilter + : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters)); + context.relBuilder.join(JoinRelType.LEFT, joinCondition); + + // After join: [left_fields(0..leftFieldCount-1), right_fields(leftFieldCount..)] + // Aggregate: group by all left fields, compute AGG on right fields + // The aggregate functions need to reference the right-side fields in the joined row + + // Build aggregate calls using the right-side field references + List aggCalls = + buildAggCallsFromJoinedRight(node.getWindowFunctionList(), leftFieldCount, context); + + RelBuilder.GroupKey groupKey = + context.relBuilder.groupKey( + IntStream.range(0, leftFieldCount).mapToObj(context.relBuilder::field).toList()); + + context.relBuilder.aggregate(groupKey, aggCalls); + + // Resort by the sequence column + context.relBuilder.sort(context.relBuilder.field(seqCol)); + + // Cleanup helper columns + List cleanup = new ArrayList<>(); + for (String c : helperColsToCleanup) { + cleanup.add(context.relBuilder.field(c)); + } + context.relBuilder.projectExcept(cleanup); + return context.relBuilder.peek(); + } + + /** Collect field names referenced by an expression tree. */ + private void collectFieldNames(UnresolvedExpression expr, Set fieldNames) { + if (expr instanceof Field f) { + fieldNames.add(f.getField().toString()); + } else if (expr instanceof Alias a) { + collectFieldNames(a.getDelegated(), fieldNames); + } else if (expr instanceof WindowFunction wf) { + collectFieldNames(wf.getFunction(), fieldNames); + } else if (expr instanceof Function func) { + for (UnresolvedExpression arg : func.getFuncArgs()) { + collectFieldNames(arg, fieldNames); + } + } + } + + /** + * Build AggCall list for the self-join plan. The aggregate functions reference fields from the + * right side of the join (offset by leftFieldCount). + */ + private List buildAggCallsFromJoinedRight( + List windowFunctionList, + int leftFieldCount, + CalcitePlanContext context) { + List aggCalls = new ArrayList<>(); + for (UnresolvedExpression wfExpr : windowFunctionList) { + String alias = null; + UnresolvedExpression inner = wfExpr; + if (wfExpr instanceof Alias a) { + // Use Alias.getName() for the aggregate output name (e.g. "max(SAL)" or user-defined alias) + alias = a.getName(); + inner = a.getDelegated(); + } + if (inner instanceof WindowFunction wf && wf.getFunction() instanceof Function func) { + String funcName = func.getFuncName().toUpperCase(); + List args = func.getFuncArgs(); + AggCall aggCall = buildSingleAggCall(funcName, args, alias, leftFieldCount, context); + aggCalls.add(aggCall); + } + } + return aggCalls; + } + + /** Build a single AggCall for the self-join aggregate, referencing right-side fields. */ + private AggCall buildSingleAggCall( + String funcName, + List args, + String alias, + int leftFieldCount, + CalcitePlanContext context) { + // Map function name to Calcite aggregate function + RelBuilder.AggCall aggCall; + if (args.isEmpty()) { + // COUNT() + aggCall = context.relBuilder.count(); + if (alias != null) { + aggCall = aggCall.as(alias); + } + } else { + // Get the right-side field reference for the argument + String argFieldName = extractFieldNameFromExpr(args.get(0)); + String rightArgFieldName = "__r_" + argFieldName + "__"; + // Find it in the right side (after left fields) + int rightFieldIndex = -1; + RelDataType joinedRowType = context.relBuilder.peek().getRowType(); + List fieldNames = joinedRowType.getFieldNames(); + for (int i = leftFieldCount; i < fieldNames.size(); i++) { + if (fieldNames.get(i).equals(rightArgFieldName)) { + rightFieldIndex = i; + break; + } + } + if (rightFieldIndex == -1) { + throw new IllegalArgumentException( + "Cannot find aggregate input field '" + rightArgFieldName + "' in right side of join"); + } + RexNode argRef = context.relBuilder.field(rightFieldIndex); + + aggCall = + switch (funcName) { + case "AVG" -> context.relBuilder.avg(false, alias, argRef); + case "SUM" -> context.relBuilder.sum(false, alias, argRef); + case "MIN" -> context.relBuilder.min(alias, argRef); + case "MAX" -> context.relBuilder.max(alias, argRef); + case "COUNT" -> context.relBuilder.count(false, alias, argRef); + case "DC", "DISTINCT_COUNT" -> context.relBuilder.count(true, alias, argRef); + case "STDDEV_POP" -> { + AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, argRef); + yield alias != null ? c.as(alias) : c; + } + case "STDDEV_SAMP" -> { + AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_SAMP, argRef); + yield alias != null ? c.as(alias) : c; + } + case "VAR_POP" -> { + AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, argRef); + yield alias != null ? c.as(alias) : c; + } + case "VAR_SAMP" -> { + AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, argRef); + yield alias != null ? c.as(alias) : c; + } + case "EARLIEST" -> context.relBuilder.min(alias, argRef); + case "LATEST" -> context.relBuilder.max(alias, argRef); + default -> + throw new UnsupportedOperationException("Unexpected window function: " + funcName); + }; + } + return aggCall; + } + + private String extractFieldNameFromExpr(UnresolvedExpression expr) { + if (expr instanceof Field f) { + return f.getField().toString(); + } else if (expr instanceof Alias a) { + return extractFieldNameFromExpr(a.getDelegated()); + } else { + throw new IllegalArgumentException("Cannot extract field name from: " + expr); + } + } + private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) { // 1. global sequence to define order RexNode rowNum = diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java index dcf36f510bf..fa0b21e622f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java @@ -823,6 +823,41 @@ public void testMultipleStreamstats() throws IOException { rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5)); } + @Test + public void testMultipleStreamstatsWithWindow() throws IOException { + // Test case from GitHub issue #4800: chained streamstats with window=2 + JSONObject actual = + executeQuery( + String.format( + "source=%s | streamstats window=2 avg(age) as avg_age by state, country" + + " | streamstats window=2 avg(avg_age) as avg_state_age by country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "int"), + schema("year", "int"), + schema("age", "int"), + schema("avg_age", "double"), + schema("avg_state_age", "double")); + + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), + rows(null, "Canada", null, 4, 2023, 10, 10, 15), + rows("Kevin", null, null, 4, 2023, null, null, null)); + } + + // TODO: Fix chained reset_before + window streamstats (nested correlate issue, see #4800) + // The reset path still uses correlate, and the window self-join copies it into the right side, + // causing Calcite's RelDecorrelator to fail on duplicate correlate references. + @Test public void testMultipleStreamstatsWithNull1() throws IOException { JSONObject actual = diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml new file mode 100644 index 00000000000..ef4c4769191 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml @@ -0,0 +1,57 @@ +setup: + - skip: + features: + - headers + - allowed_warnings + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true + - do: + indices.create: + index: stream_test_null + body: + mappings: + properties: + name: { type: keyword } + age: { type: integer } + state: { type: keyword } + country: { type: keyword } + year: { type: integer } + month: { type: integer } + - do: + bulk: + index: stream_test_null + refresh: true + body: + - '{"index": {"_id": "1"}}' + - '{"name": "Jake", "age": 70, "state": "California", "country": "USA", "year": 2023, "month": 4}' + - '{"index": {"_id": "2"}}' + - '{"name": "Hello", "age": 30, "state": "New York", "country": "USA", "year": 2023, "month": 4}' + - '{"index": {"_id": "3"}}' + - '{"name": "John", "age": 25, "state": "Ontario", "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "4"}}' + - '{"name": "Jane", "age": 20, "state": "Quebec", "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "5"}}' + - '{"name": null, "age": 10, "state": null, "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "6"}}' + - '{"name": "Kevin", "year": 2023, "month": 4}' + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: false + +--- +"Chained streamstats with window should not cause NPE (#4800)": + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'source=stream_test_null | streamstats window=2 avg(age) as avg_age by state, country | streamstats window=2 avg(avg_age) as avg_state_age by country' + - match: {"total": 6} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index 637e8f19820..be1b97a4810 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -92,41 +92,21 @@ public void testStreamstatsCurrent() { public void testStreamstatsWindow() { String ppl = "source=EMP | streamstats window = 5 max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); + // Uses self-join plan to avoid nested correlates that cause NPE in Calcite's decorrelator String expectedLogical = "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" + " LogicalSort(sort0=[$8], dir0=[ASC])\n" - + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{7," - + " 8}])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8}], max(SAL)=[MAX($11)])\n" + + " LogicalJoin(condition=[AND(>=($9, -($8, 4)), <=($9, $8), IS NOT DISTINCT" + + " FROM($7, $10))], joinType=[left])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalAggregate(group=[{}], max(SAL)=[MAX($0)])\n" - + " LogicalProject(SAL=[$5])\n" - + " LogicalFilter(condition=[AND(>=($8, -($cor0.__stream_seq__, 4)), <=($8," - + " $cor0.__stream_seq__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" - + " NULL($cor0.DEPTNO))))])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," - + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER" - + " ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_DEPTNO__=[$7]," + + " __r_SAL__=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); - - String expectedSparkSql = - "SELECT `$cor0`.`EMPNO`, `$cor0`.`ENAME`, `$cor0`.`JOB`, `$cor0`.`MGR`, `$cor0`.`HIREDATE`," - + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t3`.`max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `$cor0`,\n" - + "LATERAL (SELECT MAX(`SAL`) `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t0`\n" - + "WHERE `__stream_seq__` >= `$cor0`.`__stream_seq__` - 4 AND `__stream_seq__` <=" - + " `$cor0`.`__stream_seq__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" - + " `$cor0`.`DEPTNO` IS NULL)) `t3`\n" - + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; - verifyPPLToSparkSQL(root, expectedSparkSql); } @Test @@ -223,6 +203,14 @@ public void testStreamstatsReset() { verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testMultipleStreamstatsWithWindow() { + String ppl = + "source=EMP | streamstats window=2 avg(SAL) as avg_sal by DEPTNO" + + " | streamstats window=2 avg(avg_sal) as avg_dept_sal by DEPTNO"; + RelNode root = getRelNode(ppl); + } + @Test public void testStreamstatsWithReverse() { String ppl = "source=EMP | streamstats max(SAL) by DEPTNO | reverse"; From 7c8f99140ea6eb6fb150f50c12bcb7c456ab434f Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Fri, 17 Apr 2026 10:49:57 +0800 Subject: [PATCH 2/3] Update explain YAML expected output and address review feedback - Update explain_streamstats_global.yaml and explain_streamstats_global_null_bucket.yaml for both pushdown and no-pushdown cases to reflect the new self-join plan (LogicalJoin + LogicalAggregate) instead of the old LogicalCorrelate plan - Guard zero-argument branch in buildSingleAggCall to only allow COUNT() - Use LinkedHashSet for aggInputFields for deterministic field ordering - Add assertions to testMultipleStreamstatsWithWindow unit test Signed-off-by: Heng Qian --- .../sql/calcite/CalciteRelNodeVisitor.java | 9 ++++- .../calcite/explain_streamstats_global.yaml | 38 +++++++----------- ...xplain_streamstats_global_null_bucket.yaml | 40 ++++++++----------- .../explain_streamstats_global.yaml | 36 ++++++----------- ...xplain_streamstats_global_null_bucket.yaml | 39 ++++++++---------- .../calcite/CalcitePPLStreamstatsTest.java | 12 ++++++ 6 files changed, 78 insertions(+), 96 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 1ce823ae1e4..c91937ff77b 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -35,6 +35,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -2280,7 +2281,7 @@ private RelNode buildStreamWindowSelfJoinPlan( } // Include aggregate input fields (extract field names from window functions) - Set aggInputFields = new HashSet<>(); + Set aggInputFields = new LinkedHashSet<>(); for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) { collectFieldNames(wfExpr, aggInputFields); } @@ -2423,7 +2424,11 @@ private AggCall buildSingleAggCall( // Map function name to Calcite aggregate function RelBuilder.AggCall aggCall; if (args.isEmpty()) { - // COUNT() + // Only COUNT() supports zero arguments + if (!funcName.equals("COUNT")) { + throw new UnsupportedOperationException( + "Zero-argument window function not supported: " + funcName); + } aggCall = context.relBuilder.count(); if (alias != null) { aggCall = aggCall.as(alias); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml index b1b492f44a7..2417c318c41 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml @@ -3,29 +3,19 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) - EnumerableLimit(fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) - EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], expr#14=[IS NULL($t4)], proj#0..11=[{exprs}], $f12=[$t13], $f15=[$t14]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) + CalciteEnumerableTopK(sort0=[$17], dir0=[ASC], fetch=[10000]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableNestedLoopJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..2=[{inputs}], __r_seq__=[$t2], __r_gender__=[$t0], __r_age__=[$t1]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) - EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) - EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2, 3}]) - EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], expr#4=[IS NULL($t0)], proj#0..1=[{exprs}], $f12=[$t3], $f15=[$t4]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml index 24425578af2..c6542d03abc 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml @@ -3,29 +3,21 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), =($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) - CalciteEnumerableTopK(sort0=[$11], dir0=[ASC], fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$4], sort1=[$11], sort2=[$12], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], proj#0..11=[{exprs}], $f12=[$t13]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) + CalciteEnumerableTopK(sort0=[$17], dir0=[ASC], fetch=[10000]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableMergeJoin(condition=[AND(=($4, $19), >=($18, -($17, 1)), <=($18, $17))], joinType=[left]) + EnumerableSort(sort0=[$4], dir0=[ASC]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], proj#0..1=[{exprs}], $f12=[$t3]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..2=[{inputs}], __r_seq__=[$t2], __r_gender__=[$t0], __r_age__=[$t1]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml index 522e7922e68..c56cd5d1bce 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml @@ -3,30 +3,20 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) EnumerableLimit(fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) - EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) + EnumerableSort(sort0=[$17], dir0=[ASC]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableNestedLoopJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) - EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) - EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2, 3}]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + EnumerableCalc(expr#0..17=[{inputs}], __r_seq__=[$t17], __r_gender__=[$t4], __r_age__=[$t8]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml index a0634448b5e..d72bf7b429f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml @@ -3,29 +3,22 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), =($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) + EnumerableSort(sort0=[$17], dir0=[ASC]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableMergeJoin(condition=[AND(=($4, $19), >=($18, -($17, 1)), <=($18, $17))], joinType=[left]) + EnumerableSort(sort0=[$4], dir0=[ASC]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..17=[{inputs}], __r_seq__=[$t17], __r_gender__=[$t4], __r_age__=[$t8]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index be1b97a4810..2e4b6a605dd 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -5,6 +5,10 @@ package org.opensearch.sql.ppl.calcite; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; @@ -209,6 +213,14 @@ public void testMultipleStreamstatsWithWindow() { "source=EMP | streamstats window=2 avg(SAL) as avg_sal by DEPTNO" + " | streamstats window=2 avg(avg_sal) as avg_dept_sal by DEPTNO"; RelNode root = getRelNode(ppl); + assertNotNull("Chained streamstats with window should produce a valid plan", root); + // Verify the plan uses self-join (LogicalJoin) instead of LogicalCorrelate + String plan = root.explain(); + assertTrue( + "Plan should contain LogicalJoin for self-join approach", plan.contains("LogicalJoin")); + assertFalse( + "Plan should not contain LogicalCorrelate for window+group streamstats", + plan.contains("LogicalCorrelate")); } @Test From 19d74dbb1ac7023cca846f010cd5aa7f1de56194 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Thu, 23 Apr 2026 13:52:20 +0800 Subject: [PATCH 3/3] Reuse CalciteAggCallVisitor for streamstats self-join aggregates Address review feedback from @LantaoJin: instead of duplicating the PPL-window-function to Calcite-AggCall mapping inside buildSingleAggCall, rewrite the window function's field references to the prefixed right-side column names and delegate aggregate resolution to the shared aggVisitor. This eliminates ~100 lines of parallel function-name switching (AVG/SUM/MIN/MAX/COUNT/DC/STDDEV_*/VAR_*/ EARLIEST/LATEST) and keeps the streamstats self-join path consistent with regular stats/eventstats aggregation handling. Also extracts the "__r___" naming convention into named constants (RIGHT_SIDE_FIELD_PREFIX / RIGHT_SIDE_FIELD_SUFFIX / RIGHT_SIDE_SEQ_COLUMN) and a toRightSideFieldName helper so the prefix is defined in one place. Harness: add a reminder under Path B (AST / Function Implementation) in ppl-bugfix-reference.md to reuse aggVisitor / rexVisitor before hand-rolling a new function-name switch. Signed-off-by: Heng Qian --- .claude/harness/ppl-bugfix-reference.md | 5 + .../sql/calcite/CalciteRelNodeVisitor.java | 176 ++++++++---------- 2 files changed, 81 insertions(+), 100 deletions(-) diff --git a/.claude/harness/ppl-bugfix-reference.md b/.claude/harness/ppl-bugfix-reference.md index 9437ab88e1a..20697c83ecb 100644 --- a/.claude/harness/ppl-bugfix-reference.md +++ b/.claude/harness/ppl-bugfix-reference.md @@ -21,6 +21,11 @@ Consult this file when you need fix-path-specific guidance or test templates. 1. AST nodes in `core/.../ast/tree/`, functions in `core/.../expression/function/` or `PPLBuiltinOperators` 2. Watch Visitor pattern — sync `AbstractNodeVisitor`, `Analyzer`, `CalciteRelNodeVisitor`, `PPLQueryDataAnonymizer` 3. Test: `verifyLogical()`, `verifyPPLToSparkSQL()`, `verifyResult()` +4. **Before writing a new function-name → Calcite-op switch, try to reuse the existing visitor** + (`aggVisitor` / `rexVisitor` / `CalciteAggCallVisitor` / `CalciteRexNodeVisitor`). If the issue + is that a shared visitor resolves field references against the wrong row (e.g., wrong side of a + join), rewrite the AST field references to reference the correct names and delegate instead of + duplicating the AVG/SUM/MIN/MAX/STDDEV/... mapping by hand. ### Path C — Type System / Semantic Analysis diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index c91937ff77b..404d42cd08c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -104,6 +104,7 @@ import org.opensearch.sql.ast.expression.ParseMethod; import org.opensearch.sql.ast.expression.PatternMethod; import org.opensearch.sql.ast.expression.PatternMode; +import org.opensearch.sql.ast.expression.QualifiedName; import org.opensearch.sql.ast.expression.Span; import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedExpression; @@ -192,6 +193,18 @@ public class CalciteRelNodeVisitor extends AbstractNodeVisitor { + /** + * Prefix/suffix applied to right-side fields in the streamstats self-join plan to avoid name + * collisions with the left side and to make the renaming reversible. + */ + private static final String RIGHT_SIDE_FIELD_PREFIX = "__r_"; + + private static final String RIGHT_SIDE_FIELD_SUFFIX = "__"; + + /** Name of the right-side sequence column in the streamstats self-join plan. */ + private static final String RIGHT_SIDE_SEQ_COLUMN = + RIGHT_SIDE_FIELD_PREFIX + "seq" + RIGHT_SIDE_FIELD_SUFFIX; + private final CalciteRexNodeVisitor rexVisitor; private final CalciteAggCallVisitor aggVisitor; private final DataSourceService dataSourceService; @@ -2271,13 +2284,13 @@ private RelNode buildStreamWindowSelfJoinPlan( // Always include seq col rightFields.add(context.relBuilder.field(seqCol)); - rightFieldNames.add("__r_seq__"); + rightFieldNames.add(RIGHT_SIDE_SEQ_COLUMN); // Include group columns for (UnresolvedExpression groupExpr : groupList) { String groupName = extractGroupFieldName(groupExpr); rightFields.add(context.relBuilder.field(groupName)); - rightFieldNames.add("__r_" + groupName + "__"); + rightFieldNames.add(toRightSideFieldName(groupName)); } // Include aggregate input fields (extract field names from window functions) @@ -2292,7 +2305,7 @@ private RelNode buildStreamWindowSelfJoinPlan( } for (String aggField : aggInputFields) { rightFields.add(context.relBuilder.field(aggField)); - rightFieldNames.add("__r_" + aggField + "__"); + rightFieldNames.add(toRightSideFieldName(aggField)); } context.relBuilder.project(rightFields, rightFieldNames); @@ -2304,7 +2317,7 @@ private RelNode buildStreamWindowSelfJoinPlan( // Build join condition using 2-input references RexNode leftSeq = context.relBuilder.field(2, 0, seqCol); - RexNode rightSeq = context.relBuilder.field(2, 1, "__r_seq__"); + RexNode rightSeq = context.relBuilder.field(2, 1, RIGHT_SIDE_SEQ_COLUMN); // Frame filter RexNode frameFilter; @@ -2328,7 +2341,7 @@ private RelNode buildStreamWindowSelfJoinPlan( for (UnresolvedExpression groupExpr : groupList) { String groupName = extractGroupFieldName(groupExpr); RexNode leftGroup = context.relBuilder.field(2, 0, groupName); - RexNode rightGroup = context.relBuilder.field(2, 1, "__r_" + groupName + "__"); + RexNode rightGroup = context.relBuilder.field(2, 1, toRightSideFieldName(groupName)); RexNode equalCondition = context.relBuilder.equals(leftGroup, rightGroup); if (node.isBucketNullable()) { RexNode bothNull = @@ -2351,8 +2364,7 @@ private RelNode buildStreamWindowSelfJoinPlan( // The aggregate functions need to reference the right-side fields in the joined row // Build aggregate calls using the right-side field references - List aggCalls = - buildAggCallsFromJoinedRight(node.getWindowFunctionList(), leftFieldCount, context); + List aggCalls = buildAggCallsFromJoinedRight(node.getWindowFunctionList(), context); RelBuilder.GroupKey groupKey = context.relBuilder.groupKey( @@ -2389,111 +2401,75 @@ private void collectFieldNames(UnresolvedExpression expr, Set fieldNames /** * Build AggCall list for the self-join plan. The aggregate functions reference fields from the - * right side of the join (offset by leftFieldCount). + * right side of the join, which carry the {@code __r___} prefix applied during right-side + * projection. This method rewrites the window function's field references to those prefixed + * names, unwraps the {@link WindowFunction} to its inner {@link Function}, and then delegates to + * the shared {@link #aggVisitor} so the self-join path reuses the same aggregate-resolution logic + * as regular {@code stats}/{@code eventstats} aggregations. */ private List buildAggCallsFromJoinedRight( - List windowFunctionList, - int leftFieldCount, - CalcitePlanContext context) { + List windowFunctionList, CalcitePlanContext context) { List aggCalls = new ArrayList<>(); for (UnresolvedExpression wfExpr : windowFunctionList) { - String alias = null; - UnresolvedExpression inner = wfExpr; - if (wfExpr instanceof Alias a) { - // Use Alias.getName() for the aggregate output name (e.g. "max(SAL)" or user-defined alias) - alias = a.getName(); - inner = a.getDelegated(); - } - if (inner instanceof WindowFunction wf && wf.getFunction() instanceof Function func) { - String funcName = func.getFuncName().toUpperCase(); - List args = func.getFuncArgs(); - AggCall aggCall = buildSingleAggCall(funcName, args, alias, leftFieldCount, context); - aggCalls.add(aggCall); - } + UnresolvedExpression rewritten = rewriteWindowFunctionForSelfJoin(wfExpr); + aggCalls.add(aggVisitor.analyze(rewritten, context)); } return aggCalls; } - /** Build a single AggCall for the self-join aggregate, referencing right-side fields. */ - private AggCall buildSingleAggCall( - String funcName, - List args, - String alias, - int leftFieldCount, - CalcitePlanContext context) { - // Map function name to Calcite aggregate function - RelBuilder.AggCall aggCall; - if (args.isEmpty()) { - // Only COUNT() supports zero arguments - if (!funcName.equals("COUNT")) { - throw new UnsupportedOperationException( - "Zero-argument window function not supported: " + funcName); - } - aggCall = context.relBuilder.count(); - if (alias != null) { - aggCall = aggCall.as(alias); - } - } else { - // Get the right-side field reference for the argument - String argFieldName = extractFieldNameFromExpr(args.get(0)); - String rightArgFieldName = "__r_" + argFieldName + "__"; - // Find it in the right side (after left fields) - int rightFieldIndex = -1; - RelDataType joinedRowType = context.relBuilder.peek().getRowType(); - List fieldNames = joinedRowType.getFieldNames(); - for (int i = leftFieldCount; i < fieldNames.size(); i++) { - if (fieldNames.get(i).equals(rightArgFieldName)) { - rightFieldIndex = i; - break; - } - } - if (rightFieldIndex == -1) { - throw new IllegalArgumentException( - "Cannot find aggregate input field '" + rightArgFieldName + "' in right side of join"); - } - RexNode argRef = context.relBuilder.field(rightFieldIndex); - - aggCall = - switch (funcName) { - case "AVG" -> context.relBuilder.avg(false, alias, argRef); - case "SUM" -> context.relBuilder.sum(false, alias, argRef); - case "MIN" -> context.relBuilder.min(alias, argRef); - case "MAX" -> context.relBuilder.max(alias, argRef); - case "COUNT" -> context.relBuilder.count(false, alias, argRef); - case "DC", "DISTINCT_COUNT" -> context.relBuilder.count(true, alias, argRef); - case "STDDEV_POP" -> { - AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, argRef); - yield alias != null ? c.as(alias) : c; - } - case "STDDEV_SAMP" -> { - AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_SAMP, argRef); - yield alias != null ? c.as(alias) : c; - } - case "VAR_POP" -> { - AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_POP, argRef); - yield alias != null ? c.as(alias) : c; - } - case "VAR_SAMP" -> { - AggCall c = context.relBuilder.aggregateCall(SqlStdOperatorTable.VAR_SAMP, argRef); - yield alias != null ? c.as(alias) : c; - } - case "EARLIEST" -> context.relBuilder.min(alias, argRef); - case "LATEST" -> context.relBuilder.max(alias, argRef); - default -> - throw new UnsupportedOperationException("Unexpected window function: " + funcName); - }; + /** + * Rewrites a streamstats window function expression so that {@link #aggVisitor} can resolve it + * against the joined row type, where right-side fields carry the {@code __r___} prefix: + * + *
    + *
  • Unwraps {@link WindowFunction} to expose its inner {@link Function} (the aggregate). + *
  • Preserves the outer {@link Alias} so the aggregate output keeps its user-visible name. + *
  • Renames every {@link QualifiedName} / {@link Field} reference inside the function body to + * the prefixed right-side column name. + *
+ */ + private UnresolvedExpression rewriteWindowFunctionForSelfJoin(UnresolvedExpression expr) { + if (expr instanceof Alias a) { + return new Alias(a.getName(), rewriteWindowFunctionForSelfJoin(a.getDelegated())); + } + if (expr instanceof WindowFunction wf) { + return rewriteWindowFunctionForSelfJoin(wf.getFunction()); + } + if (expr instanceof Function func) { + List rewrittenArgs = + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); + return new Function(func.getFuncName(), rewrittenArgs); } - return aggCall; + return expr; } - private String extractFieldNameFromExpr(UnresolvedExpression expr) { - if (expr instanceof Field f) { - return f.getField().toString(); - } else if (expr instanceof Alias a) { - return extractFieldNameFromExpr(a.getDelegated()); - } else { - throw new IllegalArgumentException("Cannot extract field name from: " + expr); + /** + * Recursively renames field references within an aggregate argument to their right-side alias. + */ + private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) { + if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) { + return new Field(toRightSideQualifiedName(qn), f.getFieldArgs()); + } + if (expr instanceof QualifiedName qn) { + return toRightSideQualifiedName(qn); + } + if (expr instanceof Alias a) { + return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated())); } + if (expr instanceof Function func) { + List rewrittenArgs = + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); + return new Function(func.getFuncName(), rewrittenArgs); + } + return expr; + } + + private static QualifiedName toRightSideQualifiedName(QualifiedName original) { + return new QualifiedName(toRightSideFieldName(original.toString())); + } + + private static String toRightSideFieldName(String originalName) { + return RIGHT_SIDE_FIELD_PREFIX + originalName + RIGHT_SIDE_FIELD_SUFFIX; } private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) {