diff --git a/.claude/harness/ppl-bugfix-reference.md b/.claude/harness/ppl-bugfix-reference.md index 9437ab88e1a..20697c83ecb 100644 --- a/.claude/harness/ppl-bugfix-reference.md +++ b/.claude/harness/ppl-bugfix-reference.md @@ -21,6 +21,11 @@ Consult this file when you need fix-path-specific guidance or test templates. 1. AST nodes in `core/.../ast/tree/`, functions in `core/.../expression/function/` or `PPLBuiltinOperators` 2. Watch Visitor pattern — sync `AbstractNodeVisitor`, `Analyzer`, `CalciteRelNodeVisitor`, `PPLQueryDataAnonymizer` 3. Test: `verifyLogical()`, `verifyPPLToSparkSQL()`, `verifyResult()` +4. **Before writing a new function-name → Calcite-op switch, try to reuse the existing visitor** + (`aggVisitor` / `rexVisitor` / `CalciteAggCallVisitor` / `CalciteRexNodeVisitor`). If the issue + is that a shared visitor resolves field references against the wrong row (e.g., wrong side of a + join), rewrite the AST field references to reference the correct names and delegate instead of + duplicating the AVG/SUM/MIN/MAX/STDDEV/... mapping by hand. ### Path C — Type System / Semantic Analysis diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 4e2e3f44b70..404d42cd08c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -35,6 +35,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -103,6 +104,7 @@ import org.opensearch.sql.ast.expression.ParseMethod; import org.opensearch.sql.ast.expression.PatternMethod; import org.opensearch.sql.ast.expression.PatternMode; +import org.opensearch.sql.ast.expression.QualifiedName; import org.opensearch.sql.ast.expression.Span; import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedExpression; @@ -191,6 +193,18 @@ public class CalciteRelNodeVisitor extends AbstractNodeVisitor { + /** + * Prefix/suffix applied to right-side fields in the streamstats self-join plan to avoid name + * collisions with the left side and to make the renaming reversible. + */ + private static final String RIGHT_SIDE_FIELD_PREFIX = "__r_"; + + private static final String RIGHT_SIDE_FIELD_SUFFIX = "__"; + + /** Name of the right-side sequence column in the streamstats self-join plan. */ + private static final String RIGHT_SIDE_SEQ_COLUMN = + RIGHT_SIDE_FIELD_PREFIX + "seq" + RIGHT_SIDE_FIELD_SUFFIX; + private final CalciteRexNodeVisitor rexVisitor; private final CalciteAggCallVisitor aggVisitor; private final DataSourceService dataSourceService; @@ -2097,14 +2111,14 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context) context.relBuilder.projectPlus(streamSeq); RelNode left = context.relBuilder.build(); - // 2. Run correlate + aggregate - return buildStreamWindowJoinPlan( + // 2. Use self-join approach to avoid nested correlates (which cause NPE + // in Calcite's RelDecorrelator when chaining multiple streamstats) + return buildStreamWindowSelfJoinPlan( context, left, node, groupList, ROW_NUMBER_COLUMN_FOR_STREAMSTATS, - null, new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS}); } @@ -2235,6 +2249,229 @@ private RelNode buildStreamWindowJoinPlan( return context.relBuilder.peek(); } + /** + * Builds a self-join based plan for streamstats with global=true + window + group. This avoids + * using LogicalCorrelate which causes NPE in Calcite's RelDecorrelator when chaining multiple + * streamstats commands. + * + *

Plan structure: + * + *

    + *
  1. left = input + __stream_seq__ + *
  2. right = trim to only aggregate input + __stream_seq__ + *
  3. Join left and right on window frame + group conditions + *
  4. Group by all left field indices, compute AGG(right.X) + *
  5. Sort by __stream_seq__, then remove it + *
