Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .claude/harness/ppl-bugfix-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ Consult this file when you need fix-path-specific guidance or test templates.
1. AST nodes in `core/.../ast/tree/`, functions in `core/.../expression/function/` or `PPLBuiltinOperators`
2. Watch Visitor pattern — sync `AbstractNodeVisitor`, `Analyzer`, `CalciteRelNodeVisitor`, `PPLQueryDataAnonymizer`
3. Test: `verifyLogical()`, `verifyPPLToSparkSQL()`, `verifyResult()`
4. **Before writing a new function-name → Calcite-op switch, try to reuse the existing visitor**
(`aggVisitor` / `rexVisitor` / `CalciteAggCallVisitor` / `CalciteRexNodeVisitor`). If the issue
is that a shared visitor resolves field references against the wrong row (e.g., wrong side of a
join), rewrite the AST field references to reference the correct names and delegate instead of
duplicating the AVG/SUM/MIN/MAX/STDDEV/... mapping by hand.

### Path C — Type System / Semantic Analysis

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -103,6 +104,7 @@
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.PatternMethod;
import org.opensearch.sql.ast.expression.PatternMode;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.Span;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
Expand Down Expand Up @@ -191,6 +193,18 @@

public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalcitePlanContext> {

/**
* Prefix/suffix applied to right-side fields in the streamstats self-join plan to avoid name
* collisions with the left side and to make the renaming reversible.
*/
private static final String RIGHT_SIDE_FIELD_PREFIX = "__r_";

private static final String RIGHT_SIDE_FIELD_SUFFIX = "__";

/** Name of the right-side sequence column in the streamstats self-join plan. */
private static final String RIGHT_SIDE_SEQ_COLUMN =
RIGHT_SIDE_FIELD_PREFIX + "seq" + RIGHT_SIDE_FIELD_SUFFIX;

private final CalciteRexNodeVisitor rexVisitor;
private final CalciteAggCallVisitor aggVisitor;
private final DataSourceService dataSourceService;
Expand Down Expand Up @@ -2097,14 +2111,14 @@ public RelNode visitStreamWindow(StreamWindow node, CalcitePlanContext context)
context.relBuilder.projectPlus(streamSeq);
RelNode left = context.relBuilder.build();

// 2. Run correlate + aggregate
return buildStreamWindowJoinPlan(
// 2. Use self-join approach to avoid nested correlates (which cause NPE
// in Calcite's RelDecorrelator when chaining multiple streamstats)
return buildStreamWindowSelfJoinPlan(
context,
left,
node,
groupList,
ROW_NUMBER_COLUMN_FOR_STREAMSTATS,
null,
new String[] {ROW_NUMBER_COLUMN_FOR_STREAMSTATS});
}

Expand Down Expand Up @@ -2235,6 +2249,229 @@ private RelNode buildStreamWindowJoinPlan(
return context.relBuilder.peek();
}

/**
* Builds a self-join based plan for streamstats with global=true + window + group. This avoids
* using LogicalCorrelate which causes NPE in Calcite's RelDecorrelator when chaining multiple
* streamstats commands.
*
* <p>Plan structure:
*
* <ol>
* <li>left = input + __stream_seq__
* <li>right = trim to only aggregate input + __stream_seq__
* <li>Join left and right on window frame + group conditions
* <li>Group by all left field indices, compute AGG(right.X)
* <li>Sort by __stream_seq__, then remove it
* </ol>
*/
private RelNode buildStreamWindowSelfJoinPlan(
CalcitePlanContext context,
RelNode leftWithHelpers,
StreamWindow node,
List<UnresolvedExpression> groupList,
String seqCol,
String[] helperColsToCleanup) {

int leftFieldCount = leftWithHelpers.getRowType().getFieldCount();

// Build right side: project only the fields needed for aggregation + seq + group columns
// This avoids field name collisions and keeps the right side minimal
context.relBuilder.push(leftWithHelpers);

// Collect fields needed on right side: seq col + group cols + aggregate input fields
List<RexNode> rightFields = new ArrayList<>();
List<String> rightFieldNames = new ArrayList<>();

// Always include seq col
rightFields.add(context.relBuilder.field(seqCol));
rightFieldNames.add(RIGHT_SIDE_SEQ_COLUMN);

// Include group columns
for (UnresolvedExpression groupExpr : groupList) {
String groupName = extractGroupFieldName(groupExpr);
rightFields.add(context.relBuilder.field(groupName));
rightFieldNames.add(toRightSideFieldName(groupName));
}

// Include aggregate input fields (extract field names from window functions)
Set<String> aggInputFields = new LinkedHashSet<>();
for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) {
collectFieldNames(wfExpr, aggInputFields);
}
// Remove already-included fields
aggInputFields.remove(seqCol);
for (UnresolvedExpression groupExpr : groupList) {
aggInputFields.remove(extractGroupFieldName(groupExpr));
}
for (String aggField : aggInputFields) {
rightFields.add(context.relBuilder.field(aggField));
rightFieldNames.add(toRightSideFieldName(aggField));
}

context.relBuilder.project(rightFields, rightFieldNames);
RelNode rightProjected = context.relBuilder.build();

// Push left and right
context.relBuilder.push(leftWithHelpers);
context.relBuilder.push(rightProjected);

// Build join condition using 2-input references
RexNode leftSeq = context.relBuilder.field(2, 0, seqCol);
RexNode rightSeq = context.relBuilder.field(2, 1, RIGHT_SIDE_SEQ_COLUMN);

// Frame filter
RexNode frameFilter;
if (node.isCurrent()) {
RexNode lower =
context.relBuilder.call(
SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow() - 1));
frameFilter = context.relBuilder.between(rightSeq, lower, leftSeq);
} else {
RexNode lower =
context.relBuilder.call(
SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(node.getWindow()));
RexNode upper =
context.relBuilder.call(
SqlStdOperatorTable.MINUS, leftSeq, context.relBuilder.literal(1));
frameFilter = context.relBuilder.between(rightSeq, lower, upper);
}

