+ * Example: source=employees | graphLookup employees startField=manager fromField=manager toField=name maxDepth=3
+ * depthField=level direction=uni as hierarchy
+ */
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(callSuper = false)
+@RequiredArgsConstructor
+@AllArgsConstructor
+@Builder(toBuilder = true)
+public class GraphLookup extends UnresolvedPlan {
+ /** Direction mode for graph traversal. */
+ public enum Direction {
+ /** Unidirectional - traverse edges in one direction only. */
+ UNI,
+ /** Bidirectional - traverse edges in both directions. */
+ BI
+ }
+
+ /** Target table for graph traversal lookup. */
+ private final UnresolvedPlan fromTable;
+
+ /** Field in sourceTable to start with. */
+ private final Field startField;
+
+ /** Field in fromTable that represents the outgoing edge. */
+ private final Field fromField;
+
+ /** Field in input/fromTable to match against for traversal. */
+ private final Field toField;
+
+ /** Output field name for collected traversal results. */
+ private final Field as;
+
+ /** Maximum traversal depth. Zero (the default) returns only direct matches. */
+ private final Literal maxDepth;
+
+ /** Optional field name to include recursion depth in output. */
+ private @Nullable final Field depthField;
+
+ /** Direction mode: UNI (default) or BI for bidirectional. */
+ private final Direction direction;
+
+ /** Whether to support array-typed fields without early filter pushdown. */
+ private final boolean supportArray;
+
+ /** Whether to batch all source start values into a single unified BFS traversal. */
+ private final boolean batchMode;
+
+ /** Whether to use PIT (Point In Time) search for the lookup table to get complete results. */
+ private final boolean usePIT;
+
+ /**
+ * Optional filter condition to restrict which lookup table documents participate in traversal.
+ */
+ private @Nullable final UnresolvedExpression filter;
+
+ private UnresolvedPlan child;
+
+ public String getDepthFieldName() {
+ return depthField == null ? null : depthField.getField().toString();
+ }
+
+ @Override
+ public UnresolvedPlan attach(UnresolvedPlan child) {
+ this.child = child;
+ return this;
+ }
+
+ @Override
+ public List<UnresolvedPlan> getChild() {
+ return child == null ? ImmutableList.of() : ImmutableList.of(child);
+ }
+
+ @Override
+ public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitGraphLookup(this, context);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
index 68a700b66b..8854c80ae6 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
@@ -118,6 +118,8 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
+import org.opensearch.sql.ast.tree.GraphLookup;
+import org.opensearch.sql.ast.tree.GraphLookup.Direction;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
@@ -151,6 +153,7 @@
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
+import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.BinUtils;
@@ -2524,6 +2527,67 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context)
context, fieldsToAggregate, false, true, null, labelField, label);
}
+ @Override
+ public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) {
+ // 1. Visit source (child) table
+ visitChildren(node, context);
+ RelBuilder builder = context.relBuilder;
+ // TODO: Limit the number of source rows to 100 for now, make it configurable.
+ builder.limit(0, 100);
+ if (node.isBatchMode()) {
+ tryToRemoveMetaFields(context, true);
+ }
+ RelNode sourceTable = builder.build();
+
+ // 2. Extract parameters
+ String startFieldName = node.getStartField().getField().toString();
+ String fromFieldName = node.getFromField().getField().toString();
+ String toFieldName = node.getToField().getField().toString();
+ String outputFieldName = node.getAs().getField().toString();
+ String depthFieldName = node.getDepthFieldName();
+ boolean bidirectional = node.getDirection() == Direction.BI;
+
+ RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context);
+ Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class);
+ maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue;
+ boolean supportArray = node.isSupportArray();
+ boolean batchMode = node.isBatchMode();
+ boolean usePIT = node.isUsePIT();
+
+ // 3. Visit and materialize lookup table
+ analyze(node.getFromTable(), context);
+ tryToRemoveMetaFields(context, true);
+
+ // 4. Convert filter expression to RexNode against lookup table schema
+ RexNode filterRex = null;
+ if (node.getFilter() != null) {
+ filterRex = rexVisitor.analyze(node.getFilter(), context);
+ }
+
+ RelNode lookupTable = builder.build();
+
+ // 5. Create LogicalGraphLookup RelNode
+ // The conversion rule will extract the OpenSearchIndex from the lookup table
+ RelNode graphLookup =
+ LogicalGraphLookup.create(
+ sourceTable,
+ lookupTable,
+ startFieldName,
+ fromFieldName,
+ toFieldName,
+ outputFieldName,
+ depthFieldName,
+ maxDepthValue,
+ bidirectional,
+ supportArray,
+ batchMode,
+ usePIT,
+ filterRex);
+
+ builder.push(graphLookup);
+ return builder.peek();
+ }
+
/**
* Cast integer sum to long, real/float to double to avoid ClassCastException
*
diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java
new file mode 100644
index 0000000000..02ed97faf0
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.plan.rel;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Abstract RelNode for graphLookup command.
+ *
+ *
+ * <p>Has two inputs:
+ *
+ * <ul>
+ *   <li>source: source table (rows to start BFS from)
+ *   <li>lookup: lookup table (graph edges to traverse)
+ * </ul>
+ *
+ * <p>At execution time, performs BFS by dynamically querying OpenSearch with filter pushdown
+ * instead of loading all lookup data into memory.
+ *
+ * <p>This is a storage-agnostic logical node. Storage-specific implementations (e.g., for
+ * OpenSearch) should extract the necessary index information from the lookup RelNode during
+ * conversion to the physical plan.
+ */
+@Getter
+public abstract class GraphLookup extends BiRel {
+
+ // TODO: use RexInputRef instead of String for these fields
+ protected final String startField; // Field in source table (start entities)
+ protected final String fromField; // Field in lookup table (edge source)
+ protected final String toField; // Field in lookup table (edge target)
+ protected final String outputField; // Name of output array field
+ @Nullable protected final String depthField; // Name of the output depth field
+
+ // TODO: add limitation on the maxDepth and input rows count
+ protected final int maxDepth; // -1 = unlimited
+ protected final boolean bidirectional;
+ protected final boolean supportArray;
+ protected final boolean batchMode;
+ protected final boolean usePIT;
+ @Nullable protected final RexNode filter;
+
+ private RelDataType outputRowType;
+
+ /**
+ * Creates a LogicalGraphLookup.
+ *
+ * @param cluster Cluster
+ * @param traitSet Trait set
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Name of the depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields (disables early visited filter
+ * pushdown)
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ * @param usePIT Whether to use PIT (Point In Time) search for complete results
+ * @param filter Optional filter condition for lookup table documents
+ */
+ protected GraphLookup(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode,
+ boolean usePIT,
+ @Nullable RexNode filter) {
+ super(cluster, traitSet, source, lookup);
+ this.startField = startField;
+ this.fromField = fromField;
+ this.toField = toField;
+ this.outputField = outputField;
+ this.depthField = depthField;
+ this.maxDepth = maxDepth;
+ this.bidirectional = bidirectional;
+ this.supportArray = supportArray;
+ this.batchMode = batchMode;
+ this.usePIT = usePIT;
+ this.filter = filter;
+ }
+
+ /** Returns the source table RelNode. */
+ public RelNode getSource() {
+ return left;
+ }
+
+ /** Returns the lookup table RelNode. */
+ public RelNode getLookup() {
+ return right;
+ }
+
+ @Override
+ public abstract RelNode copy(RelTraitSet traitSet, List<RelNode> inputs);
+
+ @Override
+ protected RelDataType deriveRowType() {
+ if (outputRowType == null) {
+ RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
+
+ if (batchMode) {
+ // Batch mode: Output = [Array, Array]
+ // First field: aggregated source rows as array
+ RelDataType sourceRowType = getSource().getRowType();
+ RelDataType sourceArrayType =
+ getCluster().getTypeFactory().createArrayType(sourceRowType, -1);
+ builder.add(startField, sourceArrayType);
+
+ // Second field: aggregated lookup rows as array
+ RelDataType lookupRowType = getLookup().getRowType();
+ if (this.depthField != null) {
+ final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder();
+ lookupBuilder.addAll(lookupRowType.getFieldList());
+ RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER);
+ lookupBuilder.add(this.depthField, depthType);
+ lookupRowType = lookupBuilder.build();
+ }
+ RelDataType lookupArrayType =
+ getCluster().getTypeFactory().createArrayType(lookupRowType, -1);
+ builder.add(outputField, lookupArrayType);
+ } else {
+ // Normal mode: Output = source fields + output array field
+ // Add all source fields
+ for (var field : getSource().getRowType().getFieldList()) {
+ builder.add(field);
+ }
+
+ // Add output field (ARRAY type containing lookup row struct)
+ RelDataType lookupRowType = getLookup().getRowType();
+ if (this.depthField != null) {
+ final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder();
+ lookupBuilder.addAll(lookupRowType.getFieldList());
+ RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER);
+ lookupBuilder.add(this.depthField, depthType);
+ lookupRowType = lookupBuilder.build();
+ }
+ RelDataType arrayType = getCluster().getTypeFactory().createArrayType(lookupRowType, -1);
+ builder.add(outputField, arrayType);
+ }
+
+ outputRowType = builder.build();
+ }
+ return outputRowType;
+ }
+
+ @Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ // Batch mode aggregates all source rows into a single output row
+ return batchMode ? 1 : getSource().estimateRowCount(mq);
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item("fromField", fromField)
+ .item("toField", toField)
+ .item("outputField", outputField)
+ .item("depthField", depthField)
+ .item("maxDepth", maxDepth)
+ .item("bidirectional", bidirectional)
+ .itemIf("supportArray", supportArray, supportArray)
+ .itemIf("batchMode", batchMode, batchMode)
+ .itemIf("usePIT", usePIT, usePIT)
+ .itemIf("filter", filter, filter != null);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java
new file mode 100644
index 0000000000..94db3689f8
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.plan.rel;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Logical RelNode for graphLookup command. TODO: need to support trim fields and several transpose
+ * rules for this new added RelNode
+ */
+@Getter
+public class LogicalGraphLookup extends GraphLookup {
+
+ /**
+ * Creates a LogicalGraphLookup.
+ *
+ * @param cluster Cluster
+ * @param traitSet Trait set
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Name of the depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ * @param usePIT Whether to use PIT (Point In Time) search for complete results
+ * @param filter Optional filter condition for lookup table documents
+ */
+ protected LogicalGraphLookup(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode,
+ boolean usePIT,
+ @Nullable RexNode filter) {
+ super(
+ cluster,
+ traitSet,
+ source,
+ lookup,
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode,
+ usePIT,
+ filter);
+ }
+
+ /**
+ * Creates a LogicalGraphLookup with Convention.NONE.
+ *
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Name of the output depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ * @param usePIT Whether to use PIT (Point In Time) search for complete results
+ * @param filter Optional filter condition for lookup table documents
+ * @return A new LogicalGraphLookup instance
+ */
+ public static LogicalGraphLookup create(
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode,
+ boolean usePIT,
+ @Nullable RexNode filter) {
+ RelOptCluster cluster = source.getCluster();
+ RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
+ return new LogicalGraphLookup(
+ cluster,
+ traitSet,
+ source,
+ lookup,
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode,
+ usePIT,
+ filter);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new LogicalGraphLookup(
+ getCluster(),
+ traitSet,
+ inputs.get(0),
+ inputs.get(1),
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode,
+ usePIT,
+ filter);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
index 17d99fb4fb..8dfe963081 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
@@ -223,7 +223,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) {
case BIGINT -> LONG;
case FLOAT, REAL -> FLOAT;
case DOUBLE, DECIMAL -> DOUBLE; // TODO the decimal is only used for literal
- case CHAR, VARCHAR -> STRING;
+ case CHAR, VARCHAR, MULTISET -> STRING; // call toString() for MULTISET
case BOOLEAN -> BOOLEAN;
case DATE -> DATE;
case TIME, TIME_TZ, TIME_WITH_LOCAL_TIME_ZONE -> TIME;
diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md
new file mode 100644
index 0000000000..2d6220edae
--- /dev/null
+++ b/docs/user/ppl/cmd/graphlookup.md
@@ -0,0 +1,357 @@
+
+# graphLookup (Experimental)
+
+The `graphLookup` command performs recursive graph traversal over an index using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on the specified fields. This is useful for hierarchical data such as organizational charts, social networks, or routing graphs.
+
+## Syntax
+
+The `graphLookup` command has the following syntax:
+
+```syntax
+graphLookup <lookup-index> startField=<field> fromField=<field> toField=<field> [maxDepth=<int>] [depthField=<field>] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] [usePIT=(true | false)] [filter=(<condition>)] as <output-field>
+```
+
+The following are examples of the `graphLookup` command syntax:
+
+```syntax
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name maxDepth=2 as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name depthField=level as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections
+source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports
+source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name filter=(status = 'active' AND age > 18) as reportingHierarchy
+```
+
+## Parameters
+
+The `graphLookup` command supports the following parameters.
+
+| Parameter | Required/Optional | Description |
+| --- | --- | --- |
+| `<lookup-index>` | Required | The name of the index to perform the graph traversal on. Can be the same as the source index for self-referential graphs. |
+| `startField=<field>` | Required | The field in the source documents whose value is used to start the recursive search. The value of this field is matched against `toField` in the lookup index. Both single values and arrays are supported as starting points. |
+| `fromField=<field>` | Required | The field in the lookup index documents that contains the value to recurse on. After matching a document, the value of this field is used to find the next set of documents. Both single values and arrays are supported. |
+| `toField=<field>` | Required | The field in the lookup index documents to match against. Documents where `toField` equals the current traversal value are included in the results. |
+| `maxDepth=<int>` | Optional | The maximum number of recursive hops. Default is `0`. A value of `0` returns only the documents directly connected to the start values; a value of `1` adds one more hop (the initial match plus one recursive step), and so on. |
+| `depthField=<field>` | Optional | The name of the field to add to each traversed document indicating its recursion depth. If not specified, no depth field is added. Depth starts at `0` for the first level of matches. |
+| `direction=(uni \| bi)` | Optional | The traversal direction. `uni` (default) performs unidirectional traversal following edges in the forward direction only. `bi` performs bidirectional traversal, following edges in both directions. |
+| `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. |
+| `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output becomes a single row containing two arrays: the collected source rows and the unified traversal results. See [Batch Mode](#batch-mode) for details. |
+| `usePIT=(true \| false)` | Optional | When `true`, enables PIT (Point In Time) search for the lookup table, allowing paginated retrieval of complete results without the `max_result_window` size limit. Default is `false`. See [PIT Search](#pit-search) for details. |
+| `filter=(<condition>)` | Optional | A filter condition to restrict which lookup table documents participate in the graph traversal. Only documents matching the condition are considered as candidates during BFS. Parentheses around the condition are required. Example: `filter=(status = 'active' AND age > 18)`. |
+| `as <output-field>` | Required | The name of the output array field that will contain all documents found during the graph traversal. |
+
+## How It Works
+
+The `graphLookup` command performs a breadth-first search (BFS) traversal:
+
+1. For each source document, extract the value of `startField`
+2. Query the lookup index to find documents where `toField` matches the start value
+3. Add matched documents to the result array
+4. Extract `fromField` values from matched documents to continue traversal
+5. Repeat steps 2-4 until no new documents are found or `maxDepth` is reached
+
+For bidirectional traversal (`direction=bi`), the algorithm also follows edges in the reverse direction by additionally matching `fromField` values.
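+
+The following minimal Java sketch illustrates the level-by-level BFS described above. It is a conceptual illustration only: it uses a small in-memory edge list in place of the lookup index, and it is not the actual executor implementation.
+
+```java
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class GraphLookupSketch {
+  // One lookup document, reduced to its fromField/toField values.
+  record Edge(String from, String to) {}
+
+  public static void main(String[] args) {
+    // Stand-in for the lookup index (employee documents reduced to reportsTo/name).
+    List<Edge> lookup = List.of(new Edge("Ron", "Eliot"), new Edge("Andrew", "Ron"));
+
+    Set<String> frontier = new HashSet<>(Set.of("Eliot")); // startField value of one source row
+    Set<String> visited = new HashSet<>();
+    List<Edge> results = new ArrayList<>();
+    int maxDepth = 3;
+
+    for (int depth = 0; depth <= maxDepth && !frontier.isEmpty(); depth++) {
+      Set<String> current = frontier;
+      // Step 2: find lookup documents whose toField is in the current frontier.
+      List<Edge> matched = lookup.stream().filter(e -> current.contains(e.to())).toList();
+      // Step 3: add the matched documents to the result array.
+      results.addAll(matched);
+      visited.addAll(current);
+      // Step 4: the fromField values of the matched documents become the next frontier.
+      frontier = new HashSet<>();
+      for (Edge e : matched) {
+        if (!visited.contains(e.from())) {
+          frontier.add(e.from());
+        }
+      }
+    }
+    System.out.println(results); // [Edge[from=Ron, to=Eliot], Edge[from=Andrew, to=Ron]]
+  }
+}
+```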
+
+## Example 1: Employee Hierarchy Traversal
+
+Given an `employees` index with the following documents:
+
+| id | name | reportsTo |
+|----|------|-----------|
+| 1 | Dev | Eliot |
+| 2 | Eliot | Ron |
+| 3 | Ron | Andrew |
+| 4 | Andrew | null |
+| 5 | Asya | Ron |
+| 6 | Dan | Andrew |
+
+The following query finds the reporting chain for each employee:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+---------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+---------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3}] |
+| Ron | Andrew | 3 | [{Andrew, null, 4}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3}] |
+| Dan | Andrew | 6 | [{Andrew, null, 4}] |
++--------+----------+----+---------------------+
+```
+
+For Dev, the traversal starts with `reportsTo="Eliot"`, finds the Eliot record, and returns it in the `reportingHierarchy` array.
+
+## Example 2: Employee Hierarchy with Depth Tracking
+
+The following query adds a depth field to track how many levels each manager is from the employee:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ depthField=level
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+------------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+------------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2, 0}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3, 0}] |
+| Ron | Andrew | 3 | [{Andrew, null, 4, 0}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3, 0}] |
+| Dan | Andrew | 6 | [{Andrew, null, 4, 0}] |
++--------+----------+----+------------------------+
+```
+
+The depth field `level` is appended to each document in the result array. A value of `0` indicates the first level of matches.
+
+## Example 3: Limited Depth Traversal
+
+The following query limits traversal to 2 levels using `maxDepth=1`:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ maxDepth=1
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+--------------------------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+--------------------------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2}, {Ron, Andrew, 3}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3}, {Andrew, null, 4}]|
+| Ron | Andrew | 3 | [{Andrew, null, 4}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3}, {Andrew, null, 4}]|
+| Dan | Andrew | 6 | [{Andrew, null, 4}] |
++--------+----------+----+--------------------------------------+
+```
+
+With `maxDepth=1`, the traversal goes two levels deep (depth 0 and depth 1).
+
+## Example 4: Airport Connections Graph
+
+Given an `airports` index with the following documents:
+
+| airport | connects |
+|---------|----------|
+| JFK | [BOS, ORD] |
+| BOS | [JFK, PWM] |
+| ORD | [JFK] |
+| PWM | [BOS, LHR] |
+| LHR | [PWM] |
+
+The following query finds reachable airports from each airport:
+
+```ppl ignore
+source = airports
+ | graphLookup airports
+ startField=airport
+ fromField=connects
+ toField=airport
+ as reachableAirports
+```
+
+The query returns the following results:
+
+```text
++---------+------------+---------------------+
+| airport | connects | reachableAirports |
++---------+------------+---------------------+
+| JFK | [BOS, ORD] | [{JFK, [BOS, ORD]}] |
+| BOS | [JFK, PWM] | [{BOS, [JFK, PWM]}] |
+| ORD | [JFK] | [{ORD, [JFK]}] |
+| PWM | [BOS, LHR] | [{PWM, [BOS, LHR]}] |
+| LHR | [PWM] | [{LHR, [PWM]}] |
++---------+------------+---------------------+
+```
+
+## Example 5: Cross-Index Graph Lookup
+
+The `graphLookup` command can use different source and lookup indexes. Given a `travelers` index:
+
+| name | nearestAirport |
+|------|----------------|
+| Dev | JFK |
+| Eliot | JFK |
+| Jeff | BOS |
+
+The following query finds reachable airports for each traveler:
+
+```ppl ignore
+source = travelers
+ | graphLookup airports
+ startField=nearestAirport
+ fromField=connects
+ toField=airport
+ as reachableAirports
+```
+
+The query returns the following results:
+
+```text
++-------+----------------+---------------------+
+| name | nearestAirport | reachableAirports |
++-------+----------------+---------------------+
+| Dev | JFK | [{JFK, [BOS, ORD]}] |
+| Eliot | JFK | [{JFK, [BOS, ORD]}] |
+| Jeff | BOS | [{BOS, [JFK, PWM]}] |
++-------+----------------+---------------------+
+```
+
+## Example 6: Bidirectional Traversal
+
+The following query performs bidirectional traversal to find both managers and colleagues who share the same manager:
+
+```ppl ignore
+source = employees
+ | where name = 'Ron'
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ direction=bi
+ as connections
+```
+
+The query returns the following results:
+
+```text
++------+----------+----+------------------------------------------------+
+| name | reportsTo| id | connections |
++------+----------+----+------------------------------------------------+
+| Ron | Andrew | 3 | [{Ron, Andrew, 3}, {Andrew, null, 4}, {Dan, Andrew, 6}] |
++------+----------+----+------------------------------------------------+
+```
+
+With bidirectional traversal, Ron's connections include:
+- His own record (Ron reports to Andrew)
+- His manager (Andrew)
+- His peer (Dan, who also reports to Andrew)
+
+## Batch Mode
+
+When `batchMode=true`, the `graphLookup` command collects all start values from all source rows and performs a single unified BFS traversal instead of separate traversals per row.
+
+### Output Format Change
+
+In batch mode, the output is a **single row** with two arrays:
+- First array: All source rows collected
+- Second array: All lookup results from the unified BFS traversal
+
+### When to Use Batch Mode
+
+Use `batchMode=true` when:
+- You want to find all nodes reachable from **any** of the source start values
+- You need a global view of the graph connectivity from multiple starting points
+- You want to avoid duplicate traversals when multiple source rows share overlapping paths
+
+### Example
+
+```ppl ignore
+source = travelers
+ | graphLookup airports
+ startField=nearestAirport
+ fromField=connects
+ toField=airport
+ batchMode=true
+ maxDepth=2
+ as reachableAirports
+```
+
+**Normal mode** (default): Each traveler gets their own list of reachable airports
+```text
+| name | nearestAirport | reachableAirports |
+|-------|----------------|-------------------|
+| Dev | JFK | [JFK, BOS, ORD] |
+| Jeff | BOS | [BOS, JFK, PWM] |
+```
+
+**Batch mode**: A single row with all travelers and all reachable airports combined
+```text
+| travelers | reachableAirports |
+|----------------------------------------|-----------------------------|
+| [{Dev, JFK}, {Jeff, BOS}] | [JFK, BOS, ORD, PWM, ...] |
+```
+
+## Array Field Handling
+
+When the `fromField` or `toField` contains array values, you should set `supportArray=true` to ensure correct traversal behavior.
+
+## PIT Search
+
+By default, each level of BFS traversal limits the number of returned documents to the `max_result_window` setting of the lookup index (typically 10,000). This avoids the overhead of PIT (Point In Time) search but may return incomplete results when a single traversal level matches more documents than the limit.
+
+When `usePIT=true`, this limit is removed and the lookup table uses PIT-based pagination, which ensures all matching documents are retrieved at each traversal level. This provides complete and accurate results at the cost of additional search overhead.
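+
+Conceptually, PIT-based retrieval first opens a point-in-time context on the lookup index (for example, `POST /employees/_search/point_in_time?keep_alive=1m`) and then pages through all matches with `search_after`, repeating a search such as the one below until no hits remain. This is a simplified sketch of the standard OpenSearch PIT flow; the sort field and values are illustrative only and do not reflect the exact requests issued by the plugin.
+
+```json
+{
+  "size": 10000,
+  "query": { "terms": { "name": ["Eliot", "Ron"] } },
+  "pit": { "id": "<pit-id>", "keep_alive": "1m" },
+  "sort": [ { "name": "asc" } ],
+  "search_after": ["Eliot"]
+}
+```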
+
+### When to Use PIT Search
+
+Use `usePIT=true` when:
+- The graph has high-degree nodes where a single traversal level may match more than `max_result_window` documents
+- Result completeness is more important than query performance
+- You observe incomplete or missing results with the default setting
+
+### Example
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ usePIT=true
+ as reportingHierarchy
+```
+
+## Filtered Graph Traversal
+
+The `filter` parameter restricts which documents in the lookup table are considered during the BFS traversal. Only documents matching the filter condition participate as candidates at each traversal level.
+
+### Example
+
+The following query traverses only active employees in the reporting hierarchy:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ filter=(status = 'active')
+ as reportingHierarchy
+```
+
+The filter is applied at the OpenSearch query level, so it combines efficiently with the BFS traversal queries. At each BFS level, the query sent to OpenSearch is effectively: `bool { filter: [user_filter, bfs_terms_query] }`.
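+
+For illustration, with `filter=(status = 'active')` and a current BFS frontier of `Eliot` and `Ron`, the per-level lookup query might look roughly like the following OpenSearch bool query. This is a simplified sketch of the combination described above, not the exact DSL generated by the plugin.
+
+```json
+{
+  "query": {
+    "bool": {
+      "filter": [
+        { "term": { "status": "active" } },
+        { "terms": { "name": ["Eliot", "Ron"] } }
+      ]
+    }
+  }
+}
+```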
+
+## Limitations
+
+- The source input, which provides the starting points for the traversal, is limited to 100 documents to avoid performance issues.
+- When `usePIT=false` (default), each level of traversal search returns documents up to the `max_result_window` of the lookup index, which may result in incomplete data. Set `usePIT=true` to retrieve complete results.
diff --git a/docs/user/ppl/index.md b/docs/user/ppl/index.md
index 12afe96eea..718aa51f0f 100644
--- a/docs/user/ppl/index.md
+++ b/docs/user/ppl/index.md
@@ -82,6 +82,8 @@ source=accounts
| [addcoltotals command](cmd/addcoltotals.md) | 3.5 | stable (since 3.5) | Adds column values and appends a totals row. |
| [transpose command](cmd/transpose.md) | 3.5 | stable (since 3.5) | Transpose rows to columns. |
| [mvcombine command](cmd/mvcombine.md) | 3.5 | stable (since 3.4) | Combines values of a specified field across rows identical on all other fields. |
+| [graphlookup command](cmd/graphlookup.md) | 3.5 | experimental (since 3.5) | Performs recursive graph traversal over an index using a BFS algorithm. |
+
- [Syntax](cmd/syntax.md) - PPL query structure and command syntax formatting
* **Functions**
@@ -101,4 +103,4 @@ source=accounts
* **Optimization**
- [Optimization](../../user/optimization/optimization.rst)
* **Limitations**
- - [Limitations](limitations/limitations.md)
\ No newline at end of file
+ - [Limitations](limitations/limitations.md)
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java
new file mode 100644
index 0000000000..fbaefdb8c3
--- /dev/null
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java
@@ -0,0 +1,738 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.remote;
+
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_AIRPORTS;
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES;
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_TRAVELERS;
+import static org.opensearch.sql.util.MatcherUtils.rows;
+import static org.opensearch.sql.util.MatcherUtils.schema;
+import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
+import static org.opensearch.sql.util.MatcherUtils.verifySchema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.opensearch.sql.ppl.PPLIntegTestCase;
+
+/**
+ * Integration tests for graphLookup command. Test cases are inspired by MongoDB's $graphLookup
+ * examples.
+ *
+ *