+ */ + private RelNode buildStreamWindowSelfJoinPlan( + CalcitePlanContext context, + RelNode leftWithHelpers, + StreamWindow node, + List groupList, + String seqCol, + String[] helperColsToCleanup) { + + int leftFieldCount = leftWithHelpers.getRowType().getFieldCount(); + + // Build right side: project only the fields needed for aggregation + seq + group columns + // This avoids field name collisions and keeps the right side minimal + context.relBuilder.push(leftWithHelpers); + + // Collect fields needed on right side: seq col + group cols + aggregate input fields + List rightFields = new ArrayList<>(); + List rightFieldNames = new ArrayList<>(); + + // Always include seq col + rightFields.add(context.relBuilder.field(seqCol)); + rightFieldNames.add(RIGHT_SIDE_SEQ_COLUMN); + + // Include group columns + for (UnresolvedExpression groupExpr : groupList) { + String groupName = extractGroupFieldName(groupExpr); + rightFields.add(context.relBuilder.field(groupName)); + rightFieldNames.add(toRightSideFieldName(groupName)); + } + + // Include aggregate input fields (extract field names from window functions) + Set aggInputFields = new LinkedHashSet<>(); + for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) { + collectFieldNames(wfExpr, aggInputFields); + } + // Remove already-included fields + aggInputFields.remove(seqCol); + for (UnresolvedExpression groupExpr : groupList) { + aggInputFields.remove(extractGroupFieldName(groupExpr)); + } + for (String aggField : aggInputFields) { + rightFields.add(context.relBuilder.field(aggField)); + rightFieldNames.add(toRightSideFieldName(aggField)); + } + + context.relBuilder.project(rightFields, rightFieldNames); + RelNode rightProjected = context.relBuilder.build(); + + // Push left and right + context.relBuilder.push(leftWithHelpers); + context.relBuilder.push(rightProjected); + + // Build join condition using 2-input references + RexNode leftSeq = context.relBuilder.field(2, 0, seqCol); + RexNode rightSeq = context.relBuilder.field(2, 1, RIGHT_SIDE_SEQ_COLUMN); + + // Frame filter + RexNode frameFilter; + if (node.isCurrent()) { + RexNode lower = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow() - 1)); + frameFilter = context.relBuilder.between(rightSeq, lower, leftSeq); + } else { + RexNode lower = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow())); + RexNode upper = + context.relBuilder.call( + SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(1)); + frameFilter = context.relBuilder.between(rightSeq, lower, upper); + } + + // Group filter + List groupFilters = new ArrayList<>(); + for (UnresolvedExpression groupExpr : groupList) { + String groupName = extractGroupFieldName(groupExpr); + RexNode leftGroup = context.relBuilder.field(2, 0, groupName); + RexNode rightGroup = context.relBuilder.field(2, 1, toRightSideFieldName(groupName)); + RexNode equalCondition = context.relBuilder.equals(leftGroup, rightGroup); + if (node.isBucketNullable()) { + RexNode bothNull = + context.relBuilder.and( + context.relBuilder.isNull(leftGroup), context.relBuilder.isNull(rightGroup)); + groupFilters.add(context.relBuilder.or(equalCondition, bothNull)); + } else { + groupFilters.add(equalCondition); + } + } + + RexNode joinCondition = + groupFilters.isEmpty() + ? frameFilter + : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters)); + context.relBuilder.join(JoinRelType.LEFT, joinCondition); + + // After join: [left_fields(0..leftFieldCount-1), right_fields(leftFieldCount..)] + // Aggregate: group by all left fields, compute AGG on right fields + // The aggregate functions need to reference the right-side fields in the joined row + + // Build aggregate calls using the right-side field references + List aggCalls = buildAggCallsFromJoinedRight(node.getWindowFunctionList(), context); + + RelBuilder.GroupKey groupKey = + context.relBuilder.groupKey( + IntStream.range(0, leftFieldCount).mapToObj(context.relBuilder::field).toList()); + + context.relBuilder.aggregate(groupKey, aggCalls); + + // Resort by the sequence column + context.relBuilder.sort(context.relBuilder.field(seqCol)); + + // Cleanup helper columns + List cleanup = new ArrayList<>(); + for (String c : helperColsToCleanup) { + cleanup.add(context.relBuilder.field(c)); + } + context.relBuilder.projectExcept(cleanup); + return context.relBuilder.peek(); + } + + /** Collect field names referenced by an expression tree. */ + private void collectFieldNames(UnresolvedExpression expr, Set fieldNames) { + if (expr instanceof Field f) { + fieldNames.add(f.getField().toString()); + } else if (expr instanceof Alias a) { + collectFieldNames(a.getDelegated(), fieldNames); + } else if (expr instanceof WindowFunction wf) { + collectFieldNames(wf.getFunction(), fieldNames); + } else if (expr instanceof Function func) { + for (UnresolvedExpression arg : func.getFuncArgs()) { + collectFieldNames(arg, fieldNames); + } + } + } + + /** + * Build AggCall list for the self-join plan. The aggregate functions reference fields from the + * right side of the join, which carry the {@code __r___} prefix applied during right-side + * projection. This method rewrites the window function's field references to those prefixed + * names, unwraps the {@link WindowFunction} to its inner {@link Function}, and then delegates to + * the shared {@link #aggVisitor} so the self-join path reuses the same aggregate-resolution logic + * as regular {@code stats}/{@code eventstats} aggregations. + */ + private List buildAggCallsFromJoinedRight( + List windowFunctionList, CalcitePlanContext context) { + List aggCalls = new ArrayList<>(); + for (UnresolvedExpression wfExpr : windowFunctionList) { + UnresolvedExpression rewritten = rewriteWindowFunctionForSelfJoin(wfExpr); + aggCalls.add(aggVisitor.analyze(rewritten, context)); + } + return aggCalls; + } + + /** + * Rewrites a streamstats window function expression so that {@link #aggVisitor} can resolve it + * against the joined row type, where right-side fields carry the {@code __r___} prefix: + * + *
    + *
  • Unwraps {@link WindowFunction} to expose its inner {@link Function} (the aggregate). + *
  • Preserves the outer {@link Alias} so the aggregate output keeps its user-visible name. + *
  • Renames every {@link QualifiedName} / {@link Field} reference inside the function body to + * the prefixed right-side column name. + *
+ */ + private UnresolvedExpression rewriteWindowFunctionForSelfJoin(UnresolvedExpression expr) { + if (expr instanceof Alias a) { + return new Alias(a.getName(), rewriteWindowFunctionForSelfJoin(a.getDelegated())); + } + if (expr instanceof WindowFunction wf) { + return rewriteWindowFunctionForSelfJoin(wf.getFunction()); + } + if (expr instanceof Function func) { + List rewrittenArgs = + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); + return new Function(func.getFuncName(), rewrittenArgs); + } + return expr; + } + + /** + * Recursively renames field references within an aggregate argument to their right-side alias. + */ + private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) { + if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) { + return new Field(toRightSideQualifiedName(qn), f.getFieldArgs()); + } + if (expr instanceof QualifiedName qn) { + return toRightSideQualifiedName(qn); + } + if (expr instanceof Alias a) { + return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated())); + } + if (expr instanceof Function func) { + List rewrittenArgs = + func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList(); + return new Function(func.getFuncName(), rewrittenArgs); + } + return expr; + } + + private static QualifiedName toRightSideQualifiedName(QualifiedName original) { + return new QualifiedName(toRightSideFieldName(original.toString())); + } + + private static String toRightSideFieldName(String originalName) { + return RIGHT_SIDE_FIELD_PREFIX + originalName + RIGHT_SIDE_FIELD_SUFFIX; + } + private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) { // 1. global sequence to define order RexNode rowNum = diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java index dcf36f510bf..fa0b21e622f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java @@ -823,6 +823,41 @@ public void testMultipleStreamstats() throws IOException { rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5)); } + @Test + public void testMultipleStreamstatsWithWindow() throws IOException { + // Test case from GitHub issue #4800: chained streamstats with window=2 + JSONObject actual = + executeQuery( + String.format( + "source=%s | streamstats window=2 avg(age) as avg_age by state, country" + + " | streamstats window=2 avg(avg_age) as avg_state_age by country", + TEST_INDEX_STATE_COUNTRY_WITH_NULL)); + + verifySchemaInOrder( + actual, + schema("name", "string"), + schema("country", "string"), + schema("state", "string"), + schema("month", "int"), + schema("year", "int"), + schema("age", "int"), + schema("avg_age", "double"), + schema("avg_state_age", "double")); + + verifyDataRows( + actual, + rows("Jake", "USA", "California", 4, 2023, 70, 70, 70), + rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50), + rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25), + rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5), + rows(null, "Canada", null, 4, 2023, 10, 10, 15), + rows("Kevin", null, null, 4, 2023, null, null, null)); + } + + // TODO: Fix chained reset_before + window streamstats (nested correlate issue, see #4800) + // The reset path still uses correlate, and the window self-join copies it into the right side, + // causing Calcite's RelDecorrelator to fail on duplicate correlate references. + @Test public void testMultipleStreamstatsWithNull1() throws IOException { JSONObject actual = diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml index d0c57f3b08a..124539b9d4c 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml @@ -3,29 +3,19 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) - EnumerableLimit(fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) - EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], expr#14=[IS NULL($t4)], proj#0..11=[{exprs}], $f12=[$t13], $f15=[$t14]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) + CalciteEnumerableTopK(sort0=[$17], dir0=[ASC], fetch=[10000]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableNestedLoopJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableCalc(expr#0..2=[{inputs}], __r_seq__=[$t2], __r_gender__=[$t0], __r_age__=[$t1]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) - EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) - EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2, 3}]) - EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], expr#4=[IS NULL($t0)], proj#0..1=[{exprs}], $f12=[$t3], $f15=[$t4]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml index 52d485482b7..a1cf6ae00e9 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml @@ -3,29 +3,21 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), =($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) - CalciteEnumerableTopK(sort0=[$11], dir0=[ASC], fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$4], sort1=[$11], sort2=[$12], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..11=[{inputs}], expr#12=[1], expr#13=[-($t11, $t12)], proj#0..11=[{exprs}], $f12=[$t13]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) + CalciteEnumerableTopK(sort0=[$17], dir0=[ASC], fetch=[10000]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableMergeJoin(condition=[AND(=($4, $19), >=($18, -($17, 1)), <=($18, $17))], joinType=[left]) + EnumerableSort(sort0=[$4], dir0=[ASC]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","balance","gender","city","employer","state","age","email","lastname"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$0], sort1=[$1], sort2=[$2], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..1=[{inputs}], expr#2=[1], expr#3=[-($t1, $t2)], proj#0..1=[{exprs}], $f12=[$t3]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..2=[{inputs}], __r_seq__=[$t2], __r_gender__=[$t0], __r_age__=[$t1]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[gender, age]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["gender","age"]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml index 522e7922e68..c56cd5d1bce 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml @@ -3,30 +3,20 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), OR(=($4, $cor0.gender), AND(IS NULL($4), IS NULL($cor0.gender))))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..18=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t18]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) EnumerableLimit(fetch=[10000]) - EnumerableMergeJoin(condition=[AND(=($11, $15), =($12, $16), =($13, $17), IS NOT DISTINCT FROM($4, $14))], joinType=[left]) - EnumerableSort(sort0=[$11], sort1=[$12], sort2=[$13], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) + EnumerableSort(sort0=[$17], dir0=[ASC]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableNestedLoopJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), IS NOT DISTINCT FROM($4, $19))], joinType=[left]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableSort(sort0=[$1], sort1=[$2], sort2=[$3], dir0=[ASC], dir1=[ASC], dir2=[ASC]) - EnumerableCalc(expr#0..5=[{inputs}], expr#6=[0], expr#7=[=($t5, $t6)], expr#8=[null:BIGINT], expr#9=[CASE($t7, $t8, $t4)], expr#10=[CAST($t9):DOUBLE], expr#11=[/($t10, $t5)], proj#0..3=[{exprs}], avg_age=[$t11]) - EnumerableAggregate(group=[{0, 1, 2, 3}], agg#0=[$SUM0($5)], agg#1=[COUNT($5)]) - EnumerableNestedLoopJoin(condition=[AND(>=($6, $2), <=($6, $1), OR(=($4, $0), AND(IS NULL($4), $3)))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2, 3}]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], expr#20=[IS NULL($t4)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19], $f15=[$t20]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + EnumerableCalc(expr#0..17=[{inputs}], __r_seq__=[$t17], __r_gender__=[$t4], __r_age__=[$t8]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml index a0634448b5e..d72bf7b429f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml @@ -3,29 +3,22 @@ calcite: LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_age=[$18]) LogicalSort(sort0=[$17], dir0=[ASC]) - LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{4, 17}]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - LogicalAggregate(group=[{}], avg_age=[AVG($0)]) - LogicalProject(age=[$8]) - LogicalFilter(condition=[AND(>=($17, -($cor0.__stream_seq__, 1)), <=($17, $cor0.__stream_seq__), =($4, $cor0.gender))]) - LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) - CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], avg_age=[AVG($20)]) + LogicalJoin(condition=[AND(>=($18, -($17, 1)), <=($18, $17), =($4, $19))], joinType=[left]) + LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], _id=[$11], _index=[$12], _score=[$13], _maxscore=[$14], _sort=[$15], _routing=[$16], __stream_seq__=[ROW_NUMBER() OVER ()]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_gender__=[$4], __r_age__=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableCalc(expr#0..16=[{inputs}], proj#0..10=[{exprs}], avg_age=[$t16]) + EnumerableCalc(expr#0..19=[{inputs}], expr#20=[0], expr#21=[=($t19, $t20)], expr#22=[null:BIGINT], expr#23=[CASE($t21, $t22, $t18)], expr#24=[CAST($t23):DOUBLE], expr#25=[/($t24, $t19)], proj#0..10=[{exprs}], avg_age=[$t25]) EnumerableLimit(fetch=[10000]) - EnumerableHashJoin(condition=[AND(=($4, $13), =($11, $14), =($12, $15))], joinType=[left]) - EnumerableSort(sort0=[$11], dir0=[ASC]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], proj#0..10=[{exprs}], __stream_seq__=[$t17], $f12=[$t19]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..4=[{inputs}], expr#5=[0], expr#6=[=($t4, $t5)], expr#7=[null:BIGINT], expr#8=[CASE($t6, $t7, $t3)], expr#9=[CAST($t8):DOUBLE], expr#10=[/($t9, $t4)], proj#0..2=[{exprs}], avg_age=[$t10]) - EnumerableAggregate(group=[{0, 1, 2}], agg#0=[$SUM0($4)], agg#1=[COUNT($4)]) - EnumerableHashJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, $1))], joinType=[inner]) - EnumerableAggregate(group=[{0, 1, 2}]) - EnumerableCalc(expr#0..17=[{inputs}], expr#18=[1], expr#19=[-($t17, $t18)], gender=[$t4], __stream_seq__=[$t17], $f12=[$t19]) - EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) - EnumerableCalc(expr#0..17=[{inputs}], gender=[$t4], age=[$t8], $2=[$t17]) + EnumerableSort(sort0=[$17], dir0=[ASC]) + EnumerableAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}], agg#0=[$SUM0($20)], agg#1=[COUNT($20)]) + EnumerableMergeJoin(condition=[AND(=($4, $19), >=($18, -($17, 1)), <=($18, $17))], joinType=[left]) + EnumerableSort(sort0=[$4], dir0=[ASC]) + EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + EnumerableSort(sort0=[$1], dir0=[ASC]) + EnumerableCalc(expr#0..17=[{inputs}], __r_seq__=[$t17], __r_gender__=[$t4], __r_age__=[$t8]) EnumerableWindow(window#0=[window(rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml new file mode 100644 index 00000000000..ef4c4769191 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml @@ -0,0 +1,57 @@ +setup: + - skip: + features: + - headers + - allowed_warnings + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true + - do: + indices.create: + index: stream_test_null + body: + mappings: + properties: + name: { type: keyword } + age: { type: integer } + state: { type: keyword } + country: { type: keyword } + year: { type: integer } + month: { type: integer } + - do: + bulk: + index: stream_test_null + refresh: true + body: + - '{"index": {"_id": "1"}}' + - '{"name": "Jake", "age": 70, "state": "California", "country": "USA", "year": 2023, "month": 4}' + - '{"index": {"_id": "2"}}' + - '{"name": "Hello", "age": 30, "state": "New York", "country": "USA", "year": 2023, "month": 4}' + - '{"index": {"_id": "3"}}' + - '{"name": "John", "age": 25, "state": "Ontario", "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "4"}}' + - '{"name": "Jane", "age": 20, "state": "Quebec", "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "5"}}' + - '{"name": null, "age": 10, "state": null, "country": "Canada", "year": 2023, "month": 4}' + - '{"index": {"_id": "6"}}' + - '{"name": "Kevin", "year": 2023, "month": 4}' + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: false + +--- +"Chained streamstats with window should not cause NPE (#4800)": + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'source=stream_test_null | streamstats window=2 avg(age) as avg_age by state, country | streamstats window=2 avg(avg_age) as avg_state_age by country' + - match: {"total": 6} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java index 637e8f19820..2e4b6a605dd 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java @@ -5,6 +5,10 @@ package org.opensearch.sql.ppl.calcite; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; import org.junit.Test; @@ -92,41 +96,21 @@ public void testStreamstatsCurrent() { public void testStreamstatsWindow() { String ppl = "source=EMP | streamstats window = 5 max(SAL) by DEPTNO"; RelNode root = getRelNode(ppl); + // Uses self-join plan to avoid nested correlates that cause NPE in Calcite's decorrelator String expectedLogical = "LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5]," + " COMM=[$6], DEPTNO=[$7], max(SAL)=[$9])\n" + " LogicalSort(sort0=[$8], dir0=[ASC])\n" - + " LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{7," - + " 8}])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7, 8}], max(SAL)=[MAX($11)])\n" + + " LogicalJoin(condition=[AND(>=($9, -($8, 4)), <=($9, $8), IS NOT DISTINCT" + + " FROM($7, $10))], joinType=[left])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalAggregate(group=[{}], max(SAL)=[MAX($0)])\n" - + " LogicalProject(SAL=[$5])\n" - + " LogicalFilter(condition=[AND(>=($8, -($cor0.__stream_seq__, 4)), <=($8," - + " $cor0.__stream_seq__), OR(=($7, $cor0.DEPTNO), AND(IS NULL($7), IS" - + " NULL($cor0.DEPTNO))))])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3]," - + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], __stream_seq__=[ROW_NUMBER() OVER" - + " ()])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(__r_seq__=[ROW_NUMBER() OVER ()], __r_DEPTNO__=[$7]," + + " __r_SAL__=[$5])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); - - String expectedSparkSql = - "SELECT `$cor0`.`EMPNO`, `$cor0`.`ENAME`, `$cor0`.`JOB`, `$cor0`.`MGR`, `$cor0`.`HIREDATE`," - + " `$cor0`.`SAL`, `$cor0`.`COMM`, `$cor0`.`DEPTNO`, `t3`.`max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `$cor0`,\n" - + "LATERAL (SELECT MAX(`SAL`) `max(SAL)`\n" - + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`," - + " ROW_NUMBER() OVER () `__stream_seq__`\n" - + "FROM `scott`.`EMP`) `t0`\n" - + "WHERE `__stream_seq__` >= `$cor0`.`__stream_seq__` - 4 AND `__stream_seq__` <=" - + " `$cor0`.`__stream_seq__` AND (`DEPTNO` = `$cor0`.`DEPTNO` OR `DEPTNO` IS NULL AND" - + " `$cor0`.`DEPTNO` IS NULL)) `t3`\n" - + "ORDER BY `$cor0`.`__stream_seq__` NULLS LAST"; - verifyPPLToSparkSQL(root, expectedSparkSql); } @Test @@ -223,6 +207,22 @@ public void testStreamstatsReset() { verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testMultipleStreamstatsWithWindow() { + String ppl = + "source=EMP | streamstats window=2 avg(SAL) as avg_sal by DEPTNO" + + " | streamstats window=2 avg(avg_sal) as avg_dept_sal by DEPTNO"; + RelNode root = getRelNode(ppl); + assertNotNull("Chained streamstats with window should produce a valid plan", root); + // Verify the plan uses self-join (LogicalJoin) instead of LogicalCorrelate + String plan = root.explain(); + assertTrue( + "Plan should contain LogicalJoin for self-join approach", plan.contains("LogicalJoin")); + assertFalse( + "Plan should not contain LogicalCorrelate for window+group streamstats", + plan.contains("LogicalCorrelate")); + } + @Test public void testStreamstatsWithReverse() { String ppl = "source=EMP | streamstats max(SAL) by DEPTNO | reverse";