// Group filter
List<RexNode> groupFilters = new ArrayList<>();
for (UnresolvedExpression groupExpr : groupList) {
String groupName = extractGroupFieldName(groupExpr);
RexNode leftGroup = context.relBuilder.field(2, 0, groupName);
RexNode rightGroup = context.relBuilder.field(2, 1, toRightSideFieldName(groupName));
RexNode equalCondition = context.relBuilder.equals(leftGroup, rightGroup);
if (node.isBucketNullable()) {
RexNode bothNull =
context.relBuilder.and(
context.relBuilder.isNull(leftGroup), context.relBuilder.isNull(rightGroup));
groupFilters.add(context.relBuilder.or(equalCondition, bothNull));
} else {
groupFilters.add(equalCondition);
}
}

RexNode joinCondition =
groupFilters.isEmpty()
? frameFilter
: context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters));
context.relBuilder.join(JoinRelType.LEFT, joinCondition);

// After join: [left_fields(0..leftFieldCount-1), right_fields(leftFieldCount..)]
// Aggregate: group by all left fields, compute AGG on right fields
// The aggregate functions need to reference the right-side fields in the joined row

// Build aggregate calls using the right-side field references
List<AggCall> aggCalls = buildAggCallsFromJoinedRight(node.getWindowFunctionList(), context);

RelBuilder.GroupKey groupKey =
context.relBuilder.groupKey(
IntStream.range(0, leftFieldCount).mapToObj(context.relBuilder::field).toList());

context.relBuilder.aggregate(groupKey, aggCalls);

// Resort by the sequence column
context.relBuilder.sort(context.relBuilder.field(seqCol));

// Cleanup helper columns
List<RexNode> cleanup = new ArrayList<>();
for (String c : helperColsToCleanup) {
cleanup.add(context.relBuilder.field(c));
}
context.relBuilder.projectExcept(cleanup);
return context.relBuilder.peek();
}

/** Collect field names referenced by an expression tree. */
private void collectFieldNames(UnresolvedExpression expr, Set<String> fieldNames) {
if (expr instanceof Field f) {
fieldNames.add(f.getField().toString());
} else if (expr instanceof Alias a) {
collectFieldNames(a.getDelegated(), fieldNames);
} else if (expr instanceof WindowFunction wf) {
collectFieldNames(wf.getFunction(), fieldNames);
} else if (expr instanceof Function func) {
for (UnresolvedExpression arg : func.getFuncArgs()) {
collectFieldNames(arg, fieldNames);
}
}
}

