-
Notifications
You must be signed in to change notification settings - Fork 181
[Feature] Support bi-directional graph traversal command graphlookup
#5138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.Builder; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.Setter; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.expression.Field; | ||
| import org.opensearch.sql.ast.expression.Literal; | ||
| import org.opensearch.sql.ast.expression.UnresolvedExpression; | ||
|
|
||
| /** | ||
| * AST node for graphLookup command. Performs BFS graph traversal on a lookup table. | ||
| * | ||
| * <p>Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3 | ||
| * depthField=level direction=uni as hierarchy | ||
| */ | ||
| @Getter | ||
| @Setter | ||
| @ToString | ||
| @EqualsAndHashCode(callSuper = false) | ||
| @RequiredArgsConstructor | ||
| @AllArgsConstructor | ||
| @Builder(toBuilder = true) | ||
| public class GraphLookup extends UnresolvedPlan { | ||
| /** Direction mode for graph traversal. */ | ||
| public enum Direction { | ||
| /** Unidirectional - traverse edges in one direction only. */ | ||
| UNI, | ||
| /** Bidirectional - traverse edges in both directions. */ | ||
| BI | ||
| } | ||
|
|
||
| /** Target table for graph traversal lookup. */ | ||
| private final UnresolvedPlan fromTable; | ||
|
|
||
| /** Field in sourceTable to start with. */ | ||
| private final Field startField; | ||
|
|
||
| /** Field in fromTable that represents the outgoing edge. */ | ||
| private final Field fromField; | ||
|
|
||
| /** Field in input/fromTable to match against for traversal. */ | ||
| private final Field toField; | ||
|
|
||
| /** Output field name for collected traversal results. */ | ||
| private final Field as; | ||
|
|
||
| /** Maximum traversal depth. Zero means no limit. */ | ||
| private final Literal maxDepth; | ||
|
|
||
| /** Optional field name to include recursion depth in output. */ | ||
| private @Nullable final Field depthField; | ||
|
|
||
| /** Direction mode: UNI (default) or BIO for bidirectional. */ | ||
| private final Direction direction; | ||
|
|
||
| /** Whether to support array-typed fields without early filter pushdown. */ | ||
| private final boolean supportArray; | ||
|
|
||
| /** Whether to batch all source start values into a single unified BFS traversal. */ | ||
| private final boolean batchMode; | ||
|
|
||
| /** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */ | ||
| private final boolean usePIT; | ||
|
|
||
| /** | ||
| * Optional filter condition to restrict which lookup table documents participate in traversal. | ||
| */ | ||
| private @Nullable final UnresolvedExpression filter; | ||
|
|
||
| private UnresolvedPlan child; | ||
|
|
||
| public String getDepthFieldName() { | ||
| return depthField == null ? null : depthField.getField().toString(); | ||
| } | ||
|
|
||
| @Override | ||
| public UnresolvedPlan attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public List<UnresolvedPlan> getChild() { | ||
| return child == null ? ImmutableList.of() : ImmutableList.of(child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) { | ||
| return visitor.visitGraphLookup(this, context); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -118,6 +118,8 @@ | |||||||||
| import org.opensearch.sql.ast.tree.FillNull; | ||||||||||
| import org.opensearch.sql.ast.tree.Filter; | ||||||||||
| import org.opensearch.sql.ast.tree.Flatten; | ||||||||||
| import org.opensearch.sql.ast.tree.GraphLookup; | ||||||||||
| import org.opensearch.sql.ast.tree.GraphLookup.Direction; | ||||||||||
| import org.opensearch.sql.ast.tree.Head; | ||||||||||
| import org.opensearch.sql.ast.tree.Join; | ||||||||||
| import org.opensearch.sql.ast.tree.Kmeans; | ||||||||||
|
|
@@ -151,6 +153,7 @@ | |||||||||
| import org.opensearch.sql.ast.tree.Window; | ||||||||||
| import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; | ||||||||||
| import org.opensearch.sql.calcite.plan.OpenSearchConstants; | ||||||||||
| import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; | ||||||||||
| import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; | ||||||||||
| import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; | ||||||||||
| import org.opensearch.sql.calcite.utils.BinUtils; | ||||||||||
|
|
@@ -2524,6 +2527,67 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context) | |||||||||
| context, fieldsToAggregate, false, true, null, labelField, label); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| @Override | ||||||||||
| public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { | ||||||||||
| // 1. Visit source (child) table | ||||||||||
| visitChildren(node, context); | ||||||||||
| RelBuilder builder = context.relBuilder; | ||||||||||
| // TODO: Limit the number of source rows to 100 for now, make it configurable. | ||||||||||
| builder.limit(0, 100); | ||||||||||
|
Comment on lines
+2534
to
+2536
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the hard-coded 100-row cap on graphLookup sources. The unconditional limit truncates the start set and will silently drop traversal results once there are more than 100 source rows. Please make this configurable or remove it to preserve correctness. 🛠️ Suggested change- // TODO: Limit the number of source rows to 100 for now, make it configurable.
- builder.limit(0, 100);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents👋 Leave emoji reaction (👍/👎) to track effectiveness of CodeRabbit. |
||||||||||
| if (node.isBatchMode()) { | ||||||||||
| tryToRemoveMetaFields(context, true); | ||||||||||
| } | ||||||||||
| RelNode sourceTable = builder.build(); | ||||||||||
|
|
||||||||||
| // 2. Extract parameters | ||||||||||
| String startFieldName = node.getStartField().getField().toString(); | ||||||||||
| String fromFieldName = node.getFromField().getField().toString(); | ||||||||||
| String toFieldName = node.getToField().getField().toString(); | ||||||||||
| String outputFieldName = node.getAs().getField().toString(); | ||||||||||
| String depthFieldName = node.getDepthFieldName(); | ||||||||||
| boolean bidirectional = node.getDirection() == Direction.BI; | ||||||||||
|
|
||||||||||
| RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context); | ||||||||||
| Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class); | ||||||||||
| maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue; | ||||||||||
| boolean supportArray = node.isSupportArray(); | ||||||||||
| boolean batchMode = node.isBatchMode(); | ||||||||||
| boolean usePIT = node.isUsePIT(); | ||||||||||
|
|
||||||||||
| // 3. Visit and materialize lookup table | ||||||||||
| analyze(node.getFromTable(), context); | ||||||||||
| tryToRemoveMetaFields(context, true); | ||||||||||
|
|
||||||||||
| // 4. Convert filter expression to RexNode against lookup table schema | ||||||||||
| RexNode filterRex = null; | ||||||||||
| if (node.getFilter() != null) { | ||||||||||
| filterRex = rexVisitor.analyze(node.getFilter(), context); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| RelNode lookupTable = builder.build(); | ||||||||||
|
|
||||||||||
| // 5. Create LogicalGraphLookup RelNode | ||||||||||
| // The conversion rule will extract the OpenSearchIndex from the lookup table | ||||||||||
| RelNode graphLookup = | ||||||||||
| LogicalGraphLookup.create( | ||||||||||
| sourceTable, | ||||||||||
| lookupTable, | ||||||||||
| startFieldName, | ||||||||||
| fromFieldName, | ||||||||||
| toFieldName, | ||||||||||
| outputFieldName, | ||||||||||
| depthFieldName, | ||||||||||
| maxDepthValue, | ||||||||||
| bidirectional, | ||||||||||
| supportArray, | ||||||||||
| batchMode, | ||||||||||
| usePIT, | ||||||||||
| filterRex); | ||||||||||
|
|
||||||||||
| builder.push(graphLookup); | ||||||||||
| return builder.peek(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Cast integer sum to long, real/float to double to avoid ClassCastException | ||||||||||
| * | ||||||||||
|
|
||||||||||
Uh oh!
There was an error while loading. Please reload this page.