From 81eae25aaffecf4406e812f84774d8c5b1aec4da Mon Sep 17 00:00:00 2001 From: qianheng Date: Tue, 10 Feb 2026 11:10:20 +0800 Subject: [PATCH 1/2] Support bi-directional graph traversal command `graphlookup` (#5113) * succeed to graph lookup single index Signed-off-by: Lantao Jin * Implement graph lookup RelNode Signed-off-by: Heng Qian * Refine - remove depth from BFS node Signed-off-by: Heng Qian * Support bidirectional mode Signed-off-by: Heng Qian * Support anonymize graph lookup Signed-off-by: Heng Qian * Fix UT Signed-off-by: Heng Qian * Add IT Signed-off-by: Heng Qian * Add limitation for GraphLookup Signed-off-by: Heng Qian * Simplify GraphLookup param names Signed-off-by: Heng Qian * Refine IT Signed-off-by: Heng Qian * Support value of list; Support retrieve circle edges also Signed-off-by: Heng Qian * Add documentation for graph lookup Signed-off-by: Heng Qian * Don't include loop edges Signed-off-by: Heng Qian * Refine code Signed-off-by: Heng Qian * spotlessApply Signed-off-by: Heng Qian * Refine code Signed-off-by: Heng Qian * Filter visited nodes in search query Signed-off-by: Heng Qian * Fix UT Signed-off-by: Heng Qian * Add parameter supportArray for handling fields with array values Signed-off-by: Heng Qian * Remove unused code Signed-off-by: Heng Qian * Support batch mode Signed-off-by: Heng Qian * Close lookup table scan Signed-off-by: Heng Qian * refine code Signed-off-by: Heng Qian * Add param usePIT Signed-off-by: Heng Qian --------- Signed-off-by: Lantao Jin Signed-off-by: Heng Qian Co-authored-by: Lantao Jin --- .../org/opensearch/sql/analysis/Analyzer.java | 6 + .../sql/ast/AbstractNodeVisitor.java | 5 + .../ast/analysis/FieldResolutionVisitor.java | 7 + .../opensearch/sql/ast/tree/GraphLookup.java | 98 +++ .../sql/calcite/CalciteRelNodeVisitor.java | 56 ++ .../sql/calcite/plan/rel/GraphLookup.java | 186 ++++++ .../calcite/plan/rel/LogicalGraphLookup.java | 140 ++++ .../calcite/utils/OpenSearchTypeFactory.java | 2 +- docs/user/ppl/cmd/graphlookup.md | 335 ++++++++++ .../remote/CalcitePPLGraphLookupIT.java | 603 ++++++++++++++++++ .../sql/legacy/SQLIntegTestCase.java | 16 + .../org/opensearch/sql/legacy/TestUtils.java | 15 + .../opensearch/sql/legacy/TestsConstants.java | 3 + .../src/test/resources/graph_airports.json | 10 + .../src/test/resources/graph_employees.json | 12 + .../src/test/resources/graph_travelers.json | 6 + .../graph_airports_index_mapping.json | 12 + .../graph_employees_index_mapping.json | 15 + .../graph_travelers_index_mapping.json | 12 + .../executor/OpenSearchExecutionEngine.java | 7 + .../rules/EnumerableGraphLookupRule.java | 106 +++ .../planner/rules/OpenSearchIndexRules.java | 8 +- .../opensearch/request/PredicateAnalyzer.java | 8 +- .../opensearch/storage/OpenSearchIndex.java | 2 +- .../scan/CalciteEnumerableGraphLookup.java | 520 +++++++++++++++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 12 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 26 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 72 +++ .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 31 + .../calcite/CalcitePPLGraphLookupTest.java | 186 ++++++ .../sql/ppl/parser/AstBuilderTest.java | 73 +++ .../ppl/utils/PPLQueryDataAnonymizerTest.java | 54 ++ 32 files changed, 2636 insertions(+), 8 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java create mode 100644 core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java create mode 100644 core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java create mode 100644 
docs/user/ppl/cmd/graphlookup.md create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java create mode 100644 integ-test/src/test/resources/graph_airports.json create mode 100644 integ-test/src/test/resources/graph_employees.json create mode 100644 integ-test/src/test/resources/graph_travelers.json create mode 100644 integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json create mode 100644 integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json create mode 100644 integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java create mode 100644 ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 2b2fe3cf1cd..0d1ea327324 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -73,6 +73,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -541,6 +542,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) { throw getOnlyForCalciteException("mvcombine"); } + @Override + public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) { + throw getOnlyForCalciteException("graphlookup"); + } + /** Build {@link ParseExpression} to context and skip to child nodes. 
*/ @Override public LogicalPlan visitParse(Parse node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index b1082759a3f..8abafbc286e 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -61,6 +61,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -471,4 +472,8 @@ public T visitAddColTotals(AddColTotals node, C context) { public T visitMvCombine(MvCombine node, C context) { return visitChildren(node, context); } + + public T visitGraphLookup(GraphLookup node, C context) { + return visitChildren(node, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java index a6f6671084a..a44f0bca41c 100644 --- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java @@ -40,6 +40,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -529,6 +530,12 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) { throw new IllegalArgumentException("Lookup command cannot be used together with spath command"); } + @Override + public Node visitGraphLookup(GraphLookup node, FieldResolutionContext context) { + throw new IllegalArgumentException( + "GraphLookup command cannot be used together with spath command"); + } + @Override public Node visitValues(Values node, FieldResolutionContext context) { // do nothing diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java new file mode 100644 index 00000000000..51084d7b9b9 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; + +/** + * AST node for graphLookup command. Performs BFS graph traversal on a lookup table. + * + *

Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3 + * depthField=level direction=uni as hierarchy + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +@Builder(toBuilder = true) +public class GraphLookup extends UnresolvedPlan { + /** Direction mode for graph traversal. */ + public enum Direction { + /** Unidirectional - traverse edges in one direction only. */ + UNI, + /** Bidirectional - traverse edges in both directions. */ + BI + } + + /** Target table for graph traversal lookup. */ + private final UnresolvedPlan fromTable; + + /** Field in sourceTable to start with. */ + private final Field startField; + + /** Field in fromTable that represents the outgoing edge. */ + private final Field fromField; + + /** Field in input/fromTable to match against for traversal. */ + private final Field toField; + + /** Output field name for collected traversal results. */ + private final Field as; + + /** Maximum traversal depth. Zero means no limit. */ + private final Literal maxDepth; + + /** Optional field name to include recursion depth in output. */ + private @Nullable final Field depthField; + + /** Direction mode: UNI (default) or BIO for bidirectional. */ + private final Direction direction; + + /** Whether to support array-typed fields without early filter pushdown. */ + private final boolean supportArray; + + /** Whether to batch all source start values into a single unified BFS traversal. */ + private final boolean batchMode; + + /** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */ + private final boolean usePIT; + + private UnresolvedPlan child; + + public String getDepthFieldName() { + return depthField == null ? null : depthField.getField().toString(); + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return child == null ? 
ImmutableList.of() : ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitGraphLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 5825011f653..c9227781f6a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -118,6 +118,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -151,6 +153,7 @@ import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.calcite.plan.OpenSearchConstants; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.utils.BinUtils; @@ -2573,6 +2576,59 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context) context, fieldsToAggregate, false, true, null, labelField, label); } + @Override + public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { + // 1. Visit source (child) table + visitChildren(node, context); + RelBuilder builder = context.relBuilder; + // TODO: Limit the number of source rows to 100 for now, make it configurable. + builder.limit(0, 100); + if (node.isBatchMode()) { + tryToRemoveMetaFields(context, true); + } + RelNode sourceTable = builder.build(); + + // 2. Extract parameters + String startFieldName = node.getStartField().getField().toString(); + String fromFieldName = node.getFromField().getField().toString(); + String toFieldName = node.getToField().getField().toString(); + String outputFieldName = node.getAs().getField().toString(); + String depthFieldName = node.getDepthFieldName(); + boolean bidirectional = node.getDirection() == Direction.BI; + + RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context); + Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class); + maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue; + boolean supportArray = node.isSupportArray(); + boolean batchMode = node.isBatchMode(); + boolean usePIT = node.isUsePIT(); + + // 3. Visit and materialize lookup table + analyze(node.getFromTable(), context); + tryToRemoveMetaFields(context, true); + RelNode lookupTable = builder.build(); + + // 4. 
Create LogicalGraphLookup RelNode + // The conversion rule will extract the OpenSearchIndex from the lookup table + RelNode graphLookup = + LogicalGraphLookup.create( + sourceTable, + lookupTable, + startFieldName, + fromFieldName, + toFieldName, + outputFieldName, + depthFieldName, + maxDepthValue, + bidirectional, + supportArray, + batchMode, + usePIT); + + builder.push(graphLookup); + return builder.peek(); + } + /** * Cast integer sum to long, real/float to double to avoid ClassCastException * diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java new file mode 100644 index 00000000000..ef7134a0162 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Abstract RelNode for graphLookup command. + * + *

Has two inputs:
 + *
 + * <ul>
 + *   <li>source: source table (rows to start BFS from)
 + *   <li>lookup: lookup table (graph edges to traverse)
 + * </ul>
 + *
 + * At execution time, performs BFS by dynamically querying OpenSearch with filter pushdown
 + * instead of loading all lookup data into memory.
 + *

This is a storage-agnostic logical node. Storage-specific implementations (e.g., for + * OpenSearch) should extract the necessary index information from the lookup RelNode during + * conversion to the physical plan. + */ +@Getter +public abstract class GraphLookup extends BiRel { + + // TODO: use RexInputRef instead of String for there fields + protected final String startField; // Field in source table (start entities) + protected final String fromField; // Field in lookup table (edge source) + protected final String toField; // Field in lookup table (edge target) + protected final String outputField; // Name of output array field + @Nullable protected final String depthField; // Name of output array field + + // TODO: add limitation on the maxDepth and input rows count + protected final int maxDepth; // -1 = unlimited + protected final boolean bidirectional; + protected final boolean supportArray; + protected final boolean batchMode; + protected final boolean usePIT; + + private RelDataType outputRowType; + + /** + * Creates a LogicalGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields (disables early visited filter + * pushdown) + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + */ + protected GraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT) { + super(cluster, traitSet, source, lookup); + this.startField = startField; + this.fromField = fromField; + this.toField = toField; + this.outputField = outputField; + this.depthField = depthField; + this.maxDepth = maxDepth; + this.bidirectional = bidirectional; + this.supportArray = supportArray; + this.batchMode = batchMode; + this.usePIT = usePIT; + } + + /** Returns the source table RelNode. */ + public RelNode getSource() { + return left; + } + + /** Returns the lookup table RelNode. 
*/ + public RelNode getLookup() { + return right; + } + + @Override + public abstract RelNode copy(RelTraitSet traitSet, List inputs); + + @Override + protected RelDataType deriveRowType() { + if (outputRowType == null) { + RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); + + if (batchMode) { + // Batch mode: Output = [Array, Array] + // First field: aggregated source rows as array + RelDataType sourceRowType = getSource().getRowType(); + RelDataType sourceArrayType = + getCluster().getTypeFactory().createArrayType(sourceRowType, -1); + builder.add(startField, sourceArrayType); + + // Second field: aggregated lookup rows as array + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType lookupArrayType = + getCluster().getTypeFactory().createArrayType(lookupRowType, -1); + builder.add(outputField, lookupArrayType); + } else { + // Normal mode: Output = source fields + output array field + // Add all source fields + for (var field : getSource().getRowType().getFieldList()) { + builder.add(field); + } + + // Add output field (ARRAY type containing lookup row struct) + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType arrayType = getCluster().getTypeFactory().createArrayType(lookupRowType, -1); + builder.add(outputField, arrayType); + } + + outputRowType = builder.build(); + } + return outputRowType; + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + // Batch mode aggregates all source rows into a single output row + return batchMode ? 1 : getSource().estimateRowCount(mq); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("fromField", fromField) + .item("toField", toField) + .item("outputField", outputField) + .item("depthField", depthField) + .item("maxDepth", maxDepth) + .item("bidirectional", bidirectional) + .itemIf("supportArray", supportArray, supportArray) + .itemIf("batchMode", batchMode, batchMode) + .itemIf("usePIT", usePIT, usePIT); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java new file mode 100644 index 00000000000..b02bbec3742 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; + +/** + * Logical RelNode for graphLookup command. 
TODO: need to support trim fields and several transpose + * rules for this new added RelNode + */ +@Getter +public class LogicalGraphLookup extends GraphLookup { + + /** + * Creates a LogicalGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + */ + protected LogicalGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT); + } + + /** + * Creates a LogicalGraphLookup with Convention.NONE. + * + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Named of the output depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @return A new LogicalGraphLookup instance + */ + public static LogicalGraphLookup create( + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT) { + RelOptCluster cluster = source.getCluster(); + RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); + return new LogicalGraphLookup( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new LogicalGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index 17d99fb4fbb..8dfe963081c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ 
b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -223,7 +223,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) { case BIGINT -> LONG; case FLOAT, REAL -> FLOAT; case DOUBLE, DECIMAL -> DOUBLE; // TODO the decimal is only used for literal - case CHAR, VARCHAR -> STRING; + case CHAR, VARCHAR, MULTISET -> STRING; // call toString() for MULTISET case BOOLEAN -> BOOLEAN; case DATE -> DATE; case TIME, TIME_TZ, TIME_WITH_LOCAL_TIME_ZONE -> TIME; diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md new file mode 100644 index 00000000000..e768e02f6b8 --- /dev/null +++ b/docs/user/ppl/cmd/graphlookup.md @@ -0,0 +1,335 @@ + +# graphLookup + +The `graphLookup` command performs recursive graph traversal on a collection using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on specified fields. This is useful for hierarchical data like organizational charts, social networks, or routing graphs. + +## Syntax + +The `graphLookup` command has the following syntax: + +```syntax +graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] [usePIT=(true | false)] as +``` + +The following are examples of the `graphLookup` command syntax: + +```syntax +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name maxDepth=2 as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name depthField=level as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections +source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports +source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports +``` + +## Parameters + +The `graphLookup` command supports the following parameters. + +| Parameter | Required/Optional | Description | +| --- | --- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `` | Required | The name of the index to perform the graph traversal on. Can be the same as the source index for self-referential graphs. | +| `startField=` | Required | The field in the source documents whose value is used to start the recursive search. The value of this field is matched against `toField` in the lookup index. We support both single value and array values as starting points. | +| `fromField=` | Required | The field in the lookup index documents that contains the value to recurse on. After matching a document, the value of this field is used to find the next set of documents. It supports both single value and array values. | +| `toField=` | Required | The field in the lookup index documents to match against. Documents where `toField` equals the current traversal value are included in the results. | +| `maxDepth=` | Optional | The maximum recursion depth of hops. Default is `0`. 
A value of `0` means only the direct connections to the statr values are returned. A value of `1` means 1 hop connections (initial match plus one recursive step), and so on. | +| `depthField=` | Optional | The name of the field to add to each traversed document indicating its recursion depth. If not specified, no depth field is added. Depth starts at `0` for the first level of matches. | +| `direction=(uni \| bi)` | Optional | The traversal direction. `uni` (default) performs unidirectional traversal following edges in the forward direction only. `bi` performs bidirectional traversal, following edges in both directions. | +| `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. | +| `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output changes to two arrays: `[Array, Array]`. See [Batch Mode](#batch-mode) for details. | +| `usePIT=(true \| false)` | Optional | When `true`, enables PIT (Point In Time) search for the lookup table, allowing paginated retrieval of complete results without the `max_result_window` size limit. Default is `false`. See [PIT Search](#pit-search) for details. | +| `as ` | Required | The name of the output array field that will contain all documents found during the graph traversal. | + +## How It Works + +The `graphLookup` command performs a breadth-first search (BFS) traversal: + +1. For each source document, extract the value of `startField` +2. Query the lookup index to find documents where `toField` matches the start value +3. Add matched documents to the result array +4. Extract `fromField` values from matched documents to continue traversal +5. Repeat steps 2-4 until no new documents are found or `maxDepth` is reached + +For bidirectional traversal (`direction=bi`), the algorithm also follows edges in the reverse direction by additionally matching `fromField` values. + +## Example 1: Employee Hierarchy Traversal + +Given an `employees` index with the following documents: + +| id | name | reportsTo | +|----|------|-----------| +| 1 | Dev | Eliot | +| 2 | Eliot | Ron | +| 3 | Ron | Andrew | +| 4 | Andrew | null | +| 5 | Asya | Ron | +| 6 | Dan | Andrew | + +The following query finds the reporting chain for each employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+---------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+---------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}] | +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}] | +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+---------------------+ +``` + +For Dev, the traversal starts with `reportsTo="Eliot"`, finds the Eliot record, and returns it in the `reportingHierarchy` array. 
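
The per-row traversal in this example follows the BFS steps from the How It Works section. The sketch below is only an in-memory illustration of those semantics for the default `direction=uni` case: it runs the same frontier-based BFS over a plain list of edge documents, whereas the actual command issues filtered search requests against the lookup index at each level. The `Doc` record and its `to`/`from` fields are hypothetical stand-ins for `toField` and `fromField`.

```java
import java.util.*;

public class GraphLookupSketch {
  // Hypothetical lookup document: "to" stands in for toField, "from" for fromField
  // (modeled as a list so array-valued fields are covered as well).
  record Doc(String to, List<String> from) {}

  /** BFS from the start values; maxDepth=0 returns only the direct matches. */
  static List<Doc> traverse(List<Doc> lookup, Set<String> startValues, int maxDepth) {
    List<Doc> results = new ArrayList<>();
    Set<String> visited = new HashSet<>();
    Set<String> frontier = new HashSet<>(startValues);
    for (int depth = 0; depth <= maxDepth && !frontier.isEmpty(); depth++) {
      Set<String> next = new HashSet<>();
      for (Doc doc : lookup) {
        // Match documents whose toField equals a value in the current frontier,
        // skipping documents that were already visited at a shallower depth.
        if (frontier.contains(doc.to()) && visited.add(doc.to())) {
          results.add(doc);
          next.addAll(doc.from()); // continue the traversal from the fromField values
        }
      }
      frontier = next;
    }
    return results;
  }

  public static void main(String[] args) {
    List<Doc> employees =
        List.of(
            new Doc("Eliot", List.of("Ron")),
            new Doc("Ron", List.of("Andrew")),
            new Doc("Andrew", Collections.emptyList()));
    // Dev starts from reportsTo="Eliot"; with the default maxDepth=0 only the direct match is returned.
    System.out.println(traverse(employees, Set.of("Eliot"), 0));
  }
}
```

With start value `Eliot` and the default `maxDepth=0`, the sketch returns only the Eliot document, matching Dev's `[{Eliot, Ron, 2}]` row in the example above.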
+ +## Example 2: Employee Hierarchy with Depth Tracking + +The following query adds a depth field to track how many levels each manager is from the employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + depthField=level + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2, 0}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3, 0}] | +| Ron | Andrew | 3 | [{Andrew, null, 4, 0}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3, 0}] | +| Dan | Andrew | 6 | [{Andrew, null, 4, 0}] | ++--------+----------+----+------------------------+ +``` + +The depth field `level` is appended to each document in the result array. A value of `0` indicates the first level of matches. + +## Example 3: Limited Depth Traversal + +The following query limits traversal to 2 levels using `maxDepth=1`: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + maxDepth=1 + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+--------------------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+--------------------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}, {Ron, Andrew, 3}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+--------------------------------------+ +``` + +With `maxDepth=1`, the traversal goes two levels deep (depth 0 and depth 1). + +## Example 4: Airport Connections Graph + +Given an `airports` index with the following documents: + +| airport | connects | +|---------|----------| +| JFK | [BOS, ORD] | +| BOS | [JFK, PWM] | +| ORD | [JFK] | +| PWM | [BOS, LHR] | +| LHR | [PWM] | + +The following query finds reachable airports from each airport: + +```ppl ignore +source = airports + | graphLookup airports + startField=airport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++---------+------------+---------------------+ +| airport | connects | reachableAirports | ++---------+------------+---------------------+ +| JFK | [BOS, ORD] | [{JFK, [BOS, ORD]}] | +| BOS | [JFK, PWM] | [{BOS, [JFK, PWM]}] | +| ORD | [JFK] | [{ORD, [JFK]}] | +| PWM | [BOS, LHR] | [{PWM, [BOS, LHR]}] | +| LHR | [PWM] | [{LHR, [PWM]}] | ++---------+------------+---------------------+ +``` + +## Example 5: Cross-Index Graph Lookup + +The `graphLookup` command can use different source and lookup indexes. 
Given a `travelers` index: + +| name | nearestAirport | +|------|----------------| +| Dev | JFK | +| Eliot | JFK | +| Jeff | BOS | + +The following query finds reachable airports for each traveler: + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++-------+----------------+---------------------+ +| name | nearestAirport | reachableAirports | ++-------+----------------+---------------------+ +| Dev | JFK | [{JFK, [BOS, ORD]}] | +| Eliot | JFK | [{JFK, [BOS, ORD]}] | +| Jeff | BOS | [{BOS, [JFK, PWM]}] | ++-------+----------------+---------------------+ +``` + +## Example 6: Bidirectional Traversal + +The following query performs bidirectional traversal to find both managers and colleagues who share the same manager: + +```ppl ignore +source = employees + | where name = 'Ron' + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + direction=bi + as connections +``` + +The query returns the following results: + +```text ++------+----------+----+------------------------------------------------+ +| name | reportsTo| id | connections | ++------+----------+----+------------------------------------------------+ +| Ron | Andrew | 3 | [{Ron, Andrew, 3}, {Andrew, null, 4}, {Dan, Andrew, 6}] | ++------+----------+----+------------------------------------------------+ +``` + +With bidirectional traversal, Ron's connections include: +- His own record (Ron reports to Andrew) +- His manager (Andrew) +- His peer (Dan, who also reports to Andrew) + +## Batch Mode + +When `batchMode=true`, the `graphLookup` command collects all start values from all source rows and performs a single unified BFS traversal instead of separate traversals per row. + +### Output Format Change + +In batch mode, the output is a **single row** with two arrays: +- First array: All source rows collected +- Second array: All lookup results from the unified BFS traversal + +### When to Use Batch Mode + +Use `batchMode=true` when: +- You want to find all nodes reachable from **any** of the source start values +- You need a global view of the graph connectivity from multiple starting points +- You want to avoid duplicate traversals when multiple source rows share overlapping paths + +### Example + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + batchMode=true + maxDepth=2 + as reachableAirports +``` + +**Normal mode** (default): Each traveler gets their own list of reachable airports +```text +| name | nearestAirport | reachableAirports | +|-------|----------------|-------------------| +| Dev | JFK | [JFK, BOS, ORD] | +| Jeff | BOS | [BOS, JFK, PWM] | +``` + +**Batch mode**: A single row with all travelers and all reachable airports combined +```text +| travelers | reachableAirports | +|----------------------------------------|-----------------------------| +| [{Dev, JFK}, {Jeff, BOS}] | [JFK, BOS, ORD, PWM, ...] | +``` + +## Array Field Handling + +When the `fromField` or `toField` contains array values, you should set `supportArray=true` to ensure correct traversal behavior. + +## PIT Search + +By default, each level of BFS traversal limits the number of returned documents to the `max_result_window` setting of the lookup index (typically 10,000). 
This avoids the overhead of PIT (Point In Time) search but may return incomplete results when a single traversal level matches more documents than the limit. + +When `usePIT=true`, this limit is removed and the lookup table uses PIT-based pagination, which ensures all matching documents are retrieved at each traversal level. This provides complete and accurate results at the cost of additional search overhead. + +### When to Use PIT Search + +Use `usePIT=true` when: +- The graph has high-degree nodes where a single traversal level may match more than `max_result_window` documents +- Result completeness is more important than query performance +- You observe incomplete or missing results with the default setting + +### Example + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + usePIT=true + as reportingHierarchy +``` + +## Limitations + +- The source input, which provides the starting point for the traversal, has a limitation of 100 documents to avoid performance issues. +- When `usePIT=false` (default), each level of traversal search returns documents up to the `max_result_window` of the lookup index, which may result in incomplete data. Set `usePIT=true` to retrieve complete results. diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java new file mode 100644 index 00000000000..498b17dab91 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java @@ -0,0 +1,603 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_AIRPORTS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_TRAVELERS; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** + * Integration tests for graphLookup command. Test cases are inspired by MongoDB's $graphLookup + * examples. + * + *

Test data:
 + *
 + * <ul>
 + *   <li>graph_employees: Employee hierarchy (Dev->Eliot->Ron->Andrew, Asya->Ron, Dan->Andrew)
 + *   <li>graph_travelers: Travelers with nearest airport (Dev->JFK, Eliot->JFK, Jeff->BOS)
 + *   <li>graph_airports: Airport connections (JFK, BOS, ORD, PWM, LHR)
 + * </ul>
+ * + * @see MongoDB + * $graphLookup + */ +public class CalcitePPLGraphLookupIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + + loadIndex(Index.GRAPH_EMPLOYEES); + loadIndex(Index.GRAPH_TRAVELERS); + loadIndex(Index.GRAPH_AIRPORTS); + } + + // ==================== Employee Hierarchy Tests ==================== + + /** Test 1: Basic employee hierarchy traversal. Find all managers in the reporting chain. */ + @Test + public void testEmployeeHierarchyBasicTraversal() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + } + + /** Test 2: Employee hierarchy traversal with depth field. */ + @Test + public void testEmployeeHierarchyWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=level" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2, 0}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3, 0}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4, 0}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3, 0}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4, 0}"))); + } + + /** Test 3: Employee hierarchy with maxDepth=1 (allows 2 levels of traversal). 
*/ + @Test + public void testEmployeeHierarchyWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " maxDepth=1" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}", "{Ron, Andrew, 3}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + } + + /** Test 4: Query Dev's complete reporting chain: Dev->Eliot->Ron->Andrew */ + @Test + public void testEmployeeHierarchyForSpecificEmployee() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}"))); + } + + // ==================== Airport Connections Tests ==================== + + /** Test 5: Find all reachable airports from each airport. */ + @Test + public void testAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}")), + rows("BOS", List.of("JFK", "PWM"), List.of("{BOS, [JFK, PWM]}")), + rows("ORD", List.of("JFK"), List.of("{ORD, [JFK]}")), + rows("PWM", List.of("BOS", "LHR"), List.of("{PWM, [BOS, LHR]}")), + rows("LHR", List.of("PWM"), List.of("{LHR, [PWM]}"))); + } + + /** Test 6: Find airports reachable from JFK within maxDepth=1. 
*/ + @Test + public void testAirportConnectionsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + } + + /** Test 7: Find airports with default depth(=0) and start value of list */ + @Test + public void testAirportConnectionsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " startField=connects" + + " fromField=connects" + + " toField=airport" + + " depthField=numConnections" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows(result, rows("JFK", List.of("BOS", "ORD"), List.of("{BOS, [JFK, PWM], 0}"))); + } + + /** + * Test 8: Find reachable airports for all travelers. Uses travelers as source and airports as + * lookup table, with nearestAirport as the starting point for graph traversal. + */ + @Test + public void testTravelersReachableAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("Dev", "JFK", List.of("{JFK, [BOS, ORD]}")), + rows("Eliot", "JFK", List.of("{JFK, [BOS, ORD]}")), + rows("Jeff", "BOS", List.of("{BOS, [JFK, PWM]}"))); + } + + /** + * Test 9: Find reachable airports for a specific traveler (Dev at JFK) with depth tracking. + * Traverses from JFK through connected airports. + */ + @Test + public void testTravelerReachableAirportsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " depthField=hops" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows(result, rows("Dev", "JFK", List.of("{JFK, [BOS, ORD], 0}"))); + } + + /** + * Test 10: Find reachable airports for Jeff (at BOS) with maxDepth=1. Finds BOS record as the + * starting point and traverses one level to connected airports. 
+ */ + @Test + public void testTravelerReachableAirportsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Jeff'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows( + "Jeff", "BOS", List.of("{BOS, [JFK, PWM]}", "{JFK, [BOS, ORD]}", "{PWM, [BOS, LHR]}"))); + } + + // ==================== Bidirectional Traversal Tests ==================== + + /** Test 11: Bidirectional traversal for Ron (finds both managers and reports). */ + @Test + public void testBidirectionalEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Ron'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " direction=bi" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("connections", "array")); + verifyDataRows( + result, + rows( + "Ron", + "Andrew", + 3, + List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}", "{Dan, Andrew, 6}"))); + } + + /** + * Test 12: Bidirectional airport connections for ORD. Note: Currently returns empty + * allConnections array because the connects field is an array type. + */ + @Test + public void testBidirectionalAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'ORD'" + + " | graphLookup %s" + + " startField=connects" + + " fromField=connects" + + " toField=airport" + + " direction=bi" + + " as allConnections", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("allConnections", "array")); + verifyDataRows( + result, rows("ORD", List.of("JFK"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + } + + // ==================== Edge Cases ==================== + + /** Test 13: Graph lookup on empty result set (non-existent employee). */ + @Test + public void testEmptySourceResult() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'NonExistent'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result); + } + + /** Test 14: CEO (Andrew) with no manager - hierarchy should be empty. 
*/ + @Test + public void testEmployeeWithNoManager() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Andrew'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Andrew", null, 4, Collections.emptyList())); + } + + /** Test 15: Combined with stats command. */ + @Test + public void testGraphLookupWithStats() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | stats count() by name", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("count()", "bigint"), schema("name", "string")); + verifyDataRows( + result, + rows(1L, "Ron"), + rows(1L, "Dan"), + rows(1L, "Dev"), + rows(1L, "Andrew"), + rows(1L, "Asya"), + rows(1L, "Eliot")); + } + + /** Test 16: Graph lookup with fields projection (name and reportingHierarchy only). */ + @Test + public void testGraphLookupWithFieldsProjection() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | fields name, reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("name", "string"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", List.of("{Eliot, Ron, 2}")), + rows("Eliot", List.of("{Ron, Andrew, 3}")), + rows("Ron", List.of("{Andrew, null, 4}")), + rows("Andrew", Collections.emptyList()), + rows("Asya", List.of("{Ron, Andrew, 3}")), + rows("Dan", List.of("{Andrew, null, 4}"))); + } + + // ==================== Batch Mode Tests ==================== + + /** + * Test 17: Batch mode - collects all start values and performs unified BFS. Output is a single + * row with [Array, Array]. + * + *

Source: Dev (reportsTo=Eliot), Asya (reportsTo=Ron) Start values: {Eliot, Ron} BFS finds: + * Eliot->Ron, Ron->Andrew, Andrew->null + */ + @Test + public void testBatchModeEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Asya')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " batchMode=true" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows( + List.of("{Dev, Eliot, 1}", "{Asya, Ron, 5}"), + List.of("{Ron, Andrew, 3, 0}", "{Andrew, null, 4, 1}"))); + } + + /** + * Test 18: Batch mode for travelers - find all airports reachable from any traveler. All + * travelers' nearest airports: JFK (Dev, Eliot), BOS (Jeff) Unified BFS from {JFK, BOS} with + * maxDepth=1 finds connected airports. + */ + @Test + public void testBatchModeTravelersAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " batchMode=true" + + " depthField=depth" + + " maxDepth=3" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema(result, schema("nearestAirport", "array"), schema("reachableAirports", "array")); + // Batch mode returns single row with: + // - sourceRows: [{Dev, JFK}, {Eliot, JFK}, {Jeff, BOS}] + // - lookupResults: airports reachable from JFK and BOS within maxDepth=1 + verifyDataRows( + result, + rows( + List.of("{Dev, JFK}", "{Eliot, JFK}", "{Jeff, BOS}"), + List.of("{JFK, [BOS, ORD], 0}", "{BOS, [JFK, PWM], 0}", "{PWM, [BOS, LHR], 1}"))); + } + + /** + * Test 19: Batch mode with bidirectional traversal. Dev (reportsTo=Eliot), Dan (reportsTo=Andrew) + * Bidirectional BFS from {Eliot, Andrew} finds connections in both directions. 
+ */ + @Test + public void testBatchModeBidirectional() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Dan')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " direction=bi" + + " batchMode=true" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("connections", "array")); + // Batch mode returns single row with bidirectional traversal results + // Start from {Eliot, Andrew}, find connections in both directions + verifyDataRows( + result, + rows( + List.of("{Dev, Eliot, 1}", "{Dan, Andrew, 6}"), + List.of( + "{Dev, Eliot, 1, 0}", + "{Eliot, Ron, 2, 0}", + "{Andrew, null, 4, 0}", + "{Dan, Andrew, 6, 0}", + "{Asya, Ron, 5, 1}"))); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index c9de7a584c6..5910bc8d476 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -838,6 +838,22 @@ public enum Index { "duplication_nullable", getDuplicationNullableIndexMapping(), "src/test/resources/duplication_nullable.json"), + // Graph lookup test indices (inspired by MongoDB $graphLookup examples) + GRAPH_EMPLOYEES( + TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES, + "graph_employees", + getGraphEmployeesIndexMapping(), + "src/test/resources/graph_employees.json"), + GRAPH_TRAVELERS( + TestsConstants.TEST_INDEX_GRAPH_TRAVELERS, + "graph_travelers", + getGraphTravelersIndexMapping(), + "src/test/resources/graph_travelers.json"), + GRAPH_AIRPORTS( + TestsConstants.TEST_INDEX_GRAPH_AIRPORTS, + "graph_airports", + getGraphAirportsIndexMapping(), + "src/test/resources/graph_airports.json"), TPCH_ORDERS( "orders", "tpch", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index aa8b52af4aa..16827623058 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -338,6 +338,21 @@ public static String getWorkInformationIndexMapping() { return getMappingFile(mappingFile); } + public static String getGraphEmployeesIndexMapping() { + String mappingFile = "graph_employees_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphTravelersIndexMapping() { + String mappingFile = "graph_travelers_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphAirportsIndexMapping() { + String mappingFile = "graph_airports_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getDuplicationNullableIndexMapping() { String mappingFile = "duplication_nullable_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index ad8a232bab3..b5e49bbe022 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -84,6 +84,9 @@ public class TestsConstants { public static final String TEST_INDEX_WORKER = 
TEST_INDEX + "_worker"; public static final String TEST_INDEX_WORK_INFORMATION = TEST_INDEX + "_work_information"; public static final String TEST_INDEX_DUPLICATION_NULLABLE = TEST_INDEX + "_duplication_nullable"; + public static final String TEST_INDEX_GRAPH_EMPLOYEES = TEST_INDEX + "_graph_employees"; + public static final String TEST_INDEX_GRAPH_TRAVELERS = TEST_INDEX + "_graph_travelers"; + public static final String TEST_INDEX_GRAPH_AIRPORTS = TEST_INDEX + "_graph_airports"; public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1"; public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2"; public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*"; diff --git a/integ-test/src/test/resources/graph_airports.json b/integ-test/src/test/resources/graph_airports.json new file mode 100644 index 00000000000..c644a24dc04 --- /dev/null +++ b/integ-test/src/test/resources/graph_airports.json @@ -0,0 +1,10 @@ +{"index":{"_id":"1"}} +{"airport":"JFK","connects":["BOS","ORD"]} +{"index":{"_id":"2"}} +{"airport":"BOS","connects":["JFK","PWM"]} +{"index":{"_id":"3"}} +{"airport":"ORD","connects":["JFK"]} +{"index":{"_id":"4"}} +{"airport":"PWM","connects":["BOS","LHR"]} +{"index":{"_id":"5"}} +{"airport":"LHR","connects":["PWM"]} diff --git a/integ-test/src/test/resources/graph_employees.json b/integ-test/src/test/resources/graph_employees.json new file mode 100644 index 00000000000..a9a2630fc05 --- /dev/null +++ b/integ-test/src/test/resources/graph_employees.json @@ -0,0 +1,12 @@ +{"index":{"_id":"1"}} +{"id":1,"name":"Dev","reportsTo":"Eliot"} +{"index":{"_id":"2"}} +{"id":2,"name":"Eliot","reportsTo":"Ron"} +{"index":{"_id":"3"}} +{"id":3,"name":"Ron","reportsTo":"Andrew"} +{"index":{"_id":"4"}} +{"id":4,"name":"Andrew","reportsTo":null} +{"index":{"_id":"5"}} +{"id":5,"name":"Asya","reportsTo":"Ron"} +{"index":{"_id":"6"}} +{"id":6,"name":"Dan","reportsTo":"Andrew"} diff --git a/integ-test/src/test/resources/graph_travelers.json b/integ-test/src/test/resources/graph_travelers.json new file mode 100644 index 00000000000..eb11d2206cc --- /dev/null +++ b/integ-test/src/test/resources/graph_travelers.json @@ -0,0 +1,6 @@ +{"index":{"_id":"1"}} +{"name":"Dev","nearestAirport":"JFK"} +{"index":{"_id":"2"}} +{"name":"Eliot","nearestAirport":"JFK"} +{"index":{"_id":"3"}} +{"name":"Jeff","nearestAirport":"BOS"} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json new file mode 100644 index 00000000000..e93812c8a1a --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "airport": { + "type": "keyword" + }, + "connects": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json new file mode 100644 index 00000000000..8c6674396e4 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "keyword" + }, + "reportsTo": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json 
b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json new file mode 100644 index 00000000000..f4697dead12 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "keyword" + }, + "nearestAirport": { + "type": "keyword" + } + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index a7eb3ad57be..58d797f4bf9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.calcite.avatica.util.StructImpl; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -245,6 +246,9 @@ private static Object processValue(Object value) { } return convertedMap; } + if (value instanceof StructImpl) { + return ((StructImpl) value).toString(); + } if (value instanceof List) { List list = (List) value; List convertedList = new ArrayList<>(); @@ -330,6 +334,9 @@ private void registerOpenSearchFunctions() { BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction); OperatorTable.addOperator( BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction); + + // Note: GraphLookup is now implemented as a custom RelNode (LogicalGraphLookup) + // instead of a UDF, so no registration is needed here. } /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java new file mode 100644 index 00000000000..f76c90ab47d --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.rules; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; +import org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableGraphLookup; + +/** Rule to convert a {@link LogicalGraphLookup} to a {@link CalciteEnumerableGraphLookup}. */ +public class EnumerableGraphLookupRule extends ConverterRule { + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = + Config.INSTANCE + .as(Config.class) + .withConversion( + LogicalGraphLookup.class, + Convention.NONE, + EnumerableConvention.INSTANCE, + "EnumerableGraphLookupRule") + .withRuleFactory(EnumerableGraphLookupRule::new); + + /** Creates an EnumerableGraphLookupRule. 
*/ + protected EnumerableGraphLookupRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalGraphLookup graphLookup = call.rel(0); + // Only match if we can extract the OpenSearchIndex from the lookup table + return extractOpenSearchIndex(graphLookup.getLookup()) != null; + } + + /** + * Recursively extracts OpenSearchIndex from a RelNode by traversing down to find the index scan. + * + * @param node The RelNode to extract from + * @return The OpenSearchIndex, or null if not found + */ + private static OpenSearchIndex extractOpenSearchIndex(RelNode node) { + if (node instanceof AbstractCalciteIndexScan scan) { + return scan.getOsIndex(); + } + if (node instanceof RelSubset subset) { + return extractOpenSearchIndex(subset.getOriginal()); + } + // Recursively check inputs + for (RelNode input : node.getInputs()) { + OpenSearchIndex index = extractOpenSearchIndex(input); + if (index != null) { + return index; + } + } + return null; + } + + @Override + public RelNode convert(RelNode rel) { + final LogicalGraphLookup graphLookup = (LogicalGraphLookup) rel; + + // Extract the OpenSearchIndex from the lookup table + OpenSearchIndex lookupIndex = extractOpenSearchIndex(graphLookup.getLookup()); + if (lookupIndex == null) { + throw new IllegalStateException("Cannot extract OpenSearchIndex from lookup table"); + } + + // Convert inputs to enumerable convention + RelTraitSet traitSet = graphLookup.getTraitSet().replace(EnumerableConvention.INSTANCE); + + RelNode convertedSource = + convert( + graphLookup.getSource(), + graphLookup.getSource().getTraitSet().replace(EnumerableConvention.INSTANCE)); + RelNode convertedLookup = + convert( + graphLookup.getLookup(), + graphLookup.getLookup().getTraitSet().replace(EnumerableConvention.INSTANCE)); + return new CalciteEnumerableGraphLookup( + graphLookup.getCluster(), + traitSet, + convertedSource, + convertedLookup, + graphLookup.getStartField(), + graphLookup.getFromField(), + graphLookup.getToField(), + graphLookup.getOutputField(), + graphLookup.getDepthField(), + graphLookup.getMaxDepth(), + graphLookup.isBidirectional(), + graphLookup.isSupportArray(), + graphLookup.isBatchMode(), + graphLookup.isUsePIT()); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java index db65bb51a80..0068f445ce7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java @@ -16,6 +16,8 @@ public class OpenSearchIndexRules { EnumerableSystemIndexScanRule.DEFAULT_CONFIG.toRule(); private static final RelOptRule NESTED_AGGREGATE_RULE = EnumerableNestedAggregateRule.DEFAULT_CONFIG.toRule(); + private static final RelOptRule GRAPH_LOOKUP_RULE = + EnumerableGraphLookupRule.DEFAULT_CONFIG.toRule(); // Rule that always pushes down relevance functions regardless of pushdown settings private static final RelevanceFunctionPushdownRule RELEVANCE_FUNCTION_RULE = RelevanceFunctionPushdownRule.Config.DEFAULT.toRule(); @@ -23,7 +25,11 @@ public class OpenSearchIndexRules { /** The rules will apply whatever the pushdown setting is. 
*/ public static final List OPEN_SEARCH_NON_PUSHDOWN_RULES = ImmutableList.of( - INDEX_SCAN_RULE, SYSTEM_INDEX_SCAN_RULE, NESTED_AGGREGATE_RULE, RELEVANCE_FUNCTION_RULE); + INDEX_SCAN_RULE, + SYSTEM_INDEX_SCAN_RULE, + NESTED_AGGREGATE_RULE, + GRAPH_LOOKUP_RULE, + RELEVANCE_FUNCTION_RULE); private static final ProjectIndexScanRule PROJECT_INDEX_SCAN = ProjectIndexScanRule.Config.DEFAULT.toRule(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 355262b2d6a..50f782cd154 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -1057,7 +1057,7 @@ QueryExpression isTrue() { throw new PredicateAnalyzerException("isTrue cannot be applied to " + this.getClass()); } - QueryExpression in(LiteralExpression literal) { + public QueryExpression in(LiteralExpression literal) { throw new PredicateAnalyzerException("in cannot be applied to " + this.getClass()); } @@ -1065,7 +1065,7 @@ QueryExpression notIn(LiteralExpression literal) { throw new PredicateAnalyzerException("notIn cannot be applied to " + this.getClass()); } - static QueryExpression create(TerminalExpression expression) { + public static QueryExpression create(TerminalExpression expression) { if (expression instanceof CastExpression) { expression = CastExpression.unpack(expression); } @@ -1673,11 +1673,11 @@ public String getReferenceForTermQuery() { } /** Literal like {@code 'foo' or 42 or true} etc. */ - static final class LiteralExpression implements TerminalExpression { + public static final class LiteralExpression implements TerminalExpression { final RexLiteral literal; - LiteralExpression(RexLiteral literal) { + public LiteralExpression(RexLiteral literal) { this.literal = literal; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index d7539312cd1..3350c00fb0c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -80,7 +80,7 @@ public class OpenSearchIndex extends AbstractOpenSearchTable { @Getter private final Settings settings; /** {@link OpenSearchRequest.IndexName}. */ - private final OpenSearchRequest.IndexName indexName; + @Getter private final OpenSearchRequest.IndexName indexName; /** The cached mapping of field and type in index. 
*/ private Map cachedFieldOpenSearchTypes = null; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java new file mode 100644 index 00000000000..2cd0f4a746f --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java @@ -0,0 +1,520 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.termsQuery; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import lombok.Getter; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.calcite.plan.rel.GraphLookup; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; +import org.opensearch.sql.opensearch.storage.scan.context.LimitDigest; +import org.opensearch.sql.opensearch.storage.scan.context.OSRequestBuilderAction; +import org.opensearch.sql.opensearch.storage.scan.context.PushDownType; +import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; + +/** + * Enumerable implementation for graphLookup command. + * + *

Performs BFS graph traversal by dynamically querying OpenSearch with filter pushdown instead + * of loading all lookup data into memory. For each source row, it executes BFS queries to find all + * connected nodes in the graph. + */ +@Getter +public class CalciteEnumerableGraphLookup extends GraphLookup implements EnumerableRel, Scannable { + private static final Logger LOG = LogManager.getLogger(); + + /** + * Creates a CalciteEnumerableGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set (must include EnumerableConvention) + * @param source Source table RelNode + * @param lookup Lookup table RelNode // * @param lookupIndex OpenSearchIndex for the lookup table + * (extracted from lookup RelNode) + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @param usePIT Whether to use PIT (Point In Time) search for complete results + */ + public CalciteEnumerableGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode, + boolean usePIT) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new CalciteEnumerableGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode, + usePIT); + } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + // TODO: make it more accurate + return super.computeSelfCost(planner, mq); + } + + // TODO: support non-scannable inputs + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + OpenSearchRelOptUtil.replaceDot(getCluster().getTypeFactory(), getRowType()), + pref.preferArray()); + + var scanOperator = implementor.stash(this, CalciteEnumerableGraphLookup.class); + return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); + } + + @Override + public Enumerable<@Nullable Object> scan() { + return new GraphLookupEnumerable(this); + } + + /** Enumerable implementation that performs BFS traversal for each source row. */ + private static class GraphLookupEnumerable extends AbstractEnumerable<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + + GraphLookupEnumerable(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + } + + @Override + public Enumerator<@Nullable Object> enumerator() { + return new GraphLookupEnumerator(graphLookup); + } + } + + /** Enumerator that performs BFS for each source row. 
*/ + private static class GraphLookupEnumerator implements Enumerator<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + private final CalciteEnumerableIndexScan lookupScan; + private final Enumerator<@Nullable Object> sourceEnumerator; + private final List lookupFields; + private final int startFieldIndex; + private final int fromFieldIdx; + private final int toFieldIdx; + + private Object[] current = null; + private boolean batchModeCompleted = false; + + @SuppressWarnings("unchecked") + GraphLookupEnumerator(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + this.lookupScan = (CalciteEnumerableIndexScan) graphLookup.getLookup(); + if (!graphLookup.usePIT) { + // When usePIT is false (default), limit the size of the lookup table to MaxResultWindow + // to avoid PIT search for better performance, but results may be incomplete + final int maxResultWindow = this.lookupScan.getOsIndex().getMaxResultWindow(); + this.lookupScan.pushDownContext.add( + PushDownType.LIMIT, + new LimitDigest(maxResultWindow, 0), + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownLimit(maxResultWindow, 0)); + } + // When usePIT is true, no limit is set, allowing PIT-based pagination for complete results + + // Get the source enumerator + if (graphLookup.getSource() instanceof Scannable scannable) { + Enumerable sourceEnum = scannable.scan(); + this.sourceEnumerator = (Enumerator<@Nullable Object>) sourceEnum.enumerator(); + } else { + throw new IllegalStateException( + "Source must be Scannable, got: " + graphLookup.getSource().getClass()); + } + + List sourceFields = graphLookup.getSource().getRowType().getFieldNames(); + this.lookupFields = graphLookup.getLookup().getRowType().getFieldNames(); + this.startFieldIndex = sourceFields.indexOf(graphLookup.getStartField()); + this.fromFieldIdx = lookupFields.indexOf(graphLookup.fromField); + this.toFieldIdx = lookupFields.indexOf(graphLookup.toField); + } + + @Override + public Object current() { + // source fields + output array (normal mode) or [source array, lookup array] (batch mode) + return current; + } + + @Override + public boolean moveNext() { + if (graphLookup.batchMode) { + return moveNextBatchMode(); + } else { + return moveNextNormalMode(); + } + } + + /** + * Batch mode: collect all source start values, perform unified BFS, return single aggregated + * row. + */ + private boolean moveNextBatchMode() { + // Batch mode only returns one row + if (batchModeCompleted) { + return false; + } + batchModeCompleted = true; + + // Collect all source rows and start values + List allSourceRows = new ArrayList<>(); + Set allStartValues = new HashSet<>(); + + while (sourceEnumerator.moveNext()) { + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + sourceValues = new Object[] {sourceRow}; + } + + // Store the source row + allSourceRows.add(sourceValues); + + // Collect start value(s) + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? 
sourceValues[startFieldIndex] + : null; + + if (startValue != null) { + if (startValue instanceof List list) { + allStartValues.addAll(list); + } else { + allStartValues.add(startValue); + } + } + } + + // Perform unified BFS with all start values + List bfsResults = performBfs(allStartValues); + + // Build output row: [Array, Array] + current = new Object[] {allSourceRows, bfsResults}; + + return true; + } + + /** Normal mode: perform BFS for each source row individually. */ + private boolean moveNextNormalMode() { + if (!sourceEnumerator.moveNext()) { + return false; + } + + // Get current source row + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + // Single column case + sourceValues = new Object[] {sourceRow}; + } + + // Get the start value for BFS + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? sourceValues[startFieldIndex] + : null; + + // Perform BFS traversal + List bfsResults = performBfs(startValue); + + // Build output row: source fields + array of BFS results + current = new Object[sourceValues.length + 1]; + System.arraycopy(sourceValues, 0, current, 0, sourceValues.length); + current[sourceValues.length] = bfsResults; + + return true; + } + + /** + * Performs BFS traversal starting from the given value by dynamically querying OpenSearch. + * + * @param startValue The starting value for BFS + * @return List of rows found during traversal + */ + private List performBfs(Object startValue) { + if (startValue == null) { + return List.of(); + } + + // TODO: support spillable for these collections + List results = new ArrayList<>(); + // TODO: If we want to include loop edges, we also need to track the visited edges + Set visitedNodes = new HashSet<>(); + Queue queue = new ArrayDeque<>(); + + // Initialize BFS with start value + if (startValue instanceof Collection collection) { + collection.forEach( + value -> { + if (!visitedNodes.contains(value)) { + visitedNodes.add(value); + queue.offer(value); + } + }); + } else { + visitedNodes.add(startValue); + queue.offer(startValue); + } + + int currentLevelDepth = 0; + while (!queue.isEmpty()) { + // Collect all values at current level for batch query + List currentLevelValues = new ArrayList<>(); + + while (!queue.isEmpty()) { + Object value = queue.poll(); + currentLevelValues.add(value); + } + + if (currentLevelValues.isEmpty()) { + break; + } + + // Query OpenSearch for all current level values + // Forward direction: fromField = currentLevelValues + List forwardResults = queryLookupTable(currentLevelValues, visitedNodes); + + if (!graphLookup.usePIT + && forwardResults.size() >= this.lookupScan.getOsIndex().getMaxResultWindow()) { + LOG.warn("BFS result size exceeds max result window, returning partial result."); + } + for (Object row : forwardResults) { + Object[] rowArray = (Object[]) (row); + Object fromValue = rowArray[fromFieldIdx]; + // Collect next values to traverse (may be single value or list) + // For forward traversal: extract fromField values for next level + // For bidirectional: also extract toField values. 
+ // Skip visited values while keep null value + List nextValues = new ArrayList<>(); + collectValues(fromValue, nextValues, visitedNodes); + if (graphLookup.bidirectional) { + Object toValue = rowArray[toFieldIdx]; + collectValues(toValue, nextValues, visitedNodes); + } + + // Add row to results if the nextValues is not empty + if (!nextValues.isEmpty()) { + if (graphLookup.depthField != null) { + Object[] rowWithDepth = new Object[rowArray.length + 1]; + System.arraycopy(rowArray, 0, rowWithDepth, 0, rowArray.length); + rowWithDepth[rowArray.length] = currentLevelDepth; + results.add(rowWithDepth); + } else { + results.add(rowArray); + } + + // Add unvisited non-null values to queue for next level traversal + for (Object val : nextValues) { + if (val != null) { + visitedNodes.add(val); + queue.offer(val); + } + } + } + } + + if (++currentLevelDepth > graphLookup.maxDepth) break; + } + + return results; + } + + /** + * Queries the lookup table with a terms filter. + * + * @param values Values to match + * @param visitedValues Values to not match (ignored when supportArray is true) + * @return List of matching rows + */ + private List queryLookupTable( + Collection values, Collection visitedValues) { + if (values.isEmpty()) { + return List.of(); + } + + // Forward direction query + QueryBuilder query; + if (graphLookup.supportArray) { + // When supportArray is true, don't push down visited filter + // because array fields may contain multiple values that need to be checked individually + query = getQueryBuilder(toFieldIdx, values); + } else { + query = + boolQuery() + .must(getQueryBuilder(toFieldIdx, values)) + .mustNot(getQueryBuilder(fromFieldIdx, visitedValues)); + } + + if (graphLookup.bidirectional) { + // Also query fromField for bidirectional traversal + QueryBuilder backQuery; + if (graphLookup.supportArray) { + backQuery = getQueryBuilder(fromFieldIdx, values); + } else { + backQuery = + boolQuery() + .must(getQueryBuilder(fromFieldIdx, values)) + .mustNot(getQueryBuilder(toFieldIdx, visitedValues)); + } + query = QueryBuilders.boolQuery().should(query).should(backQuery); + } + CalciteEnumerableIndexScan newScan = (CalciteEnumerableIndexScan) this.lookupScan.copy(); + QueryBuilder finalQuery = query; + newScan.pushDownContext.add( + PushDownType.FILTER, + null, + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownFilterForCalcite(finalQuery)); + Iterator<@Nullable Object> res = newScan.scan().iterator(); + List results = new ArrayList<>(); + while (res.hasNext()) { + results.add(res.next()); + } + closeIterator(res); + return results; + } + + private static void closeIterator(@Nullable Iterator iterator) { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Provides a query builder to search edges with the field matching values + * + * @param fieldIdx field index + * @param values values to match + * @return query builder + */ + private QueryBuilder getQueryBuilder(int fieldIdx, Collection values) { + String fieldName = + new NamedFieldExpression(fieldIdx, lookupFields, lookupScan.getOsIndex().getFieldTypes()) + .getReferenceForTermQuery(); + return termsQuery(fieldName, values); + } + + /** + * Collects values from a field that may be a single value or a list. 
+ * + * @param value The field value (may be single value or List) + * @param collector The list to collect values into + * @param visited Previously visited values to avoid duplicates + */ + private void collectValues(Object value, List collector, Set visited) { + if (value instanceof List list) { + for (Object item : list) { + if (!visited.contains(item)) { + collector.add(item); + } + } + } else if (!visited.contains(value)) { + collector.add(value); + } + } + + @Override + public void reset() { + sourceEnumerator.reset(); + current = null; + } + + @Override + public void close() { + sourceEnumerator.close(); + } + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 1939124eedb..09ec84c4535 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -52,6 +52,18 @@ TIMECHART: 'TIMECHART'; APPENDCOL: 'APPENDCOL'; ADDTOTALS: 'ADDTOTALS'; ADDCOLTOTALS: 'ADDCOLTOTALS'; +GRAPHLOOKUP: 'GRAPHLOOKUP'; +START_FIELD: 'STARTFIELD'; +FROM_FIELD: 'FROMFIELD'; +TO_FIELD: 'TOFIELD'; +MAX_DEPTH: 'MAXDEPTH'; +DEPTH_FIELD: 'DEPTHFIELD'; +DIRECTION: 'DIRECTION'; +UNI: 'UNI'; +BI: 'BI'; +SUPPORT_ARRAY: 'SUPPORTARRAY'; +BATCH_MODE: 'BATCHMODE'; +USE_PIT: 'USEPIT'; ROW: 'ROW'; COL: 'COL'; EXPAND: 'EXPAND'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 2131eeae939..d860874a1e9 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -90,6 +90,7 @@ commands | appendPipeCommand | replaceCommand | mvcombineCommand + | graphLookupCommand ; commandName @@ -135,6 +136,7 @@ commandName | REPLACE | MVCOMBINE | TRANSPOSE + | GRAPHLOOKUP ; searchCommand @@ -625,6 +627,22 @@ addcoltotalsOption | (LABELFIELD EQUAL stringLiteral) ; +graphLookupCommand + : GRAPHLOOKUP lookupTable = tableSourceClause graphLookupOption* AS outputField = fieldExpression + ; + +graphLookupOption + : (START_FIELD EQUAL fieldExpression) + | (FROM_FIELD EQUAL fieldExpression) + | (TO_FIELD EQUAL fieldExpression) + | (MAX_DEPTH EQUAL integerLiteral) + | (DEPTH_FIELD EQUAL fieldExpression) + | (DIRECTION EQUAL (UNI | BI)) + | (SUPPORT_ARRAY EQUAL booleanLiteral) + | (BATCH_MODE EQUAL booleanLiteral) + | (USE_PIT EQUAL booleanLiteral) + ; + // clauses fromClause : SOURCE EQUAL tableOrSubqueryClause @@ -1676,5 +1694,11 @@ searchableKeyWord | ROW | COL | COLUMN_NAME + | FROM_FIELD + | TO_FIELD + | MAX_DEPTH + | DEPTH_FIELD + | DIRECTION + | UNI + | BI ; - diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index d7c725f957a..721f13033f3 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -84,6 +84,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -1481,4 +1483,74 @@ public UnresolvedPlan visitAddcoltotalsCommand( java.util.Map options = cmdOptionsBuilder.build(); return new AddColTotals(fieldList, options); } + + @Override + public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCommandContext ctx) { + // Parse lookup table 
+ UnresolvedPlan fromTable = visitTableSourceClause(ctx.lookupTable); + + // Parse options with defaults + Field fromField = null; + Field toField = null; + Literal maxDepth = Literal.ZERO; + Field startField = null; + Field depthField = null; + Direction direction = Direction.UNI; + boolean supportArray = false; + boolean batchMode = false; + boolean usePIT = false; + + for (OpenSearchPPLParser.GraphLookupOptionContext option : ctx.graphLookupOption()) { + if (option.FROM_FIELD() != null) { + fromField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.TO_FIELD() != null) { + toField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.MAX_DEPTH() != null) { + maxDepth = (Literal) internalVisitExpression(option.integerLiteral()); + } + if (option.START_FIELD() != null) { + startField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DEPTH_FIELD() != null) { + depthField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DIRECTION() != null) { + direction = option.BI() != null ? Direction.BI : Direction.UNI; + } + if (option.SUPPORT_ARRAY() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + supportArray = Boolean.TRUE.equals(literal.getValue()); + } + if (option.BATCH_MODE() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + batchMode = Boolean.TRUE.equals(literal.getValue()); + } + if (option.USE_PIT() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + usePIT = Boolean.TRUE.equals(literal.getValue()); + } + } + + Field as = (Field) internalVisitExpression(ctx.outputField); + + if (fromField == null || toField == null) { + throw new SemanticCheckException("fromField and toField must be specified for graphLookup"); + } + + return GraphLookup.builder() + .fromTable(fromTable) + .fromField(fromField) + .toField(toField) + .as(as) + .maxDepth(maxDepth) + .startField(startField) + .depthField(depthField) + .direction(direction) + .supportArray(supportArray) + .batchMode(batchMode) + .usePIT(usePIT) + .build(); + } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 4376b5659d4..2402fac8700 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -224,6 +225,36 @@ public String visitLookup(Lookup node, String context) { "%s | lookup %s %s%s%s", child, MASK_TABLE, mappingFields, strategy, outputFields); } + @Override + public String visitGraphLookup(GraphLookup node, String context) { + String child = node.getChild().get(0).accept(this, context); + StringBuilder command = new StringBuilder(); + command.append(child).append(" | graphlookup ").append(MASK_TABLE); + if (node.getStartField() != null) { + command.append(" startField=").append(MASK_COLUMN); + } + command.append(" fromField=").append(MASK_COLUMN); + command.append(" toField=").append(MASK_COLUMN); + if (node.getMaxDepth() != null && 
!Integer.valueOf(0).equals(node.getMaxDepth().getValue())) { + command.append(" maxDepth=").append(MASK_LITERAL); + } + if (node.getDepthField() != null) { + command.append(" depthField=").append(MASK_COLUMN); + } + command.append(" direction=").append(node.getDirection().name().toLowerCase()); + if (node.isSupportArray()) { + command.append(" supportArray=true"); + } + if (node.isBatchMode()) { + command.append(" batchMode=true"); + } + if (node.isUsePIT()) { + command.append(" usePIT=true"); + } + command.append(" as ").append(MASK_COLUMN); + return command.toString(); + } + private String formatFieldAlias(java.util.Map fieldMap) { return fieldMap.entrySet().stream() .map( diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java new file mode 100644 index 00000000000..da169704925 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLGraphLookupTest extends CalcitePPLAbstractTest { + + public CalcitePPLGraphLookupTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testGraphLookupBasic() { + // Test basic graphLookup with same source and lookup table + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithDepthField() { + // Test graphLookup with depthField parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name depthField=level as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + 
"LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[level]," + + " maxDepth=[0], bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithMaxDepth() { + // Test graphLookup with maxDepth parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name maxDepth=3 as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[3]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupBidirectional() { + // Test graphLookup with bidirectional traversal + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name direction=bi as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[true])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... 
schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + // Add employee table for graphLookup tests + ImmutableList rows = + ImmutableList.of( + new Object[] {1, "Dev", null}, + new Object[] {2, "Eliot", "Dev"}, + new Object[] {3, "Ron", "Eliot"}, + new Object[] {4, "Andrew", "Eliot"}, + new Object[] {5, "Asya", "Ron"}, + new Object[] {6, "Dan", "Andrew"}); + schema.add("employee", new EmployeeTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @RequiredArgsConstructor + public static class EmployeeTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("id", SqlTypeName.INTEGER) + .nullable(false) + .add("name", SqlTypeName.VARCHAR) + .nullable(false) + .add("reportsTo", SqlTypeName.VARCHAR) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 9e1cfe05a4b..f7cadaaf57d 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Chart; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -1643,4 +1644,76 @@ public void testMvmapWithNonFieldFirstArgThrowsException() { () -> plan("source=t | eval result = mvmap(123, 123 * 10)")) .getMessage()); } + + @Test + public void testGraphLookupCommand() { + // Basic graphLookup with required parameters + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name maxDepth=3" + + " as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(3)) + .startField(null) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + .build()); + + // graphLookup with startField filter + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + 
.as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(field("id")) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + .build()); + + // graphLookup with depthField and bidirectional + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level direction=bi as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(null) + .depthField(field("level")) + .direction(GraphLookup.Direction.BI) + .build()); + + // Error: missing fromField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> + plan( + "source=t | graphLookup employees toField=name startField=id as" + + " reportingHierarchy")); + + // Error: missing lookup table - SyntaxCheckException from grammar + assertThrows( + SyntaxCheckException.class, + () -> + plan( + "source=t | graphLookup fromField=manager toField=name as" + + " reportingHierarchy")); + + // Error: missing toField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> plan("source=t | graphLookup employees fromField=manager as reportingHierarchy")); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 1e200eb092b..943db6c2ba2 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -643,6 +643,60 @@ public void testLookup() { + " COUNTRY2")); } + @Test + public void testGraphLookup() { + // Basic graphLookup with required parameters + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " as reportingHierarchy")); + // graphLookup with maxDepth + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " maxDepth=*** direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " maxDepth=3 as reportingHierarchy")); + // graphLookup with depthField + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " depthField=identifier direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level as reportingHierarchy")); + // graphLookup with bidirectional mode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=bi as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " direction=bi as reportingHierarchy")); + // graphLookup with all optional parameters + assertEquals( + "source=table | graphlookup table startField=identifier fromField=identifier" + + " toField=identifier maxDepth=*** depthField=identifier direction=bi" + + " as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id maxDepth=5 depthField=level direction=bi as reportingHierarchy")); + // graphLookup with supportArray + assertEquals( + "source=table | graphlookup table fromField=identifier 
toField=identifier" + + " direction=uni supportArray=true as identifier", + anonymize( + "source=t | graphLookup airports fromField=connects toField=airport" + + " supportArray=true as reachableAirports")); + // graphLookup with batchMode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni batchMode=true as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " batchMode=true as reportingHierarchy")); + } + @Test public void testInSubquery() { assertEquals( From a6cd0b832c6e0a357ab9e9e13751c08f78ad0655 Mon Sep 17 00:00:00 2001 From: qianheng Date: Wed, 11 Feb 2026 18:40:11 +0800 Subject: [PATCH 2/2] Support filter params in graphlookup (#5134) * Struct return array value instead of string Signed-off-by: Heng Qian * Support filter in GraphLookup Signed-off-by: Heng Qian * Fix IT Signed-off-by: Heng Qian * Add experimental tag in doc Signed-off-by: Heng Qian --------- Signed-off-by: Heng Qian --- .../opensearch/sql/ast/tree/GraphLookup.java | 6 + .../sql/calcite/CalciteRelNodeVisitor.java | 12 +- .../sql/calcite/plan/rel/GraphLookup.java | 10 +- .../calcite/plan/rel/LogicalGraphLookup.java | 18 +- docs/user/ppl/cmd/graphlookup.md | 26 +- docs/user/ppl/index.md | 4 +- .../remote/CalcitePPLGraphLookupIT.java | 225 ++++++++++++++---- .../executor/OpenSearchExecutionEngine.java | 5 +- .../rules/EnumerableGraphLookupRule.java | 3 +- .../scan/CalciteEnumerableGraphLookup.java | 31 ++- ppl/src/main/antlr/OpenSearchPPLParser.g4 | 1 + .../opensearch/sql/ppl/parser/AstBuilder.java | 5 + .../sql/ppl/utils/PPLQueryDataAnonymizer.java | 6 + .../calcite/CalcitePPLGraphLookupTest.java | 36 +++ .../ppl/utils/PPLQueryDataAnonymizerTest.java | 14 ++ 15 files changed, 339 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java index 51084d7b9b9..6771c35c43d 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java @@ -18,6 +18,7 @@ import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.UnresolvedExpression; /** * AST node for graphLookup command. Performs BFS graph traversal on a lookup table. @@ -74,6 +75,11 @@ public enum Direction { /** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */ private final boolean usePIT; + /** + * Optional filter condition to restrict which lookup table documents participate in traversal. + */ + private @Nullable final UnresolvedExpression filter; + private UnresolvedPlan child; public String getDepthFieldName() { diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index c9227781f6a..9a16f2bf802 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -2606,9 +2606,16 @@ public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { // 3. Visit and materialize lookup table analyze(node.getFromTable(), context); tryToRemoveMetaFields(context, true); + + // 4. 
Convert filter expression to RexNode against lookup table schema + RexNode filterRex = null; + if (node.getFilter() != null) { + filterRex = rexVisitor.analyze(node.getFilter(), context); + } + RelNode lookupTable = builder.build(); - // 4. Create LogicalGraphLookup RelNode + // 5. Create LogicalGraphLookup RelNode // The conversion rule will extract the OpenSearchIndex from the lookup table RelNode graphLookup = LogicalGraphLookup.create( @@ -2623,7 +2630,8 @@ public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filterRex); builder.push(graphLookup); return builder.peek(); diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java index ef7134a0162..02ed97faf0c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java @@ -16,6 +16,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; /** @@ -51,6 +52,7 @@ public abstract class GraphLookup extends BiRel { protected final boolean supportArray; protected final boolean batchMode; protected final boolean usePIT; + @Nullable protected final RexNode filter; private RelDataType outputRowType; @@ -72,6 +74,7 @@ public abstract class GraphLookup extends BiRel { * pushdown) * @param batchMode Whether to batch all source start values into a single unified BFS * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents */ protected GraphLookup( RelOptCluster cluster, @@ -87,7 +90,8 @@ protected GraphLookup( boolean bidirectional, boolean supportArray, boolean batchMode, - boolean usePIT) { + boolean usePIT, + @Nullable RexNode filter) { super(cluster, traitSet, source, lookup); this.startField = startField; this.fromField = fromField; @@ -99,6 +103,7 @@ protected GraphLookup( this.supportArray = supportArray; this.batchMode = batchMode; this.usePIT = usePIT; + this.filter = filter; } /** Returns the source table RelNode. */ @@ -181,6 +186,7 @@ public RelWriter explainTerms(RelWriter pw) { .item("bidirectional", bidirectional) .itemIf("supportArray", supportArray, supportArray) .itemIf("batchMode", batchMode, batchMode) - .itemIf("usePIT", usePIT, usePIT); + .itemIf("usePIT", usePIT, usePIT) + .itemIf("filter", filter, filter != null); } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java index b02bbec3742..94db3689f8c 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java @@ -12,6 +12,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; /** * Logical RelNode for graphLookup command. 
TODO: need to support trim fields and several transpose @@ -37,6 +38,7 @@ public class LogicalGraphLookup extends GraphLookup { * @param supportArray Whether to support array-typed fields * @param batchMode Whether to batch all source start values into a single unified BFS * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents */ protected LogicalGraphLookup( RelOptCluster cluster, @@ -52,7 +54,8 @@ protected LogicalGraphLookup( boolean bidirectional, boolean supportArray, boolean batchMode, - boolean usePIT) { + boolean usePIT, + @Nullable RexNode filter) { super( cluster, traitSet, @@ -67,7 +70,8 @@ protected LogicalGraphLookup( bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filter); } /** @@ -85,6 +89,7 @@ protected LogicalGraphLookup( * @param supportArray Whether to support array-typed fields * @param batchMode Whether to batch all source start values into a single unified BFS * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param filter Optional filter condition for lookup table documents * @return A new LogicalGraphLookup instance */ public static LogicalGraphLookup create( @@ -99,7 +104,8 @@ public static LogicalGraphLookup create( boolean bidirectional, boolean supportArray, boolean batchMode, - boolean usePIT) { + boolean usePIT, + @Nullable RexNode filter) { RelOptCluster cluster = source.getCluster(); RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); return new LogicalGraphLookup( @@ -116,7 +122,8 @@ public static LogicalGraphLookup create( bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filter); } @Override @@ -135,6 +142,7 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filter); } } diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md index e768e02f6b8..2d6220edae4 100644 --- a/docs/user/ppl/cmd/graphlookup.md +++ b/docs/user/ppl/cmd/graphlookup.md @@ -1,5 +1,5 @@ -# graphLookup +# graphLookup (Experimental) The `graphLookup` command performs recursive graph traversal on a collection using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on specified fields. This is useful for hierarchical data like organizational charts, social networks, or routing graphs. 
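To make the traversal semantics described above concrete, the following is a minimal, self-contained Java sketch of the level-by-level BFS the documentation describes, using an in-memory adjacency map in place of the lookup index. The helper name, map shape, and field handling are illustrative only and are not part of this patch; with `maxDepth=0` only the first level of matches is returned, mirroring the default behavior documented below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

final class BfsSketch {
  /**
   * Level-by-level BFS starting from {@code start}, following edges in {@code edges}
   * (current value -> connected values), bounded by {@code maxDepth}. Returns each
   * reached value paired with the depth at which it was first reached.
   */
  static List<Map.Entry<String, Integer>> traverse(
      String start, Map<String, List<String>> edges, int maxDepth) {
    List<Map.Entry<String, Integer>> results = new ArrayList<>();
    Set<String> seen = new HashSet<>(Set.of(start));   // visited set prevents cycles
    Queue<String> frontier = new ArrayDeque<>(List.of(start));
    int depth = 0;
    while (!frontier.isEmpty() && depth <= maxDepth) {
      Queue<String> next = new ArrayDeque<>();
      for (String node : frontier) {
        for (String neighbor : edges.getOrDefault(node, List.of())) {
          if (seen.add(neighbor)) {                    // emit each node only once
            results.add(Map.entry(neighbor, depth));
            next.add(neighbor);
          }
        }
      }
      frontier = next;                                 // advance one BFS level
      depth++;
    }
    return results;
  }
}
```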
@@ -8,7 +8,7 @@ The `graphLookup` command performs recursive graph traversal on a collection usi The `graphLookup` command has the following syntax: ```syntax -graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] [usePIT=(true | false)] as +graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] [usePIT=(true | false)] [filter=()] as ``` The following are examples of the `graphLookup` command syntax: @@ -20,6 +20,7 @@ source = employees | graphLookup employees startField=reportsTo fromField=report source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name filter=(status = 'active' AND age > 18) as reportingHierarchy ``` ## Parameters @@ -38,6 +39,7 @@ The `graphLookup` command supports the following parameters. | `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. | | `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output changes to two arrays: `[Array, Array]`. See [Batch Mode](#batch-mode) for details. | | `usePIT=(true \| false)` | Optional | When `true`, enables PIT (Point In Time) search for the lookup table, allowing paginated retrieval of complete results without the `max_result_window` size limit. Default is `false`. See [PIT Search](#pit-search) for details. | +| `filter=()` | Optional | A filter condition to restrict which lookup table documents participate in the graph traversal. Only documents matching the condition are considered as candidates during BFS. Parentheses around the condition are required. Example: `filter=(status = 'active' AND age > 18)`. | | `as ` | Required | The name of the output array field that will contain all documents found during the graph traversal. | ## How It Works @@ -329,6 +331,26 @@ source = employees as reportingHierarchy ``` +## Filtered Graph Traversal + +The `filter` parameter restricts which documents in the lookup table are considered during the BFS traversal. Only documents matching the filter condition participate as candidates at each traversal level. + +### Example + +The following query traverses only active employees in the reporting hierarchy: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + filter=(status = 'active') + as reportingHierarchy +``` + +The filter is applied at the OpenSearch query level, so it combines efficiently with the BFS traversal queries. At each BFS level, the query sent to OpenSearch is effectively: `bool { filter: [user_filter, bfs_terms_query] }`. 
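As a rough sketch of how that per-level query could be assembled with the OpenSearch client's `QueryBuilders` (the same API the enumerator in this patch relies on), the snippet below combines the user filter with the frontier terms query in a bool filter context. The class, method, and parameter names here are illustrative assumptions, not the exact code in `CalciteEnumerableGraphLookup`.

```java
import java.util.List;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;

final class LevelQuerySketch {
  /**
   * Builds the query for one BFS level: match lookup documents whose toField is in the
   * current frontier, and, if present, also apply the user-supplied filter. Both clauses
   * go into the bool filter context, so no scoring is involved.
   */
  static QueryBuilder levelQuery(QueryBuilder userFilter, String toField, List<String> frontier) {
    BoolQueryBuilder bool =
        QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(toField, frontier));
    if (userFilter != null) {
      bool.filter(userFilter);
    }
    return bool;
  }
}
```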
+ ## Limitations - The source input, which provides the starting point for the traversal, has a limitation of 100 documents to avoid performance issues. diff --git a/docs/user/ppl/index.md b/docs/user/ppl/index.md index 12afe96eea0..718aa51f0fe 100644 --- a/docs/user/ppl/index.md +++ b/docs/user/ppl/index.md @@ -82,6 +82,8 @@ source=accounts | [addcoltotals command](cmd/addcoltotals.md) | 3.5 | stable (since 3.5) | Adds column values and appends a totals row. | | [transpose command](cmd/transpose.md) | 3.5 | stable (since 3.5) | Transpose rows to columns. | | [mvcombine command](cmd/mvcombine.md) | 3.5 | stable (since 3.4) | Combines values of a specified field across rows identical on all other fields. | +| [graphlookup command](cmd/graphlookup.md) | 3.5 | experimental (since 3.5) | Performs recursive graph traversal on a collection using a BFS algorithm.| + - [Syntax](cmd/syntax.md) - PPL query structure and command syntax formatting * **Functions** @@ -101,4 +103,4 @@ source=accounts * **Optimization** - [Optimization](../../user/optimization/optimization.rst) * **Limitations** - - [Limitations](limitations/limitations.md) \ No newline at end of file + - [Limitations](limitations/limitations.md) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java index 498b17dab91..fbaefdb8c3c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java @@ -14,6 +14,7 @@ import static org.opensearch.sql.util.MatcherUtils.verifySchema; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import org.json.JSONObject; @@ -72,12 +73,12 @@ public void testEmployeeHierarchyBasicTraversal() throws IOException { schema("reportingHierarchy", "array")); verifyDataRows( result, - rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}")), - rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}")), - rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2))), + rows("Eliot", "Ron", 2, List.of(List.of("Ron", "Andrew", 3))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), rows("Andrew", null, 4, Collections.emptyList()), - rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}")), - rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + rows("Asya", "Ron", 5, List.of(List.of("Ron", "Andrew", 3))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); } /** Test 2: Employee hierarchy traversal with depth field. 
*/ @@ -103,12 +104,12 @@ public void testEmployeeHierarchyWithDepthField() throws IOException { schema("reportingHierarchy", "array")); verifyDataRows( result, - rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2, 0}")), - rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3, 0}")), - rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4, 0}")), + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2, 0))), + rows("Eliot", "Ron", 2, List.of(List.of("Ron", "Andrew", 3, 0))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4, 0))), rows("Andrew", null, 4, Collections.emptyList()), - rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3, 0}")), - rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4, 0}"))); + rows("Asya", "Ron", 5, List.of(List.of("Ron", "Andrew", 3, 0))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4, 0)))); } /** Test 3: Employee hierarchy with maxDepth=1 (allows 2 levels of traversal). */ @@ -134,12 +135,20 @@ public void testEmployeeHierarchyWithMaxDepth() throws IOException { schema("reportingHierarchy", "array")); verifyDataRows( result, - rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}", "{Ron, Andrew, 3}")), - rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), - rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2), List.of("Ron", "Andrew", 3))), + rows( + "Eliot", + "Ron", + 2, + List.of(List.of("Ron", "Andrew", 3), Arrays.asList("Andrew", null, 4))), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), rows("Andrew", null, 4, Collections.emptyList()), - rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), - rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + rows( + "Asya", + "Ron", + 5, + List.of(List.of("Ron", "Andrew", 3), Arrays.asList("Andrew", null, 4))), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); } /** Test 4: Query Dev's complete reporting chain: Dev->Eliot->Ron->Andrew */ @@ -163,7 +172,7 @@ public void testEmployeeHierarchyForSpecificEmployee() throws IOException { schema("reportsTo", "string"), schema("id", "int"), schema("reportingHierarchy", "array")); - verifyDataRows(result, rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}"))); + verifyDataRows(result, rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2)))); } // ==================== Airport Connections Tests ==================== @@ -190,11 +199,11 @@ public void testAirportConnections() throws IOException { schema("reachableAirports", "array")); verifyDataRows( result, - rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}")), - rows("BOS", List.of("JFK", "PWM"), List.of("{BOS, [JFK, PWM]}")), - rows("ORD", List.of("JFK"), List.of("{ORD, [JFK]}")), - rows("PWM", List.of("BOS", "LHR"), List.of("{PWM, [BOS, LHR]}")), - rows("LHR", List.of("PWM"), List.of("{LHR, [PWM]}"))); + rows("JFK", List.of("BOS", "ORD"), List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("BOS", List.of("JFK", "PWM"), List.of(List.of("BOS", List.of("JFK", "PWM")))), + rows("ORD", List.of("JFK"), List.of(List.of("ORD", List.of("JFK")))), + rows("PWM", List.of("BOS", "LHR"), List.of(List.of("PWM", List.of("BOS", "LHR")))), + rows("LHR", List.of("PWM"), List.of(List.of("LHR", List.of("PWM"))))); } /** Test 6: Find airports reachable from JFK within maxDepth=1. 
*/ @@ -221,7 +230,10 @@ public void testAirportConnectionsWithMaxDepth() throws IOException { schema("reachableAirports", "array")); verifyDataRows( result, - rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + rows( + "JFK", + List.of("BOS", "ORD"), + List.of(List.of("JFK", List.of("BOS", "ORD")), List.of("BOS", List.of("JFK", "PWM"))))); } /** Test 7: Find airports with default depth(=0) and start value of list */ @@ -244,7 +256,9 @@ public void testAirportConnectionsWithDepthField() throws IOException { schema("airport", "string"), schema("connects", "string"), schema("reachableAirports", "array")); - verifyDataRows(result, rows("JFK", List.of("BOS", "ORD"), List.of("{BOS, [JFK, PWM], 0}"))); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of(List.of("BOS", List.of("JFK", "PWM"), 0)))); } /** @@ -271,9 +285,9 @@ public void testTravelersReachableAirports() throws IOException { schema("reachableAirports", "array")); verifyDataRows( result, - rows("Dev", "JFK", List.of("{JFK, [BOS, ORD]}")), - rows("Eliot", "JFK", List.of("{JFK, [BOS, ORD]}")), - rows("Jeff", "BOS", List.of("{BOS, [JFK, PWM]}"))); + rows("Dev", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("Eliot", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD")))), + rows("Jeff", "BOS", List.of(List.of("BOS", List.of("JFK", "PWM"))))); } /** @@ -300,7 +314,7 @@ public void testTravelerReachableAirportsWithDepthField() throws IOException { schema("name", "string"), schema("nearestAirport", "string"), schema("reachableAirports", "array")); - verifyDataRows(result, rows("Dev", "JFK", List.of("{JFK, [BOS, ORD], 0}"))); + verifyDataRows(result, rows("Dev", "JFK", List.of(List.of("JFK", List.of("BOS", "ORD"), 0)))); } /** @@ -331,7 +345,12 @@ public void testTravelerReachableAirportsWithMaxDepth() throws IOException { verifyDataRows( result, rows( - "Jeff", "BOS", List.of("{BOS, [JFK, PWM]}", "{JFK, [BOS, ORD]}", "{PWM, [BOS, LHR]}"))); + "Jeff", + "BOS", + List.of( + List.of("BOS", List.of("JFK", "PWM")), + List.of("JFK", List.of("BOS", "ORD")), + List.of("PWM", List.of("BOS", "LHR"))))); } // ==================== Bidirectional Traversal Tests ==================== @@ -364,7 +383,10 @@ public void testBidirectionalEmployeeHierarchy() throws IOException { "Ron", "Andrew", 3, - List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}", "{Dan, Andrew, 6}"))); + List.of( + List.of("Ron", "Andrew", 3), + Arrays.asList("Andrew", null, 4), + List.of("Dan", "Andrew", 6)))); } /** @@ -392,7 +414,117 @@ public void testBidirectionalAirportConnections() throws IOException { schema("connects", "string"), schema("allConnections", "array")); verifyDataRows( - result, rows("ORD", List.of("JFK"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + result, + rows( + "ORD", + List.of("JFK"), + List.of(List.of("JFK", List.of("BOS", "ORD")), List.of("BOS", List.of("JFK", "PWM"))))); + } + + // ==================== Filter Tests ==================== + + /** + * Test: Filter employee hierarchy by id. Only lookup documents with id > 3 (Andrew=4, Asya=5, + * Dan=6) participate in traversal. Dev starts with reportsTo=Eliot, but Eliot (id=2) is excluded + * by filter, so Dev gets empty results. 
+ */ + @Test + public void testEmployeeHierarchyWithFilter() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " filter=(id > 3)" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Only documents with id > 3 (Andrew=4, Asya=5, Dan=6) are in lookup table + // Dev: reportsTo=Eliot -> Eliot(id=2) is filtered out -> empty + // Eliot: reportsTo=Ron -> Ron(id=3) is filtered out -> empty + // Ron: reportsTo=Andrew -> Andrew(id=4) passes filter -> [{Andrew, null, 4}] + // Andrew: reportsTo=null -> empty + // Asya: reportsTo=Ron -> Ron(id=3) is filtered out -> empty + // Dan: reportsTo=Andrew -> Andrew(id=4) passes filter -> [{Andrew, null, 4}] + verifyDataRows( + result, + rows("Dev", "Eliot", 1, Collections.emptyList()), + rows("Eliot", "Ron", 2, Collections.emptyList()), + rows("Ron", "Andrew", 3, List.of(Arrays.asList("Andrew", null, 4))), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, Collections.emptyList()), + rows("Dan", "Andrew", 6, List.of(Arrays.asList("Andrew", null, 4)))); + } + + /** + * Test: Filter employee hierarchy with keyword match. Only employees whose name is NOT 'Andrew' + * participate in traversal. + */ + @Test + public void testEmployeeHierarchyWithKeywordFilter() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Ron'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " filter=(name != 'Andrew')" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Ron: reportsTo=Andrew -> Andrew is filtered out by name != 'Andrew' -> empty + verifyDataRows(result, rows("Ron", "Andrew", 3, Collections.emptyList())); + } + + /** + * Test: Filter with maxDepth combined. Dev traverses reporting chain but only considers lookup + * documents with id <= 3. 
+ */ + @Test + public void testEmployeeHierarchyWithFilterAndMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " maxDepth=3" + + " filter=(id <= 3)" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + // Dev: reportsTo=Eliot -> Eliot(id=2) passes -> then Eliot.reportsTo=Ron -> Ron(id=3) passes + // -> then Ron.reportsTo=Andrew -> Andrew(id=4) is filtered out -> stops + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of(List.of("Eliot", "Ron", 2), List.of("Ron", "Andrew", 3)))); } // ==================== Edge Cases ==================== @@ -489,12 +621,12 @@ public void testGraphLookupWithFieldsProjection() throws IOException { verifySchema(result, schema("name", "string"), schema("reportingHierarchy", "array")); verifyDataRows( result, - rows("Dev", List.of("{Eliot, Ron, 2}")), - rows("Eliot", List.of("{Ron, Andrew, 3}")), - rows("Ron", List.of("{Andrew, null, 4}")), + rows("Dev", List.of(List.of("Eliot", "Ron", 2))), + rows("Eliot", List.of(List.of("Ron", "Andrew", 3))), + rows("Ron", List.of(Arrays.asList("Andrew", null, 4))), rows("Andrew", Collections.emptyList()), - rows("Asya", List.of("{Ron, Andrew, 3}")), - rows("Dan", List.of("{Andrew, null, 4}"))); + rows("Asya", List.of(List.of("Ron", "Andrew", 3))), + rows("Dan", List.of(Arrays.asList("Andrew", null, 4)))); } // ==================== Batch Mode Tests ==================== @@ -527,8 +659,8 @@ public void testBatchModeEmployeeHierarchy() throws IOException { verifyDataRows( result, rows( - List.of("{Dev, Eliot, 1}", "{Asya, Ron, 5}"), - List.of("{Ron, Andrew, 3, 0}", "{Andrew, null, 4, 1}"))); + List.of(List.of("Dev", "Eliot", 1), List.of("Asya", "Ron", 5)), + List.of(List.of("Ron", "Andrew", 3, 0), Arrays.asList("Andrew", null, 4, 1)))); } /** @@ -560,8 +692,11 @@ public void testBatchModeTravelersAirports() throws IOException { verifyDataRows( result, rows( - List.of("{Dev, JFK}", "{Eliot, JFK}", "{Jeff, BOS}"), - List.of("{JFK, [BOS, ORD], 0}", "{BOS, [JFK, PWM], 0}", "{PWM, [BOS, LHR], 1}"))); + List.of(List.of("Dev", "JFK"), List.of("Eliot", "JFK"), List.of("Jeff", "BOS")), + List.of( + List.of("JFK", List.of("BOS", "ORD"), 0), + List.of("BOS", List.of("JFK", "PWM"), 0), + List.of("PWM", List.of("BOS", "LHR"), 1)))); } /** @@ -592,12 +727,12 @@ public void testBatchModeBidirectional() throws IOException { verifyDataRows( result, rows( - List.of("{Dev, Eliot, 1}", "{Dan, Andrew, 6}"), + List.of(List.of("Dev", "Eliot", 1), List.of("Dan", "Andrew", 6)), List.of( - "{Dev, Eliot, 1, 0}", - "{Eliot, Ron, 2, 0}", - "{Andrew, null, 4, 0}", - "{Dan, Andrew, 6, 0}", - "{Asya, Ron, 5, 1}"))); + List.of("Dev", "Eliot", 1, 0), + List.of("Eliot", "Ron", 2, 0), + Arrays.asList("Andrew", null, 4, 0), + List.of("Dan", "Andrew", 6, 0), + List.of("Asya", "Ron", 5, 1)))); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 58d797f4bf9..1f0f6d3fabf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ 
b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -11,6 +11,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -230,7 +231,7 @@ public void execute( * Process values recursively, handling geo points and nested maps. Geo points are converted to * OpenSearchExprGeoPointValue. Maps are recursively processed to handle nested structures. */ - private static Object processValue(Object value) { + private static Object processValue(Object value) throws SQLException { if (value == null) { return null; } @@ -247,7 +248,7 @@ private static Object processValue(Object value) { return convertedMap; } if (value instanceof StructImpl) { - return ((StructImpl) value).toString(); + return Arrays.asList(((StructImpl) value).getAttributes()); } if (value instanceof List) { List list = (List) value; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java index f76c90ab47d..e210095b480 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java @@ -101,6 +101,7 @@ public RelNode convert(RelNode rel) { graphLookup.isBidirectional(), graphLookup.isSupportArray(), graphLookup.isBatchMode(), - graphLookup.isUsePIT()); + graphLookup.isUsePIT(), + graphLookup.getFilter()); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java index 2cd0f4a746f..6307a741468 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java @@ -14,6 +14,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import lombok.Getter; @@ -32,6 +33,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.Nullable; @@ -39,6 +41,8 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.sql.calcite.plan.Scannable; import org.opensearch.sql.calcite.plan.rel.GraphLookup; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; import org.opensearch.sql.opensearch.storage.scan.context.LimitDigest; import org.opensearch.sql.opensearch.storage.scan.context.OSRequestBuilderAction; @@ -74,6 +78,7 @@ public class CalciteEnumerableGraphLookup extends GraphLookup implements Enumera * @param supportArray Whether to support array-typed fields * @param batchMode Whether to batch all source start values into a single unified BFS * @param usePIT Whether to use PIT (Point In Time) search for complete results + * @param 
filter Optional filter condition for lookup table documents */ public CalciteEnumerableGraphLookup( RelOptCluster cluster, @@ -89,7 +94,8 @@ public CalciteEnumerableGraphLookup( boolean bidirectional, boolean supportArray, boolean batchMode, - boolean usePIT) { + boolean usePIT, + @Nullable RexNode filter) { super( cluster, traitSet, @@ -104,7 +110,8 @@ public CalciteEnumerableGraphLookup( bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filter); } @Override @@ -123,7 +130,8 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { bidirectional, supportArray, batchMode, - usePIT); + usePIT, + filter); } @Override @@ -209,6 +217,23 @@ private static class GraphLookupEnumerator implements Enumerator<@Nullable Objec this.startFieldIndex = sourceFields.indexOf(graphLookup.getStartField()); this.fromFieldIdx = lookupFields.indexOf(graphLookup.fromField); this.toFieldIdx = lookupFields.indexOf(graphLookup.toField); + + // Push down user-specified filter to the lookup scan + if (graphLookup.filter != null) { + List schema = graphLookup.getLookup().getRowType().getFieldNames(); + Map fieldTypes = this.lookupScan.getOsIndex().getAllFieldTypes(); + try { + QueryBuilder filterQuery = + PredicateAnalyzer.analyze(graphLookup.filter, schema, fieldTypes); + this.lookupScan.pushDownContext.add( + PushDownType.FILTER, + null, + (OSRequestBuilderAction) rb -> rb.pushDownFilterForCalcite(filterQuery)); + } catch (PredicateAnalyzer.ExpressionNotAnalyzableException e) { + throw new RuntimeException( + "Cannot push down filter for graphLookup: " + e.getMessage(), e); + } + } } @Override diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index d860874a1e9..7b9e1a0b5f4 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -641,6 +641,7 @@ graphLookupOption | (SUPPORT_ARRAY EQUAL booleanLiteral) | (BATCH_MODE EQUAL booleanLiteral) | (USE_PIT EQUAL booleanLiteral) + | (FILTER EQUAL LT_PRTHS logicalExpression RT_PRTHS) ; // clauses diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 721f13033f3..9c367d73985 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -1499,6 +1499,7 @@ public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCom boolean supportArray = false; boolean batchMode = false; boolean usePIT = false; + UnresolvedExpression filter = null; for (OpenSearchPPLParser.GraphLookupOptionContext option : ctx.graphLookupOption()) { if (option.FROM_FIELD() != null) { @@ -1531,6 +1532,9 @@ public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCom Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); usePIT = Boolean.TRUE.equals(literal.getValue()); } + if (option.FILTER() != null) { + filter = internalVisitExpression(option.logicalExpression()); + } } Field as = (Field) internalVisitExpression(ctx.outputField); @@ -1551,6 +1555,7 @@ public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCom .supportArray(supportArray) .batchMode(batchMode) .usePIT(usePIT) + .filter(filter) .build(); } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 2402fac8700..90f4ce92724 100644 --- 
a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -251,6 +251,12 @@ public String visitGraphLookup(GraphLookup node, String context) { if (node.isUsePIT()) { command.append(" usePIT=true"); } + if (node.getFilter() != null) { + command + .append(" filter=(") + .append(expressionAnalyzer.analyze(node.getFilter(), context)) + .append(")"); + } command.append(" as ").append(MASK_COLUMN); return command.toString(); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java index da169704925..2790f3aadd5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java @@ -93,6 +93,42 @@ public void testGraphLookupWithMaxDepth() { verifyLogical(root, expectedLogical); } + @Test + public void testGraphLookupWithFilter() { + // Test graphLookup with filter parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name filter=(id > 2) as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false], filter=[>($0, 2)])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithCompoundFilter() { + // Test graphLookup with compound filter condition + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name filter=(id > 1 AND name != 'Andrew') as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false], filter=[AND(>($0, 1), <>($1, 'Andrew'))])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + @Test public void testGraphLookupBidirectional() { // Test graphLookup with bidirectional traversal diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 943db6c2ba2..0398d30bf17 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -695,6 +695,20 @@ public void testGraphLookup() { anonymize( "source=t | graphLookup employees fromField=manager toField=name" + " batchMode=true as reportingHierarchy")); + // graphLookup with filter + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni filter=(identifier = ***) as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " filter=(status = 'active') as reportingHierarchy")); + // graphLookup with compound filter + assertEquals( + "source=table | graphlookup table fromField=identifier 
toField=identifier" + + " direction=uni filter=(identifier = *** and identifier > ***) as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " filter=(status = 'active' AND id > 2) as reportingHierarchy")); } @Test