diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 58d542538d..2d1cd73f9a 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -73,6 +73,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -546,6 +547,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) { throw getOnlyForCalciteException("mvcombine"); } + @Override + public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) { + throw getOnlyForCalciteException("graphlookup"); + } + /** Build {@link ParseExpression} to context and skip to child nodes. */ @Override public LogicalPlan visitParse(Parse node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 2486b63791..45a8202e84 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -61,6 +61,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -475,4 +476,8 @@ public T visitAddColTotals(AddColTotals node, C context) { public T visitMvCombine(MvCombine node, C context) { return visitChildren(node, context); } + + public T visitGraphLookup(GraphLookup node, C context) { + return visitChildren(node, 
context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java new file mode 100644 index 0000000000..6771c35c43 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +/** + * AST node for graphLookup command. Performs BFS graph traversal on a lookup table. + * + *

Example: source=employees | graphLookup employees startField=manager fromField=manager toField=name maxDepth=3 + * depthField=level direction=uni as hierarchy + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +@Builder(toBuilder = true) +public class GraphLookup extends UnresolvedPlan { + /** Direction mode for graph traversal. */ + public enum Direction { + /** Unidirectional - traverse edges in one direction only. */ + UNI, + /** Bidirectional - traverse edges in both directions. */ + BI + } + + /** Target table for graph traversal lookup. */ + private final UnresolvedPlan fromTable; + + /** Field in sourceTable to start with. */ + private final Field startField; + + /** Field in fromTable that represents the outgoing edge. */ + private final Field fromField; + + /** Field in input/fromTable to match against for traversal. */ + private final Field toField; + + /** Output field name for collected traversal results. */ + private final Field as; + + /** Maximum traversal depth. Zero means no limit. */ + private final Literal maxDepth; + + /** Optional field name to include recursion depth in output. */ + private @Nullable final Field depthField; + + /** Direction mode: UNI (default) or BI for bidirectional. */ + private final Direction direction; + + /** Whether to support array-typed fields without early filter pushdown. */ + private final boolean supportArray; + + /** Whether to batch all source start values into a single unified BFS traversal. */ + private final boolean batchMode; + + /** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */ + private final boolean usePIT; + + /** + * Optional filter condition to restrict which lookup table documents participate in traversal. + */ + private @Nullable final UnresolvedExpression filter; + + private UnresolvedPlan child; + + public String getDepthFieldName() { + return depthField == null ? 
null : depthField.getField().toString(); + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return child == null ? ImmutableList.of() : ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitGraphLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 68a700b66b..8854c80ae6 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -118,6 +118,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -151,6 +153,7 @@ import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.calcite.plan.OpenSearchConstants; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.utils.BinUtils; @@ -2524,6 +2527,67 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context) context, fieldsToAggregate, false, true, null, labelField, label); } + @Override + public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { + // 1. 
Visit source (child) table + visitChildren(node, context); + RelBuilder builder = context.relBuilder; + // TODO: Limit the number of source rows to 100 for now, make it configurable. + builder.limit(0, 100); + if (node.isBatchMode()) { + tryToRemoveMetaFields(context, true); + } + RelNode sourceTable = builder.build(); + + // 2. Extract parameters + String startFieldName = node.getStartField().getField().toString(); + String fromFieldName = node.getFromField().getField().toString(); + String toFieldName = node.getToField().getField().toString(); + String outputFieldName = node.getAs().getField().toString(); + String depthFieldName = node.getDepthFieldName(); + boolean bidirectional = node.getDirection() == Direction.BI; + + RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context); + Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class); + maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue; + boolean supportArray = node.isSupportArray(); + boolean batchMode = node.isBatchMode(); + boolean usePIT = node.isUsePIT(); + + // 3. Visit and materialize lookup table + analyze(node.getFromTable(), context); + tryToRemoveMetaFields(context, true); + + // 4. Convert filter expression to RexNode against lookup table schema + RexNode filterRex = null; + if (node.getFilter() != null) { + filterRex = rexVisitor.analyze(node.getFilter(), context); + } + + RelNode lookupTable = builder.build(); + + // 5. 
Create LogicalGraphLookup RelNode + // The conversion rule will extract the OpenSearchIndex from the lookup table + RelNode graphLookup = + LogicalGraphLookup.create( + sourceTable, + lookupTable, + startFieldName, + fromFieldName, + toFieldName, + outputFieldName, + depthFieldName, + maxDepthValue, + bidirectional, + supportArray, + batchMode, + usePIT, + filterRex); + + builder.push(graphLookup); + return builder.peek(); + } + /** * Cast integer sum to long, real/float to double to avoid ClassCastException * diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java new file mode 100644 index 0000000000..02ed97faf0 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java @@ -0,0 +1,192 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Abstract RelNode for graphLookup command. + * + *

Has two inputs: + * + *

+ * + *

At execution time, performs BFS by dynamically querying OpenSearch with filter pushdown + * instead of loading all lookup data into memory. + * + *

This is a storage-agnostic logical node. Storage-specific implementations (e.g., for + * OpenSearch) should extract the necessary index information from the lookup RelNode during + * conversion to the physical plan. + */ +@Getter +public abstract class GraphLookup extends BiRel { + + // TODO: use RexInputRef instead of String for these fields + protected final String startField; // Field in source table (start entities) + protected final String fromField; // Field in lookup table (edge source) + protected final String toField; // Field in lookup table (edge target) + protected final String outputField; // Name of output array field + @Nullable protected final String depthField; // Name of output depth field + + // TODO: add limitation on the maxDepth and input rows count + protected final int maxDepth; // -1 = unlimited + protected final boolean bidirectional; + protected final boolean supportArray; + protected final boolean batchMode; + protected final boolean usePIT; + @Nullable protected final RexNode filter; + + private RelDataType outputRowType; + + /** + * Creates a GraphLookup. 
+ * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields (disables early visited filter + * pushdown) + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents + */ + protected GraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT, + @Nullable RexNode filter) { + super(cluster, traitSet, source, lookup); + this.startField = startField; + this.fromField = fromField; + this.toField = toField; + this.outputField = outputField; + this.depthField = depthField; + this.maxDepth = maxDepth; + this.bidirectional = bidirectional; + this.supportArray = supportArray; + this.batchMode = batchMode; + this.usePIT = usePIT; + this.filter = filter; + } + + /** Returns the source table RelNode. */ + public RelNode getSource() { + return left; + } + + /** Returns the lookup table RelNode. 
*/ + public RelNode getLookup() { + return right; + } + + @Override + public abstract RelNode copy(RelTraitSet traitSet, List inputs); + + @Override + protected RelDataType deriveRowType() { + if (outputRowType == null) { + RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); + + if (batchMode) { + // Batch mode: Output = [Array, Array] + // First field: aggregated source rows as array + RelDataType sourceRowType = getSource().getRowType(); + RelDataType sourceArrayType = + getCluster().getTypeFactory().createArrayType(sourceRowType, -1); + builder.add(startField, sourceArrayType); + + // Second field: aggregated lookup rows as array + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType lookupArrayType = + getCluster().getTypeFactory().createArrayType(lookupRowType, -1); + builder.add(outputField, lookupArrayType); + } else { + // Normal mode: Output = source fields + output array field + // Add all source fields + for (var field : getSource().getRowType().getFieldList()) { + builder.add(field); + } + + // Add output field (ARRAY type containing lookup row struct) + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType arrayType = getCluster().getTypeFactory().createArrayType(lookupRowType, -1); 
+ builder.add(outputField, arrayType); + } + + outputRowType = builder.build(); + } + return outputRowType; + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + // Batch mode aggregates all source rows into a single output row + return batchMode ? 1 : getSource().estimateRowCount(mq); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("fromField", fromField) + .item("toField", toField) + .item("outputField", outputField) + .item("depthField", depthField) + .item("maxDepth", maxDepth) + .item("bidirectional", bidirectional) + .itemIf("supportArray", supportArray, supportArray) + .itemIf("batchMode", batchMode, batchMode) + .itemIf("usePIT", usePIT, usePIT) + .itemIf("filter", filter, filter != null); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java new file mode 100644 index 0000000000..94db3689f8 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; + +/** + * Logical RelNode for graphLookup command. TODO: need to support trim fields and several transpose + * rules for this new added RelNode + */ +@Getter +public class LogicalGraphLookup extends GraphLookup { + + /** + * Creates a LogicalGraphLookup. 
+ * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents + */ + protected LogicalGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT, + @Nullable RexNode filter) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT, + filter); + } + + /** + * Creates a LogicalGraphLookup with Convention.NONE. 
+ * + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the output depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents + * @return A new LogicalGraphLookup instance + */ + public static LogicalGraphLookup create( + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT, + @Nullable RexNode filter) { + RelOptCluster cluster = source.getCluster(); + RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); + return new LogicalGraphLookup( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT, + filter); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new LogicalGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT, + filter); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index 
17d99fb4fb..8dfe963081 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -223,7 +223,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) { case BIGINT -> LONG; case FLOAT, REAL -> FLOAT; case DOUBLE, DECIMAL -> DOUBLE; // TODO the decimal is only used for literal - case CHAR, VARCHAR -> STRING; + case CHAR, VARCHAR, MULTISET -> STRING; // call toString() for MULTISET case BOOLEAN -> BOOLEAN; case DATE -> DATE; case TIME, TIME_TZ, TIME_WITH_LOCAL_TIME_ZONE -> TIME; diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md new file mode 100644 index 0000000000..2d6220edae --- /dev/null +++ b/docs/user/ppl/cmd/graphlookup.md @@ -0,0 +1,357 @@ + +# graphLookup (Experimental) + +The `graphLookup` command performs recursive graph traversal on a collection using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on specified fields. This is useful for hierarchical data like organizational charts, social networks, or routing graphs. 
+ +## Syntax + +The `graphLookup` command has the following syntax: + +```syntax +graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] [usePIT=(true | false)] [filter=()] as +``` + +The following are examples of the `graphLookup` command syntax: + +```syntax +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name maxDepth=2 as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name depthField=level as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections +source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports +source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name filter=(status = 'active' AND age > 18) as reportingHierarchy +``` + +## Parameters + +The `graphLookup` command supports the following parameters. + +| Parameter | Required/Optional | Description | +| --- | --- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `` | Required | The name of the index to perform the graph traversal on. Can be the same as the source index for self-referential graphs. | +| `startField=` | Required | The field in the source documents whose value is used to start the recursive search. 
The value of this field is matched against `toField` in the lookup index. We support both single value and array values as starting points. | +| `fromField=` | Required | The field in the lookup index documents that contains the value to recurse on. After matching a document, the value of this field is used to find the next set of documents. It supports both single value and array values. | +| `toField=` | Required | The field in the lookup index documents to match against. Documents where `toField` equals the current traversal value are included in the results. | +| `maxDepth=` | Optional | The maximum recursion depth of hops. Default is `0`. A value of `0` means only the direct connections to the start values are returned. A value of `1` means 1 hop connections (initial match plus one recursive step), and so on. | +| `depthField=` | Optional | The name of the field to add to each traversed document indicating its recursion depth. If not specified, no depth field is added. Depth starts at `0` for the first level of matches. | +| `direction=(uni \| bi)` | Optional | The traversal direction. `uni` (default) performs unidirectional traversal following edges in the forward direction only. `bi` performs bidirectional traversal, following edges in both directions. | +| `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. | +| `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output changes to two arrays: `[Array, Array]`. See [Batch Mode](#batch-mode) for details. 
| +| `usePIT=(true \| false)` | Optional | When `true`, enables PIT (Point In Time) search for the lookup table, allowing paginated retrieval of complete results without the `max_result_window` size limit. Default is `false`. See [PIT Search](#pit-search) for details. | +| `filter=()` | Optional | A filter condition to restrict which lookup table documents participate in the graph traversal. Only documents matching the condition are considered as candidates during BFS. Parentheses around the condition are required. Example: `filter=(status = 'active' AND age > 18)`. | +| `as ` | Required | The name of the output array field that will contain all documents found during the graph traversal. | + +## How It Works + +The `graphLookup` command performs a breadth-first search (BFS) traversal: + +1. For each source document, extract the value of `startField` +2. Query the lookup index to find documents where `toField` matches the start value +3. Add matched documents to the result array +4. Extract `fromField` values from matched documents to continue traversal +5. Repeat steps 2-4 until no new documents are found or `maxDepth` is reached + +For bidirectional traversal (`direction=bi`), the algorithm also follows edges in the reverse direction by additionally matching `fromField` values. 
+ +## Example 1: Employee Hierarchy Traversal + +Given an `employees` index with the following documents: + +| id | name | reportsTo | +|----|------|-----------| +| 1 | Dev | Eliot | +| 2 | Eliot | Ron | +| 3 | Ron | Andrew | +| 4 | Andrew | null | +| 5 | Asya | Ron | +| 6 | Dan | Andrew | + +The following query finds the reporting chain for each employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+---------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+---------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}] | +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}] | +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+---------------------+ +``` + +For Dev, the traversal starts with `reportsTo="Eliot"`, finds the Eliot record, and returns it in the `reportingHierarchy` array. 
+ +## Example 2: Employee Hierarchy with Depth Tracking + +The following query adds a depth field to track how many levels each manager is from the employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + depthField=level + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2, 0}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3, 0}] | +| Ron | Andrew | 3 | [{Andrew, null, 4, 0}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3, 0}] | +| Dan | Andrew | 6 | [{Andrew, null, 4, 0}] | ++--------+----------+----+------------------------+ +``` + +The depth field `level` is appended to each document in the result array. A value of `0` indicates the first level of matches. + +## Example 3: Limited Depth Traversal + +The following query limits traversal to 2 levels using `maxDepth=1`: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + maxDepth=1 + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+--------------------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+--------------------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}, {Ron, Andrew, 3}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+--------------------------------------+ +``` + +With `maxDepth=1`, the traversal goes two levels deep (depth 0 and depth 1). 
+ +## Example 4: Airport Connections Graph + +Given an `airports` index with the following documents: + +| airport | connects | +|---------|----------| +| JFK | [BOS, ORD] | +| BOS | [JFK, PWM] | +| ORD | [JFK] | +| PWM | [BOS, LHR] | +| LHR | [PWM] | + +The following query finds reachable airports from each airport: + +```ppl ignore +source = airports + | graphLookup airports + startField=airport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++---------+------------+---------------------+ +| airport | connects | reachableAirports | ++---------+------------+---------------------+ +| JFK | [BOS, ORD] | [{JFK, [BOS, ORD]}] | +| BOS | [JFK, PWM] | [{BOS, [JFK, PWM]}] | +| ORD | [JFK] | [{ORD, [JFK]}] | +| PWM | [BOS, LHR] | [{PWM, [BOS, LHR]}] | +| LHR | [PWM] | [{LHR, [PWM]}] | ++---------+------------+---------------------+ +``` + +## Example 5: Cross-Index Graph Lookup + +The `graphLookup` command can use different source and lookup indexes. 
Given a `travelers` index: + +| name | nearestAirport | +|------|----------------| +| Dev | JFK | +| Eliot | JFK | +| Jeff | BOS | + +The following query finds reachable airports for each traveler: + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++-------+----------------+---------------------+ +| name | nearestAirport | reachableAirports | ++-------+----------------+---------------------+ +| Dev | JFK | [{JFK, [BOS, ORD]}] | +| Eliot | JFK | [{JFK, [BOS, ORD]}] | +| Jeff | BOS | [{BOS, [JFK, PWM]}] | ++-------+----------------+---------------------+ +``` + +## Example 6: Bidirectional Traversal + +The following query performs bidirectional traversal to find both managers and colleagues who share the same manager: + +```ppl ignore +source = employees + | where name = 'Ron' + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + direction=bi + as connections +``` + +The query returns the following results: + +```text ++------+----------+----+------------------------------------------------+ +| name | reportsTo| id | connections | ++------+----------+----+------------------------------------------------+ +| Ron | Andrew | 3 | [{Ron, Andrew, 3}, {Andrew, null, 4}, {Dan, Andrew, 6}] | ++------+----------+----+------------------------------------------------+ +``` + +With bidirectional traversal, Ron's connections include: +- His own record (Ron reports to Andrew) +- His manager (Andrew) +- His peer (Dan, who also reports to Andrew) + +## Batch Mode + +When `batchMode=true`, the `graphLookup` command collects all start values from all source rows and performs a single unified BFS traversal instead of separate traversals per row. 
+ +### Output Format Change + +In batch mode, the output is a **single row** with two arrays: +- First array: All source rows collected +- Second array: All lookup results from the unified BFS traversal + +### When to Use Batch Mode + +Use `batchMode=true` when: +- You want to find all nodes reachable from **any** of the source start values +- You need a global view of the graph connectivity from multiple starting points +- You want to avoid duplicate traversals when multiple source rows share overlapping paths + +### Example + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + batchMode=true + maxDepth=2 + as reachableAirports +``` + +**Normal mode** (default): Each traveler gets their own list of reachable airports +```text +| name | nearestAirport | reachableAirports | +|-------|----------------|-------------------| +| Dev | JFK | [JFK, BOS, ORD] | +| Jeff | BOS | [BOS, JFK, PWM] | +``` + +**Batch mode**: A single row with all travelers and all reachable airports combined +```text +| travelers | reachableAirports | +|----------------------------------------|-----------------------------| +| [{Dev, JFK}, {Jeff, BOS}] | [JFK, BOS, ORD, PWM, ...] | +``` + +## Array Field Handling + +When the `fromField` or `toField` contains array values, you should set `supportArray=true` to ensure correct traversal behavior. + +## PIT Search + +By default, each level of BFS traversal limits the number of returned documents to the `max_result_window` setting of the lookup index (typically 10,000). This avoids the overhead of PIT (Point In Time) search but may return incomplete results when a single traversal level matches more documents than the limit. + +When `usePIT=true`, this limit is removed and the lookup table uses PIT-based pagination, which ensures all matching documents are retrieved at each traversal level. This provides complete and accurate results at the cost of additional search overhead. 
+ +### When to Use PIT Search + +Use `usePIT=true` when: +- The graph has high-degree nodes where a single traversal level may match more than `max_result_window` documents +- Result completeness is more important than query performance +- You observe incomplete or missing results with the default setting + +### Example + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + usePIT=true + as reportingHierarchy +``` + +## Filtered Graph Traversal + +The `filter` parameter restricts which documents in the lookup table are considered during the BFS traversal. Only documents matching the filter condition participate as candidates at each traversal level. + +### Example + +The following query traverses only active employees in the reporting hierarchy: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + filter=(status = 'active') + as reportingHierarchy +``` + +The filter is applied at the OpenSearch query level, so it combines efficiently with the BFS traversal queries. At each BFS level, the query sent to OpenSearch is effectively: `bool { filter: [user_filter, bfs_terms_query] }`. + +## Limitations + +- The source input, which provides the starting point for the traversal, has a limitation of 100 documents to avoid performance issues. +- When `usePIT=false` (default), each level of traversal search returns documents up to the `max_result_window` of the lookup index, which may result in incomplete data. Set `usePIT=true` to retrieve complete results. diff --git a/docs/user/ppl/index.md b/docs/user/ppl/index.md index 12afe96eea..718aa51f0f 100644 --- a/docs/user/ppl/index.md +++ b/docs/user/ppl/index.md @@ -82,6 +82,8 @@ source=accounts | [addcoltotals command](cmd/addcoltotals.md) | 3.5 | stable (since 3.5) | Adds column values and appends a totals row. 
| | [transpose command](cmd/transpose.md) | 3.5 | stable (since 3.5) | Transpose rows to columns. | | [mvcombine command](cmd/mvcombine.md) | 3.5 | stable (since 3.4) | Combines values of a specified field across rows identical on all other fields. | +| [graphlookup command](cmd/graphlookup.md) | 3.5 | experimental (since 3.5) | Performs recursive graph traversal on a collection using a BFS algorithm.| + - [Syntax](cmd/syntax.md) - PPL query structure and command syntax formatting * **Functions** @@ -101,4 +103,4 @@ source=accounts * **Optimization** - [Optimization](../../user/optimization/optimization.rst) * **Limitations** - - [Limitations](limitations/limitations.md) \ No newline at end of file + - [Limitations](limitations/limitations.md) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java new file mode 100644 index 0000000000..fbaefdb8c3 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java @@ -0,0 +1,738 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_AIRPORTS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_TRAVELERS; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** + * Integration 
tests for graphLookup command. Test cases are inspired by MongoDB's $graphLookup + * examples. + * + *

Test data: + * + *

+ * + * @see MongoDB + * $graphLookup + */ +public class CalcitePPLGraphLookupIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + + loadIndex(Index.GRAPH_EMPLOYEES); + loadIndex(Index.GRAPH_TRAVELERS); + loadIndex(Index.GRAPH_AIRPORTS); + } + + // ==================== Employee Hierarchy Tests ==================== + + /** Test 1: Basic employee hierarchy traversal. Find all managers in the reporting chain. */ + @Test + public void testEmployeeHierarchyBasicTraversal() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2))), + rows("Eliot", "Ron", 2, List.of(List.of("Ron", "Andrew", 3))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of(List.of("Ron", "Andrew", 3))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); + } + + /** Test 2: Employee hierarchy traversal with depth field. 
*/ + @Test + public void testEmployeeHierarchyWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=level" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2, 0))), + rows("Eliot", "Ron", 2, List.of(List.of("Ron", "Andrew", 3, 0))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4, 0))), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of(List.of("Ron", "Andrew", 3, 0))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4, 0)))); + } + + /** Test 3: Employee hierarchy with maxDepth=1 (allows 2 levels of traversal). 
*/ + @Test + public void testEmployeeHierarchyWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " maxDepth=1" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2), List.of("Ron", "Andrew", 3))), + rows( + "Eliot", + "Ron", + 2, + List.of(List.of("Ron", "Andrew", 3), Arrays.asList("Andrew", null, 4))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), + rows("Andrew", null, 4, Collections.emptyList()), + rows( + "Asya", + "Ron", + 5, + List.of(List.of("Ron", "Andrew", 3), Arrays.asList("Andrew", null, 4))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); + } + + /** Test 4: Query Dev's complete reporting chain: Dev->Eliot->Ron->Andrew */ + @Test + public void testEmployeeHierarchyForSpecificEmployee() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2)))); + } + + // ==================== Airport Connections Tests ==================== + + /** Test 5: Find all reachable airports from each airport. 
*/ + @Test + public void testAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("BOS", List.of("JFK", "PWM"), List.of(List.of("BOS", List.of("JFK", "PWM")))), + rows("ORD", List.of("JFK"), List.of(List.of("ORD", List.of("JFK")))), + rows("PWM", List.of("BOS", "LHR"), List.of(List.of("PWM", List.of("BOS", "LHR")))), + rows("LHR", List.of("PWM"), List.of(List.of("LHR", List.of("PWM"))))); + } + + /** Test 6: Find airports reachable from JFK within maxDepth=1. */ + @Test + public void testAirportConnectionsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows( + "JFK", + List.of("BOS", "ORD"), + List.of(List.of("JFK", List.of("BOS", "ORD")), List.of("BOS", List.of("JFK", "PWM"))))); + } + + /** Test 7: Find airports with default depth(=0) and start value of list */ + @Test + public void testAirportConnectionsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " 
startField=connects" + + " fromField=connects" + + " toField=airport" + + " depthField=numConnections" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of(List.of("BOS", List.of("JFK", "PWM"), 0)))); + } + + /** + * Test 8: Find reachable airports for all travelers. Uses travelers as source and airports as + * lookup table, with nearestAirport as the starting point for graph traversal. + */ + @Test + public void testTravelersReachableAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("Dev", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("Eliot", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("Jeff", "BOS", List.of(List.of("BOS", List.of("JFK", "PWM"))))); + } + + /** + * Test 9: Find reachable airports for a specific traveler (Dev at JFK) with depth tracking. + * Traverses from JFK through connected airports. 
+ */ + @Test + public void testTravelerReachableAirportsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " depthField=hops" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows(result, rows("Dev", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD"), 0)))); + } + + /** + * Test 10: Find reachable airports for Jeff (at BOS) with maxDepth=1. Finds BOS record as the + * starting point and traverses one level to connected airports. + */ + @Test + public void testTravelerReachableAirportsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Jeff'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows( + "Jeff", + "BOS", + List.of( + List.of("BOS", List.of("JFK", "PWM")), + List.of("JFK", List.of("BOS", "ORD")), + List.of("PWM", List.of("BOS", "LHR"))))); + } + + // ==================== Bidirectional Traversal Tests ==================== + + /** Test 11: Bidirectional traversal for Ron (finds both managers and reports). 
*/ + @Test + public void testBidirectionalEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Ron'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " direction=bi" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("connections", "array")); + verifyDataRows( + result, + rows( + "Ron", + "Andrew", + 3, + List.of( + List.of("Ron", "Andrew", 3), + Arrays.asList("Andrew", null, 4), + List.of("Dan", "Andrew", 6)))); + } + + /** + * Test 12: Bidirectional airport connections for ORD. Note: Currently returns empty + * allConnections array because the connects field is an array type. + */ + @Test + public void testBidirectionalAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'ORD'" + + " | graphLookup %s" + + " startField=connects" + + " fromField=connects" + + " toField=airport" + + " direction=bi" + + " as allConnections", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("allConnections", "array")); + verifyDataRows( + result, + rows( + "ORD", + List.of("JFK"), + List.of(List.of("JFK", List.of("BOS", "ORD")), List.of("BOS", List.of("JFK", "PWM"))))); + } + + // ==================== Filter Tests ==================== + + /** + * Test: Filter employee hierarchy by id. Only lookup documents with id > 3 (Andrew=4, Asya=5, + * Dan=6) participate in traversal. Dev starts with reportsTo=Eliot, but Eliot (id=2) is excluded + * by filter, so Dev gets empty results. 
+ */ + @Test + public void testEmployeeHierarchyWithFilter() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " filter=(id > 3)" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Only documents with id > 3 (Andrew=4, Asya=5, Dan=6) are in lookup table + // Dev: reportsTo=Eliot -> Eliot(id=2) is filtered out -> empty + // Eliot: reportsTo=Ron -> Ron(id=3) is filtered out -> empty + // Ron: reportsTo=Andrew -> Andrew(id=4) passes filter -> [{Andrew, null, 4}] + // Andrew: reportsTo=null -> empty + // Asya: reportsTo=Ron -> Ron(id=3) is filtered out -> empty + // Dan: reportsTo=Andrew -> Andrew(id=4) passes filter -> [{Andrew, null, 4}] + verifyDataRows( + result, + rows("Dev", "Eliot", 1, Collections.emptyList()), + rows("Eliot", "Ron", 2, Collections.emptyList()), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, Collections.emptyList()), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); + } + + /** + * Test: Filter employee hierarchy with keyword match. Only employees whose name is NOT 'Andrew' + * participate in traversal. 
+ */ + @Test + public void testEmployeeHierarchyWithKeywordFilter() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Ron'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " filter=(name != 'Andrew')" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Ron: reportsTo=Andrew -> Andrew is filtered out by name != 'Andrew' -> empty + verifyDataRows(result, rows("Ron", "Andrew", 3, Collections.emptyList())); + } + + /** + * Test: Filter with maxDepth combined. Dev traverses reporting chain but only considers lookup + * documents with id <= 3. + */ + @Test + public void testEmployeeHierarchyWithFilterAndMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " maxDepth=3" + + " filter=(id <= 3)" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Dev: reportsTo=Eliot -> Eliot(id=2) passes -> then Eliot.reportsTo=Ron -> Ron(id=3) passes + // -> then Ron.reportsTo=Andrew -> Andrew(id=4) is filtered out -> stops + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2), List.of("Ron", "Andrew", 3)))); + } + + // ==================== Edge Cases ==================== + + /** Test 13: Graph lookup on empty result set (non-existent employee). 
*/ + @Test + public void testEmptySourceResult() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'NonExistent'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result); + } + + /** Test 14: CEO (Andrew) with no manager - hierarchy should be empty. */ + @Test + public void testEmployeeWithNoManager() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Andrew'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Andrew", null, 4, Collections.emptyList())); + } + + /** Test 15: Combined with stats command. */ + @Test + public void testGraphLookupWithStats() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | stats count() by name", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("count()", "bigint"), schema("name", "string")); + verifyDataRows( + result, + rows(1L, "Ron"), + rows(1L, "Dan"), + rows(1L, "Dev"), + rows(1L, "Andrew"), + rows(1L, "Asya"), + rows(1L, "Eliot")); + } + + /** Test 16: Graph lookup with fields projection (name and reportingHierarchy only). 
*/ + @Test + public void testGraphLookupWithFieldsProjection() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | fields name, reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("name", "string"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", List.of(List.of("Eliot", "Ron", 2))), + rows("Eliot", List.of(List.of("Ron", "Andrew", 3))), + rows("Ron", List.of(Arrays.asList("Andrew", null, 4))), + rows("Andrew", Collections.emptyList()), + rows("Asya", List.of(List.of("Ron", "Andrew", 3))), + rows("Dan", List.of(Arrays.asList("Andrew", null, 4)))); + } + + // ==================== Batch Mode Tests ==================== + + /** + * Test 17: Batch mode - collects all start values and performs unified BFS. Output is a single + * row with [Array, Array]. + * + *

Source: Dev (reportsTo=Eliot), Asya (reportsTo=Ron) Start values: {Eliot, Ron} BFS finds: + * Eliot->Ron, Ron->Andrew, Andrew->null + */ + @Test + public void testBatchModeEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Asya')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " batchMode=true" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows( + List.of(List.of("Dev", "Eliot", 1), List.of("Asya", "Ron", 5)), + List.of(List.of("Ron", "Andrew", 3, 0), Arrays.asList("Andrew", null, 4, 1)))); + } + + /** + * Test 18: Batch mode for travelers - find all airports reachable from any traveler. All + * travelers' nearest airports: JFK (Dev, Eliot), BOS (Jeff) Unified BFS from {JFK, BOS} with + * maxDepth=1 finds connected airports. 
+ */ + @Test + public void testBatchModeTravelersAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " batchMode=true" + + " depthField=depth" + + " maxDepth=3" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema(result, schema("nearestAirport", "array"), schema("reachableAirports", "array")); + // Batch mode returns single row with: + // - sourceRows: [{Dev, JFK}, {Eliot, JFK}, {Jeff, BOS}] + // - lookupResults: airports reachable from JFK and BOS within maxDepth=1 + verifyDataRows( + result, + rows( + List.of(List.of("Dev", "JFK"), List.of("Eliot", "JFK"), List.of("Jeff", "BOS")), + List.of( + List.of("JFK", List.of("BOS", "ORD"), 0), + List.of("BOS", List.of("JFK", "PWM"), 0), + List.of("PWM", List.of("BOS", "LHR"), 1)))); + } + + /** + * Test 19: Batch mode with bidirectional traversal. Dev (reportsTo=Eliot), Dan (reportsTo=Andrew) + * Bidirectional BFS from {Eliot, Andrew} finds connections in both directions. 
+ */ + @Test + public void testBatchModeBidirectional() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Dan')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " direction=bi" + + " batchMode=true" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("connections", "array")); + // Batch mode returns single row with bidirectional traversal results + // Start from {Eliot, Andrew}, find connections in both directions + verifyDataRows( + result, + rows( + List.of(List.of("Dev", "Eliot", 1), List.of("Dan", "Andrew", 6)), + List.of( + List.of("Dev", "Eliot", 1, 0), + List.of("Eliot", "Ron", 2, 0), + Arrays.asList("Andrew", null, 4, 0), + List.of("Dan", "Andrew", 6, 0), + List.of("Asya", "Ron", 5, 1)))); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index c9de7a584c..5910bc8d47 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -838,6 +838,22 @@ public enum Index { "duplication_nullable", getDuplicationNullableIndexMapping(), "src/test/resources/duplication_nullable.json"), + // Graph lookup test indices (inspired by MongoDB $graphLookup examples) + GRAPH_EMPLOYEES( + TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES, + "graph_employees", + getGraphEmployeesIndexMapping(), + "src/test/resources/graph_employees.json"), + GRAPH_TRAVELERS( + TestsConstants.TEST_INDEX_GRAPH_TRAVELERS, + "graph_travelers", + getGraphTravelersIndexMapping(), + "src/test/resources/graph_travelers.json"), + GRAPH_AIRPORTS( + TestsConstants.TEST_INDEX_GRAPH_AIRPORTS, + "graph_airports", + 
getGraphAirportsIndexMapping(), + "src/test/resources/graph_airports.json"), TPCH_ORDERS( "orders", "tpch", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index aa8b52af4a..1682762305 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -338,6 +338,21 @@ public static String getWorkInformationIndexMapping() { return getMappingFile(mappingFile); } + public static String getGraphEmployeesIndexMapping() { + String mappingFile = "graph_employees_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphTravelersIndexMapping() { + String mappingFile = "graph_travelers_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphAirportsIndexMapping() { + String mappingFile = "graph_airports_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getDuplicationNullableIndexMapping() { String mappingFile = "duplication_nullable_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index ad8a232bab..b5e49bbe02 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -84,6 +84,9 @@ public class TestsConstants { public static final String TEST_INDEX_WORKER = TEST_INDEX + "_worker"; public static final String TEST_INDEX_WORK_INFORMATION = TEST_INDEX + "_work_information"; public static final String TEST_INDEX_DUPLICATION_NULLABLE = TEST_INDEX + "_duplication_nullable"; + public static final String TEST_INDEX_GRAPH_EMPLOYEES = TEST_INDEX + "_graph_employees"; + public static final String TEST_INDEX_GRAPH_TRAVELERS = 
TEST_INDEX + "_graph_travelers"; + public static final String TEST_INDEX_GRAPH_AIRPORTS = TEST_INDEX + "_graph_airports"; public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1"; public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2"; public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*"; diff --git a/integ-test/src/test/resources/graph_airports.json b/integ-test/src/test/resources/graph_airports.json new file mode 100644 index 0000000000..c644a24dc0 --- /dev/null +++ b/integ-test/src/test/resources/graph_airports.json @@ -0,0 +1,10 @@ +{"index":{"_id":"1"}} +{"airport":"JFK","connects":["BOS","ORD"]} +{"index":{"_id":"2"}} +{"airport":"BOS","connects":["JFK","PWM"]} +{"index":{"_id":"3"}} +{"airport":"ORD","connects":["JFK"]} +{"index":{"_id":"4"}} +{"airport":"PWM","connects":["BOS","LHR"]} +{"index":{"_id":"5"}} +{"airport":"LHR","connects":["PWM"]} diff --git a/integ-test/src/test/resources/graph_employees.json b/integ-test/src/test/resources/graph_employees.json new file mode 100644 index 0000000000..a9a2630fc0 --- /dev/null +++ b/integ-test/src/test/resources/graph_employees.json @@ -0,0 +1,12 @@ +{"index":{"_id":"1"}} +{"id":1,"name":"Dev","reportsTo":"Eliot"} +{"index":{"_id":"2"}} +{"id":2,"name":"Eliot","reportsTo":"Ron"} +{"index":{"_id":"3"}} +{"id":3,"name":"Ron","reportsTo":"Andrew"} +{"index":{"_id":"4"}} +{"id":4,"name":"Andrew","reportsTo":null} +{"index":{"_id":"5"}} +{"id":5,"name":"Asya","reportsTo":"Ron"} +{"index":{"_id":"6"}} +{"id":6,"name":"Dan","reportsTo":"Andrew"} diff --git a/integ-test/src/test/resources/graph_travelers.json b/integ-test/src/test/resources/graph_travelers.json new file mode 100644 index 0000000000..eb11d2206c --- /dev/null +++ b/integ-test/src/test/resources/graph_travelers.json @@ -0,0 +1,6 @@ +{"index":{"_id":"1"}} +{"name":"Dev","nearestAirport":"JFK"} +{"index":{"_id":"2"}} +{"name":"Eliot","nearestAirport":"JFK"} 
+{"index":{"_id":"3"}} +{"name":"Jeff","nearestAirport":"BOS"} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json new file mode 100644 index 0000000000..e93812c8a1 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "airport": { + "type": "keyword" + }, + "connects": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json new file mode 100644 index 0000000000..8c6674396e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "keyword" + }, + "reportsTo": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json new file mode 100644 index 0000000000..f4697dead1 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "keyword" + }, + "nearestAirport": { + "type": "keyword" + } + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index e3a4733716..c6be393d93 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -11,6 +11,7 @@ import 
java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -19,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.calcite.avatica.util.StructImpl; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -228,7 +230,7 @@ public void execute( * Process values recursively, handling geo points and nested maps. Geo points are converted to * OpenSearchExprGeoPointValue. Maps are recursively processed to handle nested structures. */ - private static Object processValue(Object value) { + private static Object processValue(Object value) throws SQLException { if (value == null) { return null; } @@ -244,6 +246,9 @@ private static Object processValue(Object value) { } return convertedMap; } + if (value instanceof StructImpl) { + return Arrays.asList(((StructImpl) value).getAttributes()); + } if (value instanceof List) { List list = (List) value; List convertedList = new ArrayList<>(); @@ -327,6 +332,9 @@ private void registerOpenSearchFunctions() { BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction); OperatorTable.addOperator( BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction); + + // Note: GraphLookup is now implemented as a custom RelNode (LogicalGraphLookup) + // instead of a UDF, so no registration is needed here. 
} /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java new file mode 100644 index 0000000000..e210095b48 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.rules; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; +import org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableGraphLookup; + +/** Rule to convert a {@link LogicalGraphLookup} to a {@link CalciteEnumerableGraphLookup}. */ +public class EnumerableGraphLookupRule extends ConverterRule { + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = + Config.INSTANCE + .as(Config.class) + .withConversion( + LogicalGraphLookup.class, + Convention.NONE, + EnumerableConvention.INSTANCE, + "EnumerableGraphLookupRule") + .withRuleFactory(EnumerableGraphLookupRule::new); + + /** Creates an EnumerableGraphLookupRule. 
*/ + protected EnumerableGraphLookupRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalGraphLookup graphLookup = call.rel(0); + // Only match if we can extract the OpenSearchIndex from the lookup table + return extractOpenSearchIndex(graphLookup.getLookup()) != null; + } + + /** + * Recursively extracts OpenSearchIndex from a RelNode by traversing down to find the index scan. + * + * @param node The RelNode to extract from + * @return The OpenSearchIndex, or null if not found + */ + private static OpenSearchIndex extractOpenSearchIndex(RelNode node) { + if (node instanceof AbstractCalciteIndexScan scan) { + return scan.getOsIndex(); + } + if (node instanceof RelSubset subset) { + return extractOpenSearchIndex(subset.getOriginal()); + } + // Recursively check inputs + for (RelNode input : node.getInputs()) { + OpenSearchIndex index = extractOpenSearchIndex(input); + if (index != null) { + return index; + } + } + return null; + } + + @Override + public RelNode convert(RelNode rel) { + final LogicalGraphLookup graphLookup = (LogicalGraphLookup) rel; + + // Extract the OpenSearchIndex from the lookup table + OpenSearchIndex lookupIndex = extractOpenSearchIndex(graphLookup.getLookup()); + if (lookupIndex == null) { + throw new IllegalStateException("Cannot extract OpenSearchIndex from lookup table"); + } + + // Convert inputs to enumerable convention + RelTraitSet traitSet = graphLookup.getTraitSet().replace(EnumerableConvention.INSTANCE); + + RelNode convertedSource = + convert( + graphLookup.getSource(), + graphLookup.getSource().getTraitSet().replace(EnumerableConvention.INSTANCE)); + RelNode convertedLookup = + convert( + graphLookup.getLookup(), + graphLookup.getLookup().getTraitSet().replace(EnumerableConvention.INSTANCE)); + return new CalciteEnumerableGraphLookup( + graphLookup.getCluster(), + traitSet, + convertedSource, + convertedLookup, + graphLookup.getStartField(), + graphLookup.getFromField(), 
+ graphLookup.getToField(), + graphLookup.getOutputField(), + graphLookup.getDepthField(), + graphLookup.getMaxDepth(), + graphLookup.isBidirectional(), + graphLookup.isSupportArray(), + graphLookup.isBatchMode(), + graphLookup.isUsePIT(), + graphLookup.getFilter()); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java index db65bb51a8..0068f445ce 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java @@ -16,6 +16,8 @@ public class OpenSearchIndexRules { EnumerableSystemIndexScanRule.DEFAULT_CONFIG.toRule(); private static final RelOptRule NESTED_AGGREGATE_RULE = EnumerableNestedAggregateRule.DEFAULT_CONFIG.toRule(); + private static final RelOptRule GRAPH_LOOKUP_RULE = + EnumerableGraphLookupRule.DEFAULT_CONFIG.toRule(); // Rule that always pushes down relevance functions regardless of pushdown settings private static final RelevanceFunctionPushdownRule RELEVANCE_FUNCTION_RULE = RelevanceFunctionPushdownRule.Config.DEFAULT.toRule(); @@ -23,7 +25,11 @@ public class OpenSearchIndexRules { /** The rules will apply whatever the pushdown setting is. 
*/ public static final List OPEN_SEARCH_NON_PUSHDOWN_RULES = ImmutableList.of( - INDEX_SCAN_RULE, SYSTEM_INDEX_SCAN_RULE, NESTED_AGGREGATE_RULE, RELEVANCE_FUNCTION_RULE); + INDEX_SCAN_RULE, + SYSTEM_INDEX_SCAN_RULE, + NESTED_AGGREGATE_RULE, + GRAPH_LOOKUP_RULE, + RELEVANCE_FUNCTION_RULE); private static final ProjectIndexScanRule PROJECT_INDEX_SCAN = ProjectIndexScanRule.Config.DEFAULT.toRule(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index ea28a746a3..53571f0a7a 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -1124,7 +1124,7 @@ QueryExpression notIn(LiteralExpression literal) { throw new PredicateAnalyzerException("notIn cannot be applied to " + this.getClass()); } - static QueryExpression create(TerminalExpression expression) { + public static QueryExpression create(TerminalExpression expression) { if (expression instanceof CastExpression) { expression = CastExpression.unpack(expression); } @@ -1813,11 +1813,11 @@ public String getReferenceForTermQuery() { } /** Literal like {@code 'foo' or 42 or true} etc. 
*/ - static final class LiteralExpression implements TerminalExpression { + public static final class LiteralExpression implements TerminalExpression { final RexLiteral literal; - LiteralExpression(RexLiteral literal) { + public LiteralExpression(RexLiteral literal) { this.literal = literal; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index d7539312cd..3350c00fb0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -80,7 +80,7 @@ public class OpenSearchIndex extends AbstractOpenSearchTable { @Getter private final Settings settings; /** {@link OpenSearchRequest.IndexName}. */ - private final OpenSearchRequest.IndexName indexName; + @Getter private final OpenSearchRequest.IndexName indexName; /** The cached mapping of field and type in index. 
*/ private Map cachedFieldOpenSearchTypes = null; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java new file mode 100644 index 0000000000..6307a74146 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java @@ -0,0 +1,545 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.termsQuery; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import lombok.Getter; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import 
org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.calcite.plan.rel.GraphLookup; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; +import org.opensearch.sql.opensearch.storage.scan.context.LimitDigest; +import org.opensearch.sql.opensearch.storage.scan.context.OSRequestBuilderAction; +import org.opensearch.sql.opensearch.storage.scan.context.PushDownType; +import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; + +/** + * Enumerable implementation for graphLookup command. + * + *

Performs BFS graph traversal by dynamically querying OpenSearch with filter pushdown instead + * of loading all lookup data into memory. For each source row, it executes BFS queries to find all + * connected nodes in the graph. + */ +@Getter +public class CalciteEnumerableGraphLookup extends GraphLookup implements EnumerableRel, Scannable { + private static final Logger LOG = LogManager.getLogger(); + + /** + * Creates a CalciteEnumerableGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set (must include EnumerableConvention) + * @param source Source table RelNode + * @param lookup Lookup table RelNode // * @param lookupIndex OpenSearchIndex for the lookup table + * (extracted from lookup RelNode) + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents + */ + public CalciteEnumerableGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT, + @Nullable RexNode filter) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT, + filter); + } + + @Override + 
public RelNode copy(RelTraitSet traitSet, List inputs) { + return new CalciteEnumerableGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT, + filter); + } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + // TODO: make it more accurate + return super.computeSelfCost(planner, mq); + } + + // TODO: support non-scannable inputs + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + OpenSearchRelOptUtil.replaceDot(getCluster().getTypeFactory(), getRowType()), + pref.preferArray()); + + var scanOperator = implementor.stash(this, CalciteEnumerableGraphLookup.class); + return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); + } + + @Override + public Enumerable<@Nullable Object> scan() { + return new GraphLookupEnumerable(this); + } + + /** Enumerable implementation that performs BFS traversal for each source row. */ + private static class GraphLookupEnumerable extends AbstractEnumerable<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + + GraphLookupEnumerable(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + } + + @Override + public Enumerator<@Nullable Object> enumerator() { + return new GraphLookupEnumerator(graphLookup); + } + } + + /** Enumerator that performs BFS for each source row. 
*/ + private static class GraphLookupEnumerator implements Enumerator<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + private final CalciteEnumerableIndexScan lookupScan; + private final Enumerator<@Nullable Object> sourceEnumerator; + private final List lookupFields; + private final int startFieldIndex; + private final int fromFieldIdx; + private final int toFieldIdx; + + private Object[] current = null; + private boolean batchModeCompleted = false; + + @SuppressWarnings("unchecked") + GraphLookupEnumerator(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + this.lookupScan = (CalciteEnumerableIndexScan) graphLookup.getLookup(); + if (!graphLookup.usePIT) { + // When usePIT is false (default), limit the size of the lookup table to MaxResultWindow + // to avoid PIT search for better performance, but results may be incomplete + final int maxResultWindow = this.lookupScan.getOsIndex().getMaxResultWindow(); + this.lookupScan.pushDownContext.add( + PushDownType.LIMIT, + new LimitDigest(maxResultWindow, 0), + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownLimit(maxResultWindow, 0)); + } + // When usePIT is true, no limit is set, allowing PIT-based pagination for complete results + + // Get the source enumerator + if (graphLookup.getSource() instanceof Scannable scannable) { + Enumerable sourceEnum = scannable.scan(); + this.sourceEnumerator = (Enumerator<@Nullable Object>) sourceEnum.enumerator(); + } else { + throw new IllegalStateException( + "Source must be Scannable, got: " + graphLookup.getSource().getClass()); + } + + List sourceFields = graphLookup.getSource().getRowType().getFieldNames(); + this.lookupFields = graphLookup.getLookup().getRowType().getFieldNames(); + this.startFieldIndex = sourceFields.indexOf(graphLookup.getStartField()); + this.fromFieldIdx = lookupFields.indexOf(graphLookup.fromField); + this.toFieldIdx = lookupFields.indexOf(graphLookup.toField); + + // Push 
down user-specified filter to the lookup scan + if (graphLookup.filter != null) { + List schema = graphLookup.getLookup().getRowType().getFieldNames(); + Map fieldTypes = this.lookupScan.getOsIndex().getAllFieldTypes(); + try { + QueryBuilder filterQuery = + PredicateAnalyzer.analyze(graphLookup.filter, schema, fieldTypes); + this.lookupScan.pushDownContext.add( + PushDownType.FILTER, + null, + (OSRequestBuilderAction) rb -> rb.pushDownFilterForCalcite(filterQuery)); + } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) { + throw new RuntimeException( + "Cannot push down filter for graphLookup: " + e.getMessage(), e); + } + } + } + + @Override + public Object current() { + // source fields + output array (normal mode) or [source array, lookup array] (batch mode) + return current; + } + + @Override + public boolean moveNext() { + if (graphLookup.batchMode) { + return moveNextBatchMode(); + } else { + return moveNextNormalMode(); + } + } + + /** + * Batch mode: collect all source start values, perform unified BFS, return single aggregated + * row. + */ + private boolean moveNextBatchMode() { + // Batch mode only returns one row + if (batchModeCompleted) { + return false; + } + batchModeCompleted = true; + + // Collect all source rows and start values + List allSourceRows = new ArrayList<>(); + Set allStartValues = new HashSet<>(); + + while (sourceEnumerator.moveNext()) { + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + sourceValues = new Object[] {sourceRow}; + } + + // Store the source row + allSourceRows.add(sourceValues); + + // Collect start value(s) + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? 
sourceValues[startFieldIndex] + : null; + + if (startValue != null) { + if (startValue instanceof List list) { + allStartValues.addAll(list); + } else { + allStartValues.add(startValue); + } + } + } + + // Perform unified BFS with all start values + List bfsResults = performBfs(allStartValues); + + // Build output row: [Array, Array] + current = new Object[] {allSourceRows, bfsResults}; + + return true; + } + + /** Normal mode: perform BFS for each source row individually. */ + private boolean moveNextNormalMode() { + if (!sourceEnumerator.moveNext()) { + return false; + } + + // Get current source row + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + // Single column case + sourceValues = new Object[] {sourceRow}; + } + + // Get the start value for BFS + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? sourceValues[startFieldIndex] + : null; + + // Perform BFS traversal + List bfsResults = performBfs(startValue); + + // Build output row: source fields + array of BFS results + current = new Object[sourceValues.length + 1]; + System.arraycopy(sourceValues, 0, current, 0, sourceValues.length); + current[sourceValues.length] = bfsResults; + + return true; + } + + /** + * Performs BFS traversal starting from the given value by dynamically querying OpenSearch. 
+ * + * @param startValue The starting value for BFS + * @return List of rows found during traversal + */ + private List performBfs(Object startValue) { + if (startValue == null) { + return List.of(); + } + + // TODO: support spillable for these collections + List results = new ArrayList<>(); + // TODO: If we want to include loop edges, we also need to track the visited edges + Set visitedNodes = new HashSet<>(); + Queue queue = new ArrayDeque<>(); + + // Initialize BFS with start value + if (startValue instanceof Collection collection) { + collection.forEach( + value -> { + if (!visitedNodes.contains(value)) { + visitedNodes.add(value); + queue.offer(value); + } + }); + } else { + visitedNodes.add(startValue); + queue.offer(startValue); + } + + int currentLevelDepth = 0; + while (!queue.isEmpty()) { + // Collect all values at current level for batch query + List currentLevelValues = new ArrayList<>(); + + while (!queue.isEmpty()) { + Object value = queue.poll(); + currentLevelValues.add(value); + } + + if (currentLevelValues.isEmpty()) { + break; + } + + // Query OpenSearch for all current level values + // Forward direction: fromField = currentLevelValues + List forwardResults = queryLookupTable(currentLevelValues, visitedNodes); + + if (!graphLookup.usePIT + && forwardResults.size() >= this.lookupScan.getOsIndex().getMaxResultWindow()) { + LOG.warn("BFS result size exceeds max result window, returning partial result."); + } + for (Object row : forwardResults) { + Object[] rowArray = (Object[]) (row); + Object fromValue = rowArray[fromFieldIdx]; + // Collect next values to traverse (may be single value or list) + // For forward traversal: extract fromField values for next level + // For bidirectional: also extract toField values. 
+ // Skip visited values while keep null value + List nextValues = new ArrayList<>(); + collectValues(fromValue, nextValues, visitedNodes); + if (graphLookup.bidirectional) { + Object toValue = rowArray[toFieldIdx]; + collectValues(toValue, nextValues, visitedNodes); + } + + // Add row to results if the nextValues is not empty + if (!nextValues.isEmpty()) { + if (graphLookup.depthField != null) { + Object[] rowWithDepth = new Object[rowArray.length + 1]; + System.arraycopy(rowArray, 0, rowWithDepth, 0, rowArray.length); + rowWithDepth[rowArray.length] = currentLevelDepth; + results.add(rowWithDepth); + } else { + results.add(rowArray); + } + + // Add unvisited non-null values to queue for next level traversal + for (Object val : nextValues) { + if (val != null) { + visitedNodes.add(val); + queue.offer(val); + } + } + } + } + + if (++currentLevelDepth > graphLookup.maxDepth) break; + } + + return results; + } + + /** + * Queries the lookup table with a terms filter. + * + * @param values Values to match + * @param visitedValues Values to not match (ignored when supportArray is true) + * @return List of matching rows + */ + private List queryLookupTable( + Collection values, Collection visitedValues) { + if (values.isEmpty()) { + return List.of(); + } + + // Forward direction query + QueryBuilder query; + if (graphLookup.supportArray) { + // When supportArray is true, don't push down visited filter + // because array fields may contain multiple values that need to be checked individually + query = getQueryBuilder(toFieldIdx, values); + } else { + query = + boolQuery() + .must(getQueryBuilder(toFieldIdx, values)) + .mustNot(getQueryBuilder(fromFieldIdx, visitedValues)); + } + + if (graphLookup.bidirectional) { + // Also query fromField for bidirectional traversal + QueryBuilder backQuery; + if (graphLookup.supportArray) { + backQuery = getQueryBuilder(fromFieldIdx, values); + } else { + backQuery = + boolQuery() + .must(getQueryBuilder(fromFieldIdx, values)) + 
.mustNot(getQueryBuilder(toFieldIdx, visitedValues)); + } + query = QueryBuilders.boolQuery().should(query).should(backQuery); + } + CalciteEnumerableIndexScan newScan = (CalciteEnumerableIndexScan) this.lookupScan.copy(); + QueryBuilder finalQuery = query; + newScan.pushDownContext.add( + PushDownType.FILTER, + null, + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownFilterForCalcite(finalQuery)); + Iterator<@Nullable Object> res = newScan.scan().iterator(); + List results = new ArrayList<>(); + while (res.hasNext()) { + results.add(res.next()); + } + closeIterator(res); + return results; + } + + private static void closeIterator(@Nullable Iterator iterator) { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Provides a query builder to search edges with the field matching values + * + * @param fieldIdx field index + * @param values values to match + * @return query builder + */ + private QueryBuilder getQueryBuilder(int fieldIdx, Collection values) { + String fieldName = + new NamedFieldExpression(fieldIdx, lookupFields, lookupScan.getOsIndex().getFieldTypes()) + .getReferenceForTermQuery(); + return termsQuery(fieldName, values); + } + + /** + * Collects values from a field that may be a single value or a list. 
+ * + * @param value The field value (may be single value or List) + * @param collector The list to collect values into + * @param visited Previously visited values to avoid duplicates + */ + private void collectValues(Object value, List collector, Set visited) { + if (value instanceof List list) { + for (Object item : list) { + if (!visited.contains(item)) { + collector.add(item); + } + } + } else if (!visited.contains(value)) { + collector.add(value); + } + } + + @Override + public void reset() { + sourceEnumerator.reset(); + current = null; + } + + @Override + public void close() { + sourceEnumerator.close(); + } + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 9113663e47..e456914ff5 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -53,6 +53,18 @@ TIMECHART: 'TIMECHART'; APPENDCOL: 'APPENDCOL'; ADDTOTALS: 'ADDTOTALS'; ADDCOLTOTALS: 'ADDCOLTOTALS'; +GRAPHLOOKUP: 'GRAPHLOOKUP'; +START_FIELD: 'STARTFIELD'; +FROM_FIELD: 'FROMFIELD'; +TO_FIELD: 'TOFIELD'; +MAX_DEPTH: 'MAXDEPTH'; +DEPTH_FIELD: 'DEPTHFIELD'; +DIRECTION: 'DIRECTION'; +UNI: 'UNI'; +BI: 'BI'; +SUPPORT_ARRAY: 'SUPPORTARRAY'; +BATCH_MODE: 'BATCHMODE'; +USE_PIT: 'USEPIT'; ROW: 'ROW'; COL: 'COL'; EXPAND: 'EXPAND'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 8cc4ed932d..1749c6ebf4 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -91,6 +91,7 @@ commands | replaceCommand | mvcombineCommand | fieldformatCommand + | graphLookupCommand ; commandName @@ -137,6 +138,7 @@ commandName | REPLACE | MVCOMBINE | TRANSPOSE + | GRAPHLOOKUP ; searchCommand @@ -631,6 +633,23 @@ addcoltotalsOption | (LABELFIELD EQUAL stringLiteral) ; +graphLookupCommand + : GRAPHLOOKUP lookupTable = tableSourceClause graphLookupOption* AS outputField = fieldExpression + ; + +graphLookupOption + : (START_FIELD EQUAL 
fieldExpression) + | (FROM_FIELD EQUAL fieldExpression) + | (TO_FIELD EQUAL fieldExpression) + | (MAX_DEPTH EQUAL integerLiteral) + | (DEPTH_FIELD EQUAL fieldExpression) + | (DIRECTION EQUAL (UNI | BI)) + | (SUPPORT_ARRAY EQUAL booleanLiteral) + | (BATCH_MODE EQUAL booleanLiteral) + | (USE_PIT EQUAL booleanLiteral) + | (FILTER EQUAL LT_PRTHS logicalExpression RT_PRTHS) + ; + // clauses fromClause : SOURCE EQUAL tableOrSubqueryClause @@ -1694,5 +1713,11 @@ searchableKeyWord | ROW | COL | COLUMN_NAME + | FROM_FIELD + | TO_FIELD + | MAX_DEPTH + | DEPTH_FIELD + | DIRECTION + | UNI + | BI ; - diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 1ff9d2818d..45d9a14bd8 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -85,6 +85,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -1492,4 +1494,79 @@ public UnresolvedPlan visitAddcoltotalsCommand( java.util.Map options = cmdOptionsBuilder.build(); return new AddColTotals(fieldList, options); } + + @Override + public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCommandContext ctx) { + // Parse lookup table + UnresolvedPlan fromTable = visitTableSourceClause(ctx.lookupTable); + + // Parse options with defaults + Field fromField = null; + Field toField = null; + Literal maxDepth = Literal.ZERO; + Field startField = null; + Field depthField = null; + Direction direction = Direction.UNI; + boolean supportArray = false; + boolean batchMode = false; + boolean usePIT = false; + UnresolvedExpression 
filter = null; + + for (OpenSearchPPLParser.GraphLookupOptionContext option : ctx.graphLookupOption()) { + if (option.FROM_FIELD() != null) { + fromField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.TO_FIELD() != null) { + toField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.MAX_DEPTH() != null) { + maxDepth = (Literal) internalVisitExpression(option.integerLiteral()); + } + if (option.START_FIELD() != null) { + startField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DEPTH_FIELD() != null) { + depthField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DIRECTION() != null) { + direction = option.BI() != null ? Direction.BI : Direction.UNI; + } + if (option.SUPPORT_ARRAY() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + supportArray = Boolean.TRUE.equals(literal.getValue()); + } + if (option.BATCH_MODE() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + batchMode = Boolean.TRUE.equals(literal.getValue()); + } + if (option.USE_PIT() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + usePIT = Boolean.TRUE.equals(literal.getValue()); + } + if (option.FILTER() != null) { + filter = internalVisitExpression(option.logicalExpression()); + } + } + + Field as = (Field) internalVisitExpression(ctx.outputField); + + if (fromField == null || toField == null) { + throw new SemanticCheckException("fromField and toField must be specified for graphLookup"); + } + + return GraphLookup.builder() + .fromTable(fromTable) + .fromField(fromField) + .toField(toField) + .as(as) + .maxDepth(maxDepth) + .startField(startField) + .depthField(depthField) + .direction(direction) + .supportArray(supportArray) + .batchMode(batchMode) + .usePIT(usePIT) + .filter(filter) + .build(); + } } diff --git 
a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 4376b5659d..90f4ce9272 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -224,6 +225,42 @@ public String visitLookup(Lookup node, String context) { "%s | lookup %s %s%s%s", child, MASK_TABLE, mappingFields, strategy, outputFields); } + @Override + public String visitGraphLookup(GraphLookup node, String context) { + String child = node.getChild().get(0).accept(this, context); + StringBuilder command = new StringBuilder(); + command.append(child).append(" | graphlookup ").append(MASK_TABLE); + if (node.getStartField() != null) { + command.append(" startField=").append(MASK_COLUMN); + } + command.append(" fromField=").append(MASK_COLUMN); + command.append(" toField=").append(MASK_COLUMN); + if (node.getMaxDepth() != null && !Integer.valueOf(0).equals(node.getMaxDepth().getValue())) { + command.append(" maxDepth=").append(MASK_LITERAL); + } + if (node.getDepthField() != null) { + command.append(" depthField=").append(MASK_COLUMN); + } + command.append(" direction=").append(node.getDirection().name().toLowerCase()); + if (node.isSupportArray()) { + command.append(" supportArray=true"); + } + if (node.isBatchMode()) { + command.append(" batchMode=true"); + } + if (node.isUsePIT()) { + command.append(" usePIT=true"); + } + if (node.getFilter() != null) { + command + .append(" filter=(") + .append(expressionAnalyzer.analyze(node.getFilter(), context)) + .append(")"); + } + 
command.append(" as ").append(MASK_COLUMN); + return command.toString(); + } + private String formatFieldAlias(java.util.Map fieldMap) { return fieldMap.entrySet().stream() .map( diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java new file mode 100644 index 0000000000..2790f3aadd --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java @@ -0,0 +1,222 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLGraphLookupTest extends CalcitePPLAbstractTest { + + public CalcitePPLGraphLookupTest() { + 
super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testGraphLookupBasic() { + // Test basic graphLookup with same source and lookup table + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithDepthField() { + // Test graphLookup with depthField parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name depthField=level as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[level]," + + " maxDepth=[0], bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithMaxDepth() { + // Test graphLookup with maxDepth parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name maxDepth=3 as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[3]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " 
LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithFilter() { + // Test graphLookup with filter parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name filter=(id > 2) as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false], filter=[>($0, 2)])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithCompoundFilter() { + // Test graphLookup with compound filter condition + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name filter=(id > 1 AND name != 'Andrew') as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false], filter=[AND(>($0, 1), <>($1, 'Andrew'))])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupBidirectional() { + // Test graphLookup with bidirectional traversal + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name direction=bi as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + 
+ " bidirectional=[true])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + // Add employee table for graphLookup tests + ImmutableList rows = + ImmutableList.of( + new Object[] {1, "Dev", null}, + new Object[] {2, "Eliot", "Dev"}, + new Object[] {3, "Ron", "Eliot"}, + new Object[] {4, "Andrew", "Eliot"}, + new Object[] {5, "Asya", "Ron"}, + new Object[] {6, "Dan", "Andrew"}); + schema.add("employee", new EmployeeTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @RequiredArgsConstructor + public static class EmployeeTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("id", SqlTypeName.INTEGER) + .nullable(false) + .add("name", SqlTypeName.VARCHAR) + .nullable(false) + .add("reportsTo", SqlTypeName.VARCHAR) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + 
public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 9e1cfe05a4..f7cadaaf57 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Chart; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -1643,4 +1644,76 @@ public void testMvmapWithNonFieldFirstArgThrowsException() { () -> plan("source=t | eval result = mvmap(123, 123 * 10)")) .getMessage()); } + + @Test + public void testGraphLookupCommand() { + // Basic graphLookup with required parameters + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name maxDepth=3" + + " as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(3)) + .startField(null) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + .build()); + + // graphLookup with startField filter + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(field("id")) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + 
.build()); + + // graphLookup with depthField and bidirectional + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level direction=bi as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(null) + .depthField(field("level")) + .direction(GraphLookup.Direction.BI) + .build()); + + // Error: missing fromField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> + plan( + "source=t | graphLookup employees toField=name startField=id as" + + " reportingHierarchy")); + + // Error: missing lookup table - SyntaxCheckException from grammar + assertThrows( + SyntaxCheckException.class, + () -> + plan( + "source=t | graphLookup fromField=manager toField=name as" + + " reportingHierarchy")); + + // Error: missing toField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> plan("source=t | graphLookup employees fromField=manager as reportingHierarchy")); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 1e200eb092..0398d30bf1 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -643,6 +643,74 @@ public void testLookup() { + " COUNTRY2")); } + @Test + public void testGraphLookup() { + // Basic graphLookup with required parameters + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " as reportingHierarchy")); + // graphLookup with 
maxDepth + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " maxDepth=*** direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " maxDepth=3 as reportingHierarchy")); + // graphLookup with depthField + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " depthField=identifier direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level as reportingHierarchy")); + // graphLookup with bidirectional mode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=bi as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " direction=bi as reportingHierarchy")); + // graphLookup with all optional parameters + assertEquals( + "source=table | graphlookup table startField=identifier fromField=identifier" + + " toField=identifier maxDepth=*** depthField=identifier direction=bi" + + " as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id maxDepth=5 depthField=level direction=bi as reportingHierarchy")); + // graphLookup with supportArray + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni supportArray=true as identifier", + anonymize( + "source=t | graphLookup airports fromField=connects toField=airport" + + " supportArray=true as reachableAirports")); + // graphLookup with batchMode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni batchMode=true as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " batchMode=true as reportingHierarchy")); + // graphLookup with filter + assertEquals( + "source=table | graphlookup table 
fromField=identifier toField=identifier" + + " direction=uni filter=(identifier = ***) as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " filter=(status = 'active') as reportingHierarchy")); + // graphLookup with compound filter + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni filter=(identifier = *** and identifier > ***) as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " filter=(status = 'active' AND id > 2) as reportingHierarchy")); + } + @Test public void testInSubquery() { assertEquals(