Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -546,6 +547,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) {
throw getOnlyForCalciteException("mvcombine");
}

@Override
public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) {
throw getOnlyForCalciteException("graphlookup");
}

/** Build {@link ParseExpression} to context and skip to child nodes. */
@Override
public LogicalPlan visitParse(Parse node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -475,4 +476,8 @@ public T visitAddColTotals(AddColTotals node, C context) {
public T visitMvCombine(MvCombine node, C context) {
return visitChildren(node, context);
}

public T visitGraphLookup(GraphLookup node, C context) {
return visitChildren(node, context);
}
}
104 changes: 104 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

/**
* AST node for graphLookup command. Performs BFS graph traversal on a lookup table.
*
* <p>Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3
* depthField=level direction=uni as hierarchy
*/
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class GraphLookup extends UnresolvedPlan {
/** Direction mode for graph traversal. */
public enum Direction {
/** Unidirectional - traverse edges in one direction only. */
UNI,
/** Bidirectional - traverse edges in both directions. */
BI
}

/** Target table for graph traversal lookup. */
private final UnresolvedPlan fromTable;

/** Field in sourceTable to start with. */
private final Field startField;

/** Field in fromTable that represents the outgoing edge. */
private final Field fromField;

/** Field in input/fromTable to match against for traversal. */
private final Field toField;

/** Output field name for collected traversal results. */
private final Field as;

/** Maximum traversal depth. Zero means no limit. */
private final Literal maxDepth;

/** Optional field name to include recursion depth in output. */
private @Nullable final Field depthField;

/** Direction mode: UNI (default) or BIO for bidirectional. */
private final Direction direction;

/** Whether to support array-typed fields without early filter pushdown. */
private final boolean supportArray;

/** Whether to batch all source start values into a single unified BFS traversal. */
private final boolean batchMode;

/** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */
private final boolean usePIT;

/**
* Optional filter condition to restrict which lookup table documents participate in traversal.
*/
private @Nullable final UnresolvedExpression filter;

private UnresolvedPlan child;

public String getDepthFieldName() {
return depthField == null ? null : depthField.getField().toString();
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return child == null ? ImmutableList.of() : ImmutableList.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
return visitor.visitGraphLookup(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.GraphLookup.Direction;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -151,6 +153,7 @@
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.BinUtils;
Expand Down Expand Up @@ -2524,6 +2527,67 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context)
context, fieldsToAggregate, false, true, null, labelField, label);
}

@Override
public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) {
// 1. Visit source (child) table
visitChildren(node, context);
RelBuilder builder = context.relBuilder;
// TODO: Limit the number of source rows to 100 for now, make it configurable.
builder.limit(0, 100);
Comment on lines +2534 to +2536
Copy link
Contributor

@coderabbitai coderabbitai bot Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Remove the hard-coded 100-row cap on graphLookup sources.

The unconditional limit truncates the start set and will silently drop traversal results once there are more than 100 source rows. Please make this configurable or remove it to preserve correctness.

🛠️ Suggested change
-    // TODO: Limit the number of source rows to 100 for now, make it configurable.
-    builder.limit(0, 100);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
RelBuilder builder = context.relBuilder;
// TODO: Limit the number of source rows to 100 for now, make it configurable.
builder.limit(0, 100);
RelBuilder builder = context.relBuilder;
🤖 Prompt for AI Agents
In `@core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java`
around lines 2534 - 2536, The hard-coded builder.limit(0, 100) in
CalciteRelNodeVisitor (using context.relBuilder / builder) unconditionally
truncates graphLookup sources; remove this unconditional limit or replace it
with a configurable cap: delete the builder.limit(0, 100) call (or wrap it
behind a configuration check), add a new planner/config option (e.g.,
graphLookupSourceRowLimit) accessed from the visitor/context, and only apply
builder.limit(0, limit) when that config is set to a positive value so the
default preserves full source rows.

👋 Leave emoji reaction (👍/👎) to track effectiveness of CodeRabbit.

if (node.isBatchMode()) {
tryToRemoveMetaFields(context, true);
}
RelNode sourceTable = builder.build();

// 2. Extract parameters
String startFieldName = node.getStartField().getField().toString();
String fromFieldName = node.getFromField().getField().toString();
String toFieldName = node.getToField().getField().toString();
String outputFieldName = node.getAs().getField().toString();
String depthFieldName = node.getDepthFieldName();
boolean bidirectional = node.getDirection() == Direction.BI;

RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context);
Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class);
maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue;
boolean supportArray = node.isSupportArray();
boolean batchMode = node.isBatchMode();
boolean usePIT = node.isUsePIT();

// 3. Visit and materialize lookup table
analyze(node.getFromTable(), context);
tryToRemoveMetaFields(context, true);

// 4. Convert filter expression to RexNode against lookup table schema
RexNode filterRex = null;
if (node.getFilter() != null) {
filterRex = rexVisitor.analyze(node.getFilter(), context);
}

RelNode lookupTable = builder.build();

// 5. Create LogicalGraphLookup RelNode
// The conversion rule will extract the OpenSearchIndex from the lookup table
RelNode graphLookup =
LogicalGraphLookup.create(
sourceTable,
lookupTable,
startFieldName,
fromFieldName,
toFieldName,
outputFieldName,
depthFieldName,
maxDepthValue,
bidirectional,
supportArray,
batchMode,
usePIT,
filterRex);

builder.push(graphLookup);
return builder.peek();
}

/**
* Cast integer sum to long, real/float to double to avoid ClassCastException
*
Expand Down
Loading
Loading