Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,17 @@
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFamily;
Expand Down Expand Up @@ -139,7 +147,6 @@
import org.opensearch.sql.ast.tree.Rex;
import org.opensearch.sql.ast.tree.SPath;
import org.opensearch.sql.ast.tree.Search;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.StreamWindow;
import org.opensearch.sql.ast.tree.SubqueryAlias;
Expand Down Expand Up @@ -633,7 +640,7 @@ private void removeFieldIfExists(
}

@Override
public RelNode visitSort(Sort node, CalcitePlanContext context) {
public RelNode visitSort(org.opensearch.sql.ast.tree.Sort node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> sortList =
node.getSortList().stream()
Expand Down Expand Up @@ -681,25 +688,161 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

private static final String REVERSE_ROW_NUM = "__reverse_row_num__";
/**
* Backtrack through the RelNode tree to find the first Sort node with non-empty collation. Stops
* at blocking operators that break ordering:
*
* <ul>
* <li>Aggregate - aggregation destroys input ordering
* <li>BiRel - covers Join, Correlate, and other binary relations
* <li>SetOp - covers Union, Intersect, Except
* <li>Uncollect - unnesting operation that may change ordering
* <li>Project with window functions (RexOver) - ordering determined by window's ORDER BY
* </ul>
*
* @param node the starting RelNode to backtrack from
* @return the collation found, or null if no sort or blocking operator encountered
*/
private RelCollation backtrackForCollation(RelNode node) {
while (node != null) {
// Check for blocking operators that destroy collation
// BiRel covers Join, Correlate, and other binary relations
// SetOp covers Union, Intersect, Except
// Uncollect unnests arrays/multisets which may change ordering
if (node instanceof Aggregate
|| node instanceof BiRel
|| node instanceof SetOp
|| node instanceof Uncollect) {
return null;
}

// Project with window functions has ordering determined by the window's ORDER BY clause
// We should not destroy its output order by inserting a reversed sort
if (node instanceof LogicalProject && ((LogicalProject) node).containsOver()) {
return null;
}

// Check for Sort node with collation
if (node instanceof Sort) {
Sort sort = (Sort) node;
if (sort.getCollation() != null && !sort.getCollation().getFieldCollations().isEmpty()) {
return sort.getCollation();
}
}

// Continue to child node
if (node.getInputs().isEmpty()) {
break;
}
node = node.getInput(0);
}
return null;
}

/**
* Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
* with the reversed sort inserted right after the original sort.
*
* @param root the root of the tree to rebuild
* @param reversedCollation the reversed collation to insert
* @param context the Calcite plan context
* @return the rebuilt tree with reversed sort inserted
*/
private RelNode insertReversedSortInTree(
RelNode root, RelCollation reversedCollation, CalcitePlanContext context) {
return root.accept(
new RelHomogeneousShuttle() {
boolean sortFound = false;

@Override
public RelNode visit(RelNode other) {
if (!sortFound && other instanceof Sort) {
Sort sort = (Sort) other;
// Treat a Sort with fetch or offset as a barrier (limit node).
// Place the reversed sort above the barrier to preserve limit semantics,
// rather than inserting below the downstream collation Sort.
if (sort.fetch != null || sort.offset != null) {
sortFound = true;
RelNode visitedBarrier = super.visit(other);
return LogicalSort.create(visitedBarrier, reversedCollation, null, null);
}
// Found a collation Sort - replace in-place with reversed collation.
// Stacking a reversed sort on top would create consecutive sorts, and
// Calcite's SortRemoveRule would merge them keeping the original direction.
if (sort.getCollation() != null
&& !sort.getCollation().getFieldCollations().isEmpty()) {
sortFound = true;
RelNode visitedInput = sort.getInput().accept(this);
return LogicalSort.create(visitedInput, reversedCollation, null, null);
}
}
// For all other nodes, continue traversal
return super.visit(other);
}
});
}

@Override
public RelNode visitReverse(
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
visitChildren(node, context);
// Add ROW_NUMBER() column
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
// Sort by row number descending
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
// Remove row number column
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));

// Check if there's an existing sort to reverse
List<RelCollation> collations =
context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;

if (collation != null && !collation.getFieldCollations().isEmpty()) {
// If there's an existing sort, reverse its direction
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
RelNode currentNode = context.relBuilder.peek();
if (currentNode instanceof Sort) {
Sort existingSort = (Sort) currentNode;
if (existingSort.getCollation() != null
&& !existingSort.getCollation().getFieldCollations().isEmpty()
&& existingSort.fetch == null
&& existingSort.offset == null) {
// Pure collation sort (no fetch/offset) - replace in-place to avoid consecutive
// sorts. Calcite's SortRemoveRule merges consecutive LogicalSort nodes and keeps
// the lower sort's direction, which discards the reversed direction.
// Replacing in-place avoids this issue.
RelCollation reversedFromSort = PlanUtils.reverseCollation(existingSort.getCollation());
RelNode replacedSort =
LogicalSort.create(existingSort.getInput(), reversedFromSort, null, null);
PlanUtils.replaceTop(context.relBuilder, replacedSort);
Comment on lines 801 to 812
Copy link
Contributor

@songkant-aws songkant-aws Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this observation correct? The logic is weird because that means an explicit query like source =t | sort -a | sort a could be equivalent to the query of source=t | sort -a. I think it's not merge. The worst case is it needs double sort in case of no pushdown. But in pushdown case, I think the sort will be pushed twice. And finally the top sort will override the scan's traitset.

I checked the PR out in my local env. When I remove this piece of code, the explainIT still gives me the expected collation in physical plan.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The in-place replacement is intentional and necessary. The issue is that explainIT only shows the logical plan before Calcite's physical optimizer runs. During actual execution, SortRemoveRule merges consecutive LogicalSort nodes and keeps the lower sort's direction, effectively discarding the reversed one. So for a query like sort a | reverse, the plan becomes Sort(ASC) → Sort(DESC), and the optimizer collapses it back to Sort(ASC).

I confirmed this by running the integration tests (CalciteReverseCommandIT) — they fail without the in-place replacement. The workaround replaces the existing Sort node directly instead of stacking a new one on top, so there are no consecutive sorts for the optimizer to merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detailed description regarding this issue: #5125

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the conclusion is not true. Explained plans in explainIT include both original logical plan and optimized physical plan. Calcite's VolcanoPlanner optimizes both logical plans and physical plans, then the final optimized physical plan will generate a compiled Java code to execute the query. Rules never execute the query.

The SortRemoveRule replaces an explicit sort operator with a trait requirement on the child RelSubset. It doesn't do the simple merge. It assigns ordering trait to different RelSubsets. VolcanoPlanner will compute the cheapest plan from different RelSubsets. A simplified process is like 1st round SortRemoveRule changes Sort(1 Desc) - Sort(1 Asc) - RelSubset#0(NONE) => Sort(1 Desc) - RelSubset#1(1 Asc), 2nd round SortRemoveRule changes Sort(1 Desc) - RelSubset#2(1 Asc) => RelSubset#3(1 Desc). The final plan must satisfy the (1 Desc) ordering requirements.

The actual root cause is there is a shallow copy bug in AggPushDownAction. In case of testReverseAfterAggregationWithSort test case failure you mentioned in the issue. The actual optimized physical plan is:

// query: source=%s | stats count() as c by gender | sort gender | reverse
calcite:
  logical: |
    LogicalSystemLimit(sort0=[$1], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])
      LogicalSort(sort0=[$1], dir0=[DESC-nulls-last])
        LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
          LogicalProject(c=[$1], gender=[$0])
            LogicalAggregate(group=[{0}], c=[COUNT()])
              LogicalProject(gender=[$4])
                CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
  physical: |
    CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#50:LogicalAggregate.NONE.[](input=RelSubset#49,group={0},c=COUNT()), PROJECT->[c, gender], SORT->[1 DESC LAST], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])

You can see PushDownContext has correct SORT action, aka SORT->[1 DESC LAST], which means SortRemoveRule didn't pick the ASC-nulls-first as winner. The problem is the agg request builder doesn't have correct ordering on buckets. You can easily verify SortRemoveRule doesn't do anything wrong by manually removing our own rules like SortIndexScanRule. The testReverseAfterAggregationWithSort will pass without our SortIndexScanRule.

Regardless of the actual bug. I think your current approach still has some value. The collapse of consecutive sorts will reduce VolcanoPlanner's optimization space size. You can keep the current design.

} else {
// Sort with fetch/offset (limit) or fetch-only Sort - add a separate reversed
// sort on top so the "limit then reverse" semantics are preserved.
context.relBuilder.sort(reversedCollation);
}
} else {
context.relBuilder.sort(reversedCollation);
}
} else {
// Collation not found on current node - try backtracking
RelNode currentNode = context.relBuilder.peek();
RelCollation backtrackCollation = backtrackForCollation(currentNode);

if (backtrackCollation != null && !backtrackCollation.getFieldCollations().isEmpty()) {
// Found collation through backtracking - rebuild tree with reversed sort
RelCollation reversedCollation = PlanUtils.reverseCollation(backtrackCollation);
RelNode rebuiltTree = insertReversedSortInTree(currentNode, reversedCollation, context);
// Replace the current node in the builder with the rebuilt tree
context.relBuilder.build(); // Pop the current node
context.relBuilder.push(rebuiltTree); // Push the rebuilt tree
} else {
// Check if @timestamp field exists in the row type
List<String> fieldNames = context.relBuilder.peek().getRowType().getFieldNames();
if (fieldNames.contains(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)) {
// If @timestamp exists, sort by it in descending order
context.relBuilder.sort(
context.relBuilder.desc(
context.relBuilder.field(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)));
}
// If neither collation nor @timestamp exists, ignore the reverse command (no-op)
}
}

return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -593,6 +596,37 @@ public Void visitCorrelVariable(RexCorrelVariable correlVar) {
}
}

/**
* Reverses the direction of a RelCollation.
*
* @param original The original collation to reverse
* @return A new RelCollation with reversed directions
*/
public static RelCollation reverseCollation(RelCollation original) {
if (original == null || original.getFieldCollations().isEmpty()) {
return original;
}

List<RelFieldCollation> reversedFields = new ArrayList<>();
for (RelFieldCollation field : original.getFieldCollations()) {
RelFieldCollation.Direction reversedDirection = field.direction.reverse();

// Handle null direction properly - reverse it as well
RelFieldCollation.NullDirection reversedNullDirection =
field.nullDirection == RelFieldCollation.NullDirection.FIRST
? RelFieldCollation.NullDirection.LAST
: field.nullDirection == RelFieldCollation.NullDirection.LAST
? RelFieldCollation.NullDirection.FIRST
: field.nullDirection;

RelFieldCollation reversedField =
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
reversedFields.add(reversedField);
}

return RelCollations.of(reversedFields);
}

/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
try {
Expand Down
Loading
Loading