/**
* Build AggCall list for the self-join plan. The aggregate functions reference fields from the
* right side of the join, which carry the {@code __r_<name>__} prefix applied during right-side
* projection. This method rewrites the window function's field references to those prefixed
* names, unwraps the {@link WindowFunction} to its inner {@link Function}, and then delegates to
* the shared {@link #aggVisitor} so the self-join path reuses the same aggregate-resolution logic
* as regular {@code stats}/{@code eventstats} aggregations.
*/
private List<AggCall> buildAggCallsFromJoinedRight(
List<UnresolvedExpression> windowFunctionList, CalcitePlanContext context) {
List<AggCall> aggCalls = new ArrayList<>();
for (UnresolvedExpression wfExpr : windowFunctionList) {
UnresolvedExpression rewritten = rewriteWindowFunctionForSelfJoin(wfExpr);
aggCalls.add(aggVisitor.analyze(rewritten, context));
}
return aggCalls;
}

/**
* Rewrites a streamstats window function expression so that {@link #aggVisitor} can resolve it
* against the joined row type, where right-side fields carry the {@code __r_<name>__} prefix:
*
* <ul>
* <li>Unwraps {@link WindowFunction} to expose its inner {@link Function} (the aggregate).
* <li>Preserves the outer {@link Alias} so the aggregate output keeps its user-visible name.
* <li>Renames every {@link QualifiedName} / {@link Field} reference inside the function body to
* the prefixed right-side column name.
* </ul>
*/
private UnresolvedExpression rewriteWindowFunctionForSelfJoin(UnresolvedExpression expr) {
if (expr instanceof Alias a) {
return new Alias(a.getName(), rewriteWindowFunctionForSelfJoin(a.getDelegated()));
}
if (expr instanceof WindowFunction wf) {
return rewriteWindowFunctionForSelfJoin(wf.getFunction());
}
if (expr instanceof Function func) {
List<UnresolvedExpression> rewrittenArgs =
func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList();
return new Function(func.getFuncName(), rewrittenArgs);
}
return expr;
}

/**
* Recursively renames field references within an aggregate argument to their right-side alias.
*/
private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) {
if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) {
return new Field(toRightSideQualifiedName(qn), f.getFieldArgs());
}
if (expr instanceof QualifiedName qn) {
return toRightSideQualifiedName(qn);
}
if (expr instanceof Alias a) {
return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated()));
}
if (expr instanceof Function func) {
List<UnresolvedExpression> rewrittenArgs =
func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList();
return new Function(func.getFuncName(), rewrittenArgs);
}
return expr;
}

private static QualifiedName toRightSideQualifiedName(QualifiedName original) {
return new QualifiedName(toRightSideFieldName(original.toString()));
}

private static String toRightSideFieldName(String originalName) {
return RIGHT_SIDE_FIELD_PREFIX + originalName + RIGHT_SIDE_FIELD_SUFFIX;
}

private RelNode buildResetHelperColumns(CalcitePlanContext context, StreamWindow node) {
// 1. global sequence to define order
RexNode rowNum =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,41 @@ public void testMultipleStreamstats() throws IOException {
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5));
}

@Test
public void testMultipleStreamstatsWithWindow() throws IOException {
// Test case from GitHub issue #4800: chained streamstats with window=2
JSONObject actual =
executeQuery(
String.format(
"source=%s | streamstats window=2 avg(age) as avg_age by state, country"
+ " | streamstats window=2 avg(avg_age) as avg_state_age by country",
TEST_INDEX_STATE_COUNTRY_WITH_NULL));

verifySchemaInOrder(
actual,
schema("name", "string"),
schema("country", "string"),
schema("state", "string"),
schema("month", "int"),
schema("year", "int"),
schema("age", "int"),
schema("avg_age", "double"),
schema("avg_state_age", "double"));

verifyDataRows(
actual,
rows("Jake", "USA", "California", 4, 2023, 70, 70, 70),
rows("Hello", "USA", "New York", 4, 2023, 30, 30, 50),
rows("John", "Canada", "Ontario", 4, 2023, 25, 25, 25),
rows("Jane", "Canada", "Quebec", 4, 2023, 20, 20, 22.5),
rows(null, "Canada", null, 4, 2023, 10, 10, 15),
rows("Kevin", null, null, 4, 2023, null, null, null));
}

// TODO: Fix chained reset_before + window streamstats (nested correlate issue, see #4800)
// The reset path still uses correlate, and the window self-join copies it into the right side,
// causing Calcite's RelDecorrelator to fail on duplicate correlate references.

@Test
public void testMultipleStreamstatsWithNull1() throws IOException {
JSONObject actual =
Expand Down
Loading
Loading