From 1c348b44d7748585bb7dc5a40eae67fababccbc3 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 13:33:29 +0000 Subject: [PATCH 01/13] fix: expand CTE-based SELECT * in lineage() without schema lineage() (without schema) failed to resolve columns through CTEs when the outer query used SELECT *, because find_select_expr() could not match column names against the Star expression. Pre-qualify the AST with an empty MappingSchema to expand stars using CTE column metadata before building the lineage scope. A new `qualify_columns` option on QualifyColumnsOptions allows skipping column qualification while still performing star expansion. Known limitation: nested CTE star expansion (e.g., cte2 AS (SELECT * FROM cte1)) is not yet supported because qualify_columns processes each SELECT independently. --- crates/polyglot-sql/src/lineage.rs | 112 +++++++++++++++++- .../src/optimizer/qualify_columns.rs | 12 +- 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 75e2dd2..a52aac3 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -9,7 +9,7 @@ use crate::dialects::DialectType; use crate::expressions::Expression; use crate::optimizer::annotate_types::annotate_types; use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions}; -use crate::schema::{normalize_name, Schema}; +use crate::schema::{normalize_name, MappingSchema, Schema}; use crate::scope::{build_scope, Scope}; use crate::traversal::ExpressionWalk; use crate::{Error, Result}; @@ -170,7 +170,22 @@ fn lineage_from_expression( dialect: Option, trim_selects: bool, ) -> Result { - let scope = build_scope(sql); + // Pre-qualify with an empty schema to expand CTE-based SELECT * expressions. + // This enables lineage() (without schema) to resolve columns through CTEs, + // matching the behavior of Python sqlglot. 
+ // + // TODO: Nested CTE star expansion (e.g., cte2 AS (SELECT * FROM cte1)) + // does not work because qualify_columns processes each SELECT independently + // via transform_recursive, so inter-CTE references are not resolved. + // See dlin-mml.2.2 comments for detailed analysis. + let empty_schema = MappingSchema::new(); + let options = QualifyColumnsOptions::new() + .with_expand_stars(true) + .with_expand_alias_refs(false) + .with_qualify_columns(false); + let qualified = qualify_columns(sql.clone(), &empty_schema, &options).unwrap_or(sql.clone()); + + let scope = build_scope(&qualified); to_node( ColumnRef::Name(column), &scope, @@ -2669,4 +2684,97 @@ mod tests { names ); } + + // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) --- + + #[test] + fn test_lineage_cte_select_star() { + // Ported from sqlglot: test_lineage_source_with_star + // WITH y AS (SELECT * FROM x) SELECT a FROM y + // After star expansion: SELECT y.a AS a FROM y + let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y"); + let node = lineage("a", &expr, None, false).unwrap(); + + assert_eq!(node.name, "a"); + // Should successfully resolve column 'a' through the CTE + // (previously failed with "Cannot find column 'a' in query") + assert!( + !node.downstream.is_empty(), + "Expected downstream nodes tracing through CTE, got none" + ); + } + + #[test] + fn test_lineage_cte_select_star_renamed_column() { + // dbt standard pattern: CTE with column rename + outer SELECT * + // This is the primary use case for dbt projects (jaffle-shop etc.) 
+ let expr = + parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed"); + let node = lineage("customer_id", &expr, None, false).unwrap(); + + assert_eq!(node.name, "customer_id"); + // Should trace customer_id → renamed CTE → source.id + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes (customer_id → source), got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_select_star_multiple_columns() { + // CTE exposes multiple columns, outer SELECT * should resolve each + let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte"); + + for col in &["a", "b", "c"] { + let node = lineage(col, &expr, None, false).unwrap(); + assert_eq!(node.name, *col); + // Verify lineage resolves without error (star expanded to explicit columns) + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for column {}, got: {:?}", + col, + all_names + ); + } + } + + #[test] + fn test_lineage_nested_cte_select_star() { + // TODO: Nested CTE star expansion is not yet supported. + // qualify_columns processes each SELECT independently via transform_recursive, + // so inter-CTE references (cte2 referencing cte1) are not resolved. + // See dlin-mml.2.2 comments for detailed analysis. + let expr = parse( + "WITH cte1 AS (SELECT a FROM t), \ + cte2 AS (SELECT * FROM cte1) \ + SELECT * FROM cte2", + ); + let result = lineage("a", &expr, None, false); + + // Currently fails — once nested CTE star expansion is implemented, + // this test should be updated to assert success and trace through + // cte2 → cte1 → t.a (at least 3 nodes). 
+ assert!( + result.is_err(), + "Nested CTE star expansion is not yet supported; \ + update this test when it is implemented" + ); + } + + #[test] + fn test_lineage_subquery_select_star() { + // Ported from sqlglot: test_select_star + // SELECT x FROM (SELECT * FROM table_a) + let expr = parse("SELECT x FROM (SELECT * FROM table_a)"); + let node = lineage("x", &expr, None, false).unwrap(); + + assert_eq!(node.name, "x"); + assert!( + !node.downstream.is_empty(), + "Expected downstream nodes for subquery with SELECT *, got none" + ); + } } diff --git a/crates/polyglot-sql/src/optimizer/qualify_columns.rs b/crates/polyglot-sql/src/optimizer/qualify_columns.rs index cf1f3fb..3b2ac6a 100644 --- a/crates/polyglot-sql/src/optimizer/qualify_columns.rs +++ b/crates/polyglot-sql/src/optimizer/qualify_columns.rs @@ -57,6 +57,9 @@ pub struct QualifyColumnsOptions { pub infer_schema: Option, /// Whether to allow partial qualification pub allow_partial_qualification: bool, + /// Whether to qualify column references (add table qualifiers). + /// When false, only star expansion and alias ref expansion are performed. + pub qualify_columns: bool, /// The dialect for dialect-specific behavior pub dialect: Option, } @@ -69,6 +72,7 @@ impl QualifyColumnsOptions { expand_stars: true, infer_schema: None, allow_partial_qualification: false, + qualify_columns: true, dialect: None, } } @@ -96,6 +100,12 @@ impl QualifyColumnsOptions { self.allow_partial_qualification = allow; self } + + /// Set whether to qualify column references + pub fn with_qualify_columns(mut self, qualify: bool) -> Self { + self.qualify_columns = qualify; + self + } } /// Rewrite SQL AST to have fully qualified columns. @@ -150,7 +160,7 @@ pub fn qualify_columns( }; // 2. 
Qualify columns (add table qualifiers) - if first_error.borrow().is_none() { + if first_error.borrow().is_none() && options.qualify_columns { if let Err(err) = qualify_columns_in_scope( &mut select, &scope, From 49381f6d48e4c8597f435fd15a80dec8b17c1bb0 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 13:58:00 +0000 Subject: [PATCH 02/13] fix: resolve nested CTE SELECT * star expansion for column lineage Replace qualify_columns-based star expansion with a dedicated expand_cte_stars preprocessing step that walks CTEs in definition order and propagates resolved column lists. This enables nested CTE patterns like `cte2 AS (SELECT * FROM cte1)` which qualify_columns could not resolve because it processes each SELECT independently via transform_recursive. Also removes the now-unused `qualify_columns` flag from QualifyColumnsOptions since its only consumer was the replaced code path. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 389 ++++++++++++++++-- .../src/optimizer/qualify_columns.rs | 11 +- 2 files changed, 362 insertions(+), 38 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index a52aac3..3180f30 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -6,15 +6,15 @@ //! 
use crate::dialects::DialectType; -use crate::expressions::Expression; +use crate::expressions::{Expression, Identifier, Select}; use crate::optimizer::annotate_types::annotate_types; use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions}; -use crate::schema::{normalize_name, MappingSchema, Schema}; +use crate::schema::{normalize_name, Schema}; use crate::scope::{build_scope, Scope}; use crate::traversal::ExpressionWalk; use crate::{Error, Result}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; /// A node in the column lineage graph #[derive(Debug, Clone, Serialize, Deserialize)] @@ -170,20 +170,10 @@ fn lineage_from_expression( dialect: Option, trim_selects: bool, ) -> Result { - // Pre-qualify with an empty schema to expand CTE-based SELECT * expressions. - // This enables lineage() (without schema) to resolve columns through CTEs, - // matching the behavior of Python sqlglot. - // - // TODO: Nested CTE star expansion (e.g., cte2 AS (SELECT * FROM cte1)) - // does not work because qualify_columns processes each SELECT independently - // via transform_recursive, so inter-CTE references are not resolved. - // See dlin-mml.2.2 comments for detailed analysis. - let empty_schema = MappingSchema::new(); - let options = QualifyColumnsOptions::new() - .with_expand_stars(true) - .with_expand_alias_refs(false) - .with_qualify_columns(false); - let qualified = qualify_columns(sql.clone(), &empty_schema, &options).unwrap_or(sql.clone()); + // Expand CTE-based SELECT * expressions by walking CTEs in definition order + // and propagating resolved column lists, enabling nested CTE star resolution. 
+ let mut qualified = sql.clone(); + expand_cte_stars(&mut qualified); let scope = build_scope(&qualified); to_node( @@ -197,6 +187,291 @@ fn lineage_from_expression( ) } +// --------------------------------------------------------------------------- +// CTE star expansion +// --------------------------------------------------------------------------- + +/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating +/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1)) +/// which qualify_columns cannot resolve because it processes each SELECT independently. +fn expand_cte_stars(expr: &mut Expression) { + let select = match expr { + Expression::Select(s) => s, + _ => return, + }; + + let with = match &mut select.with { + Some(w) => w, + None => return, + }; + + // Skip recursive CTEs — column resolution is complex and not needed for dbt patterns + if with.recursive { + return; + } + + let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new(); + + for cte in &mut with.ctes { + let cte_name = cte.alias.name.to_lowercase(); + + // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that + if !cte.columns.is_empty() { + let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect(); + resolved_cte_columns.insert(cte_name, cols); + continue; + } + + // Get the SELECT from the CTE body (handle UNION by taking left branch) + let body_select = match get_leftmost_select_mut(&mut cte.this) { + Some(s) => s, + None => continue, + }; + + let columns = extract_and_expand_select_columns(body_select, &resolved_cte_columns); + resolved_cte_columns.insert(cte_name, columns); + } + + // Also expand stars in the outer SELECT itself + expand_select_stars(select, &resolved_cte_columns); +} + +/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT. 
+fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { + match expr { + Expression::Select(s) => Some(s), + Expression::Union(u) => get_leftmost_select_mut(&mut u.left), + Expression::Intersect(i) => get_leftmost_select_mut(&mut i.left), + Expression::Except(e) => get_leftmost_select_mut(&mut e.left), + Expression::Paren(p) => get_leftmost_select_mut(&mut p.this), + _ => None, + } +} + +/// Extract column names from a SELECT, expanding any Star expressions using resolved CTEs. +/// If stars are found and can be expanded, the SELECT's expressions are rewritten in place. +fn extract_and_expand_select_columns( + select: &mut Select, + resolved_ctes: &HashMap<String, Vec<String>>, +) -> Vec<String> { + let has_star = select.expressions.iter().any(|e| matches!(e, Expression::Star(_))); + let has_qualified_star = select.expressions.iter().any(|e| { + matches!(e, Expression::Column(c) if c.name.name == "*") + }); + + if !has_star && !has_qualified_star { + // No stars — just extract column names + return select + .expressions + .iter() + .filter_map(|e| get_expression_output_name(e)) + .collect(); + } + + // Expand stars + let source_names = get_select_source_names(select); + let mut new_expressions = Vec::new(); + let mut result_columns = Vec::new(); + + for expr in &select.expressions { + match expr { + Expression::Star(star) => { + let qual = star.table.as_ref().map(|t| t.name.as_str()); + if let Some(expanded) = + expand_star_from_sources(qual, &source_names, resolved_ctes) + { + for col_name in &expanded { + new_expressions.push(make_column_expr(col_name, None)); + result_columns.push(col_name.clone()); + } + } else { + new_expressions.push(expr.clone()); + result_columns.push("*".to_string()); + } + } + Expression::Column(c) if c.name.name == "*" => { + let qual = c.table.as_ref().map(|t| t.name.as_str()); + if let Some(expanded) = + expand_star_from_sources(qual, &source_names, resolved_ctes) + { + for col_name in &expanded { + new_expressions + 
.push(make_column_expr(col_name, c.table.as_ref())); + result_columns.push(col_name.clone()); + } + } else { + new_expressions.push(expr.clone()); + result_columns.push("*".to_string()); + } + } + _ => { + new_expressions.push(expr.clone()); + if let Some(name) = get_expression_output_name(expr) { + result_columns.push(name); + } + } + } + } + + select.expressions = new_expressions; + result_columns +} + +/// Expand stars in the outer SELECT (not a CTE body). +fn expand_select_stars(select: &mut Select, resolved_ctes: &HashMap<String, Vec<String>>) { + let has_star = select.expressions.iter().any(|e| matches!(e, Expression::Star(_))); + let has_qualified_star = select.expressions.iter().any(|e| { + matches!(e, Expression::Column(c) if c.name.name == "*") + }); + + if !has_star && !has_qualified_star { + return; + } + + let source_names = get_select_source_names(select); + let mut new_expressions = Vec::new(); + + for expr in &select.expressions { + match expr { + Expression::Star(star) => { + let qual = star.table.as_ref().map(|t| t.name.as_str()); + if let Some(expanded) = + expand_star_from_sources(qual, &source_names, resolved_ctes) + { + for col_name in &expanded { + new_expressions.push(make_column_expr(col_name, None)); + } + } else { + new_expressions.push(expr.clone()); + } + } + Expression::Column(c) if c.name.name == "*" => { + let qual = c.table.as_ref().map(|t| t.name.as_str()); + if let Some(expanded) = + expand_star_from_sources(qual, &source_names, resolved_ctes) + { + for col_name in &expanded { + new_expressions + .push(make_column_expr(col_name, c.table.as_ref())); + } + } else { + new_expressions.push(expr.clone()); + } + } + _ => { + new_expressions.push(expr.clone()); + } + } + } + + select.expressions = new_expressions; +} + +/// Try to expand a star expression by looking up source columns from resolved CTEs. +/// `qualifier`: Optional table qualifier name (for `table.*`). If None, expand all sources. 
+fn expand_star_from_sources( + qualifier: Option<&str>, + source_names: &[(String, String)], + resolved_ctes: &HashMap<String, Vec<String>>, +) -> Option<Vec<String>> { + let mut expanded = Vec::new(); + + if let Some(qual) = qualifier { + // Qualified star: table.* + let qual_lower = qual.to_lowercase(); + // Find which source this qualifier refers to + for (alias, original) in source_names { + if alias.to_lowercase() == qual_lower || original.to_lowercase() == qual_lower { + if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { + expanded.extend(cols.iter().cloned()); + return Some(expanded); + } + } + } + None + } else { + // Unqualified star: expand all CTE sources + let mut any_expanded = false; + for (_alias, original) in source_names { + if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { + expanded.extend(cols.iter().cloned()); + any_expanded = true; + } else { + // Source is not a resolved CTE — can't fully expand + return None; + } + } + if any_expanded { + Some(expanded) + } else { + None + } + } +} + +/// Create a Column expression with the given name and optional table qualifier. +fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression { + Expression::Column(Box::new(crate::expressions::Column { + name: Identifier::new(name), + table: table.cloned(), + join_mark: false, + trailing_comments: Vec::new(), + span: None, + inferred_type: None, + })) +} + +/// Extract the output name of a SELECT expression. +fn get_expression_output_name(expr: &Expression) -> Option<String> { + match expr { + Expression::Alias(a) => Some(a.alias.name.clone()), + Expression::Column(c) => Some(c.name.name.clone()), + Expression::Identifier(id) => Some(id.name.clone()), + Expression::Star(_) => Some("*".to_string()), + _ => None, + } +} + +/// Extract source names from a SELECT's FROM and JOIN clauses. +/// Returns (alias_or_name, original_name) pairs. 
+fn get_select_source_names(select: &Select) -> Vec<(String, String)> { + let mut names = Vec::new(); + + fn extract_source(expr: &Expression) -> Option<(String, String)> { + match expr { + Expression::Table(t) => { + let original = t.name.name.clone(); + let alias = t + .alias + .as_ref() + .map(|a| a.name.clone()) + .unwrap_or_else(|| original.clone()); + Some((alias, original)) + } + Expression::Subquery(s) => { + let alias = s.alias.as_ref()?.name.clone(); + Some((alias.clone(), alias)) + } + Expression::Paren(p) => extract_source(&p.this), + _ => None, + } + } + + if let Some(from) = &select.from { + for expr in &from.expressions { + if let Some(pair) = extract_source(expr) { + names.push(pair); + } + } + } + for join in &select.joins { + if let Some(pair) = extract_source(&join.this) { + names.push(pair); + } + } + names +} + /// Get all source tables from a lineage graph pub fn get_source_tables(node: &LineageNode) -> HashSet { let mut tables = HashSet::new(); @@ -2743,27 +3018,85 @@ mod tests { #[test] fn test_lineage_nested_cte_select_star() { - // TODO: Nested CTE star expansion is not yet supported. - // qualify_columns processes each SELECT independently via transform_recursive, - // so inter-CTE references (cte2 referencing cte1) are not resolved. - // See dlin-mml.2.2 comments for detailed analysis. + // Nested CTE star expansion: cte2 references cte1 via SELECT * let expr = parse( "WITH cte1 AS (SELECT a FROM t), \ cte2 AS (SELECT * FROM cte1) \ SELECT * FROM cte2", ); - let result = lineage("a", &expr, None, false); + let node = lineage("a", &expr, None, false).unwrap(); - // Currently fails — once nested CTE star expansion is implemented, - // this test should be updated to assert success and trace through - // cte2 → cte1 → t.a (at least 3 nodes). 
+ assert_eq!(node.name, "a"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); assert!( - result.is_err(), - "Nested CTE star expansion is not yet supported; \ - update this test when it is implemented" + all_names.len() >= 3, + "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}", + all_names ); } + #[test] + fn test_lineage_three_level_nested_cte_star() { + // Three-level nested CTE: cte3 → cte2 → cte1 → t + let expr = parse( + "WITH cte1 AS (SELECT x FROM t), \ + cte2 AS (SELECT * FROM cte1), \ + cte3 AS (SELECT * FROM cte2) \ + SELECT * FROM cte3", + ); + let node = lineage("x", &expr, None, false).unwrap(); + + assert_eq!(node.name, "x"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 4, + "Expected at least 4 nodes through 3-level CTE chain, got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_union_star() { + // CTE with UNION body, outer SELECT * should resolve from left branch + let expr = parse( + "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \ + SELECT * FROM cte", + ); + let node = lineage("a", &expr, None, false).unwrap(); + + assert_eq!(node.name, "a"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for CTE union star, got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_star_unknown_table() { + // When CTE references an unknown table, star expansion is skipped gracefully + // and lineage falls back to normal resolution (which may fail) + let expr = parse( + "WITH cte AS (SELECT * FROM unknown_table) \ + SELECT * FROM cte", + ); + // This should not panic — it may succeed or fail depending on resolution, + // but should not crash + let _result = lineage("x", &expr, None, false); + } + + #[test] + fn test_lineage_cte_explicit_columns() { + // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t) + let expr = parse( + 
"WITH cte(x, y) AS (SELECT a, b FROM t) \ + SELECT * FROM cte", + ); + let node = lineage("x", &expr, None, false).unwrap(); + assert_eq!(node.name, "x"); + } + #[test] fn test_lineage_subquery_select_star() { // Ported from sqlglot: test_select_star diff --git a/crates/polyglot-sql/src/optimizer/qualify_columns.rs b/crates/polyglot-sql/src/optimizer/qualify_columns.rs index 3b2ac6a..f50160f 100644 --- a/crates/polyglot-sql/src/optimizer/qualify_columns.rs +++ b/crates/polyglot-sql/src/optimizer/qualify_columns.rs @@ -57,9 +57,6 @@ pub struct QualifyColumnsOptions { pub infer_schema: Option, /// Whether to allow partial qualification pub allow_partial_qualification: bool, - /// Whether to qualify column references (add table qualifiers). - /// When false, only star expansion and alias ref expansion are performed. - pub qualify_columns: bool, /// The dialect for dialect-specific behavior pub dialect: Option, } @@ -72,7 +69,6 @@ impl QualifyColumnsOptions { expand_stars: true, infer_schema: None, allow_partial_qualification: false, - qualify_columns: true, dialect: None, } } @@ -101,11 +97,6 @@ impl QualifyColumnsOptions { self } - /// Set whether to qualify column references - pub fn with_qualify_columns(mut self, qualify: bool) -> Self { - self.qualify_columns = qualify; - self - } } /// Rewrite SQL AST to have fully qualified columns. @@ -160,7 +151,7 @@ pub fn qualify_columns( }; // 2. 
Qualify columns (add table qualifiers) - if first_error.borrow().is_none() && options.qualify_columns { + if first_error.borrow().is_none() { if let Err(err) = qualify_columns_in_scope( &mut select, &scope, From 5e4553b2b8c844be7187bb9af079be936e22a129 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 14:03:00 +0000 Subject: [PATCH 03/13] fix: eliminate redundant AST clone and deduplicate star expansion logic - Move expand_cte_stars to public callers (lineage, lineage_with_schema) so it runs on already-owned expressions, removing a redundant deep clone in lineage_from_expression - Merge extract_and_expand_select_columns and expand_select_stars into a single rewrite_stars_in_select function - Add test for qualified star expansion (SELECT cte.* FROM cte) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 97 ++++++++++-------------------- 1 file changed, 33 insertions(+), 64 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 3180f30..3ca74ab 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -119,7 +119,9 @@ pub fn lineage( dialect: Option, trim_selects: bool, ) -> Result { - lineage_from_expression(column, sql, dialect, trim_selects) + let mut owned = sql.clone(); + expand_cte_stars(&mut owned); + lineage_from_expression(column, &owned, dialect, trim_selects) } /// Build the lineage graph for a column in a SQL query using optional schema metadata. 
@@ -161,6 +163,9 @@ pub fn lineage_with_schema( // Annotate types in-place so lineage nodes carry type information annotate_types(&mut qualified_expression, schema, dialect); + // Expand CTE stars on the already-owned expression (no extra clone) + expand_cte_stars(&mut qualified_expression); + lineage_from_expression(column, &qualified_expression, dialect, trim_selects) } @@ -170,12 +175,7 @@ fn lineage_from_expression( dialect: Option, trim_selects: bool, ) -> Result { - // Expand CTE-based SELECT * expressions by walking CTEs in definition order - // and propagating resolved column lists, enabling nested CTE star resolution. - let mut qualified = sql.clone(); - expand_cte_stars(&mut qualified); - - let scope = build_scope(&qualified); + let scope = build_scope(sql); to_node( ColumnRef::Name(column), &scope, @@ -228,12 +228,12 @@ fn expand_cte_stars(expr: &mut Expression) { None => continue, }; - let columns = extract_and_expand_select_columns(body_select, &resolved_cte_columns); + let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns); resolved_cte_columns.insert(cte_name, columns); } // Also expand stars in the outer SELECT itself - expand_select_stars(select, &resolved_cte_columns); + rewrite_stars_in_select(select, &resolved_cte_columns); } /// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT. @@ -248,9 +248,9 @@ fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { } } -/// Extract column names from a SELECT, expanding any Star expressions using resolved CTEs. -/// If stars are found and can be expanded, the SELECT's expressions are rewritten in place. -fn extract_and_expand_select_columns( +/// Rewrite star expressions in a SELECT using resolved CTE column lists. +/// Returns the list of output column names after expansion. 
+fn rewrite_stars_in_select( select: &mut Select, resolved_ctes: &HashMap>, ) -> Vec { @@ -260,7 +260,7 @@ fn extract_and_expand_select_columns( }); if !has_star && !has_qualified_star { - // No stars — just extract column names + // No stars — just extract column names without rewriting return select .expressions .iter() @@ -268,7 +268,6 @@ fn extract_and_expand_select_columns( .collect(); } - // Expand stars let source_names = get_select_source_names(select); let mut new_expressions = Vec::new(); let mut result_columns = Vec::new(); @@ -317,56 +316,6 @@ fn extract_and_expand_select_columns( result_columns } -/// Expand stars in the outer SELECT (not a CTE body). -fn expand_select_stars(select: &mut Select, resolved_ctes: &HashMap>) { - let has_star = select.expressions.iter().any(|e| matches!(e, Expression::Star(_))); - let has_qualified_star = select.expressions.iter().any(|e| { - matches!(e, Expression::Column(c) if c.name.name == "*") - }); - - if !has_star && !has_qualified_star { - return; - } - - let source_names = get_select_source_names(select); - let mut new_expressions = Vec::new(); - - for expr in &select.expressions { - match expr { - Expression::Star(star) => { - let qual = star.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = - expand_star_from_sources(qual, &source_names, resolved_ctes) - { - for col_name in &expanded { - new_expressions.push(make_column_expr(col_name, None)); - } - } else { - new_expressions.push(expr.clone()); - } - } - Expression::Column(c) if c.name.name == "*" => { - let qual = c.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = - expand_star_from_sources(qual, &source_names, resolved_ctes) - { - for col_name in &expanded { - new_expressions - .push(make_column_expr(col_name, c.table.as_ref())); - } - } else { - new_expressions.push(expr.clone()); - } - } - _ => { - new_expressions.push(expr.clone()); - } - } - } - - select.expressions = new_expressions; -} - /// Try to expand a star 
expression by looking up source columns from resolved CTEs. /// `qualifier`: Optional table qualifier name (for `table.*`). If None, expand all sources. fn expand_star_from_sources( @@ -3097,6 +3046,26 @@ mod tests { assert_eq!(node.name, "x"); } + #[test] + fn test_lineage_cte_qualified_star() { + // Qualified star: SELECT cte.* FROM cte + let expr = parse( + "WITH cte AS (SELECT a, b FROM t) \ + SELECT cte.* FROM cte", + ); + for col in &["a", "b"] { + let node = lineage(col, &expr, None, false).unwrap(); + assert_eq!(node.name, *col); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for qualified star column {}, got: {:?}", + col, + all_names + ); + } + } + #[test] fn test_lineage_subquery_select_star() { // Ported from sqlglot: test_select_star From 8c84cb2fc53b514d6bf46efe8ac7df8c126c0d1b Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 14:16:07 +0000 Subject: [PATCH 04/13] style: apply cargo fmt and fix clippy warning Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 23 ++++++++++--------- .../src/optimizer/qualify_columns.rs | 1 - 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 3ca74ab..bab8ff1 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -254,17 +254,21 @@ fn rewrite_stars_in_select( select: &mut Select, resolved_ctes: &HashMap>, ) -> Vec { - let has_star = select.expressions.iter().any(|e| matches!(e, Expression::Star(_))); - let has_qualified_star = select.expressions.iter().any(|e| { - matches!(e, Expression::Column(c) if c.name.name == "*") - }); + let has_star = select + .expressions + .iter() + .any(|e| matches!(e, Expression::Star(_))); + let has_qualified_star = select + .expressions + .iter() + .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*")); if !has_star 
&& !has_qualified_star { // No stars — just extract column names without rewriting return select .expressions .iter() - .filter_map(|e| get_expression_output_name(e)) + .filter_map(get_expression_output_name) .collect(); } @@ -276,8 +280,7 @@ fn rewrite_stars_in_select( match expr { Expression::Star(star) => { let qual = star.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = - expand_star_from_sources(qual, &source_names, resolved_ctes) + if let Some(expanded) = expand_star_from_sources(qual, &source_names, resolved_ctes) { for col_name in &expanded { new_expressions.push(make_column_expr(col_name, None)); @@ -290,12 +293,10 @@ fn rewrite_stars_in_select( } Expression::Column(c) if c.name.name == "*" => { let qual = c.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = - expand_star_from_sources(qual, &source_names, resolved_ctes) + if let Some(expanded) = expand_star_from_sources(qual, &source_names, resolved_ctes) { for col_name in &expanded { - new_expressions - .push(make_column_expr(col_name, c.table.as_ref())); + new_expressions.push(make_column_expr(col_name, c.table.as_ref())); result_columns.push(col_name.clone()); } } else { diff --git a/crates/polyglot-sql/src/optimizer/qualify_columns.rs b/crates/polyglot-sql/src/optimizer/qualify_columns.rs index f50160f..cf1f3fb 100644 --- a/crates/polyglot-sql/src/optimizer/qualify_columns.rs +++ b/crates/polyglot-sql/src/optimizer/qualify_columns.rs @@ -96,7 +96,6 @@ impl QualifyColumnsOptions { self.allow_partial_qualification = allow; self } - } /// Rewrite SQL AST to have fully qualified columns. 
From 97c3be3e979eeb2a99165bae762dd4fc41fbc775 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 14:34:41 +0000 Subject: [PATCH 05/13] feat: make expand_cte_stars public for downstream use Allow callers to use expand_cte_stars() for pre-processing SQL expressions before extracting column information, enabling column inference from compiled SQL in tools that build schemas from dbt manifests. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index bab8ff1..ebecf7f 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -194,7 +194,7 @@ fn lineage_from_expression( /// Expand SELECT * in CTEs by walking CTE definitions in order and propagating /// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1)) /// which qualify_columns cannot resolve because it processes each SELECT independently. -fn expand_cte_stars(expr: &mut Expression) { +pub fn expand_cte_stars(expr: &mut Expression) { let select = match expr { Expression::Select(s) => s, _ => return, From 3a5a772b519181b6b52de5be4183ebb0ae51e531 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 15:10:59 +0000 Subject: [PATCH 06/13] feat: pass schema to expand_cte_stars for external table star expansion When CTEs reference external tables via SELECT *, the star cannot be expanded using CTE-only resolution. By passing the optional schema to expand_cte_stars, external table columns can be looked up as a fallback, enabling correct lineage tracing through patterns like: WITH orders AS (SELECT * FROM stg_orders) SELECT orders.* FROM orders This is essential for dbt projects where compiled SQL uses fully-qualified table references (e.g., "db"."schema"."table") wrapped in CTEs. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 239 +++++++++++++++++++++++++++-- 1 file changed, 223 insertions(+), 16 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index ebecf7f..b790e08 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -120,7 +120,7 @@ pub fn lineage( trim_selects: bool, ) -> Result { let mut owned = sql.clone(); - expand_cte_stars(&mut owned); + expand_cte_stars(&mut owned, None); lineage_from_expression(column, &owned, dialect, trim_selects) } @@ -163,8 +163,9 @@ pub fn lineage_with_schema( // Annotate types in-place so lineage nodes carry type information annotate_types(&mut qualified_expression, schema, dialect); - // Expand CTE stars on the already-owned expression (no extra clone) - expand_cte_stars(&mut qualified_expression); + // Expand CTE stars on the already-owned expression (no extra clone). + // Pass schema so that stars from external tables can also be resolved. + expand_cte_stars(&mut qualified_expression, schema); lineage_from_expression(column, &qualified_expression, dialect, trim_selects) } @@ -194,7 +195,11 @@ fn lineage_from_expression( /// Expand SELECT * in CTEs by walking CTE definitions in order and propagating /// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1)) /// which qualify_columns cannot resolve because it processes each SELECT independently. -pub fn expand_cte_stars(expr: &mut Expression) { +/// +/// When `schema` is provided, stars from external tables (not CTEs) are also resolved +/// by looking up column names in the schema. This enables correct expansion of patterns +/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`. 
+pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { let select = match expr { Expression::Select(s) => s, _ => return, @@ -228,12 +233,12 @@ pub fn expand_cte_stars(expr: &mut Expression) { None => continue, }; - let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns); + let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema); resolved_cte_columns.insert(cte_name, columns); } // Also expand stars in the outer SELECT itself - rewrite_stars_in_select(select, &resolved_cte_columns); + rewrite_stars_in_select(select, &resolved_cte_columns, schema); } /// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT. @@ -249,10 +254,12 @@ fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { } /// Rewrite star expressions in a SELECT using resolved CTE column lists. +/// Falls back to `schema` for external table column lookup. /// Returns the list of output column names after expansion. 
fn rewrite_stars_in_select( select: &mut Select, resolved_ctes: &HashMap>, + schema: Option<&dyn Schema>, ) -> Vec { let has_star = select .expressions @@ -273,6 +280,7 @@ fn rewrite_stars_in_select( } let source_names = get_select_source_names(select); + let source_fq_names = get_select_source_fq_names(select); let mut new_expressions = Vec::new(); let mut result_columns = Vec::new(); @@ -280,8 +288,13 @@ fn rewrite_stars_in_select( match expr { Expression::Star(star) => { let qual = star.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = expand_star_from_sources(qual, &source_names, resolved_ctes) - { + if let Some(expanded) = expand_star_from_sources( + qual, + &source_names, + resolved_ctes, + schema, + &source_fq_names, + ) { for col_name in &expanded { new_expressions.push(make_column_expr(col_name, None)); result_columns.push(col_name.clone()); @@ -293,8 +306,13 @@ fn rewrite_stars_in_select( } Expression::Column(c) if c.name.name == "*" => { let qual = c.table.as_ref().map(|t| t.name.as_str()); - if let Some(expanded) = expand_star_from_sources(qual, &source_names, resolved_ctes) - { + if let Some(expanded) = expand_star_from_sources( + qual, + &source_names, + resolved_ctes, + schema, + &source_fq_names, + ) { for col_name in &expanded { new_expressions.push(make_column_expr(col_name, c.table.as_ref())); result_columns.push(col_name.clone()); @@ -317,37 +335,49 @@ fn rewrite_stars_in_select( result_columns } -/// Try to expand a star expression by looking up source columns from resolved CTEs. +/// Try to expand a star expression by looking up source columns from resolved CTEs, +/// falling back to the schema for external tables. /// `qualifier`: Optional table qualifier name (for `table.*`). If None, expand all sources. +/// `source_fq_names`: Fully-qualified table names for schema lookup, parallel to `source_names`. 
fn expand_star_from_sources( qualifier: Option<&str>, source_names: &[(String, String)], resolved_ctes: &HashMap>, + schema: Option<&dyn Schema>, + source_fq_names: &[String], ) -> Option> { let mut expanded = Vec::new(); if let Some(qual) = qualifier { // Qualified star: table.* let qual_lower = qual.to_lowercase(); - // Find which source this qualifier refers to - for (alias, original) in source_names { + for (i, (alias, original)) in source_names.iter().enumerate() { if alias.to_lowercase() == qual_lower || original.to_lowercase() == qual_lower { + // Try CTE first if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { expanded.extend(cols.iter().cloned()); return Some(expanded); } + // Fall back to schema + if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { + expanded.extend(cols); + return Some(expanded); + } } } None } else { - // Unqualified star: expand all CTE sources + // Unqualified star: expand all sources let mut any_expanded = false; - for (_alias, original) in source_names { + for (i, (_alias, original)) in source_names.iter().enumerate() { if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { expanded.extend(cols.iter().cloned()); any_expanded = true; + } else if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { + expanded.extend(cols); + any_expanded = true; } else { - // Source is not a resolved CTE — can't fully expand + // Source is not a resolved CTE and not in schema — can't fully expand return None; } } @@ -359,6 +389,13 @@ fn expand_star_from_sources( } } +/// Look up column names for a table from the schema. +fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: Option<&String>) -> Option> { + let schema = schema?; + let name = fq_name?; + schema.column_names(name).ok().filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string())) +} + /// Create a Column expression with the given name and optional table qualifier. 
fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression { Expression::Column(Box::new(crate::expressions::Column { @@ -422,6 +459,41 @@ fn get_select_source_names(select: &Select) -> Vec<(String, String)> { names } +/// Extract fully-qualified source names from a SELECT's FROM and JOIN clauses. +/// Returns names in the same order as `get_select_source_names`. +fn get_select_source_fq_names(select: &Select) -> Vec { + let mut names = Vec::new(); + + fn extract_fq_name(expr: &Expression) -> Option { + match expr { + Expression::Table(t) => { + let mut parts = Vec::new(); + if let Some(catalog) = &t.catalog { + parts.push(catalog.name.clone()); + } + if let Some(schema) = &t.schema { + parts.push(schema.name.clone()); + } + parts.push(t.name.name.clone()); + Some(parts.join(".")) + } + Expression::Subquery(s) => s.alias.as_ref().map(|a| a.name.clone()), + Expression::Paren(p) => extract_fq_name(&p.this), + _ => None, + } + } + + if let Some(from) = &select.from { + for expr in &from.expressions { + names.push(extract_fq_name(expr).unwrap_or_default()); + } + } + for join in &select.joins { + names.push(extract_fq_name(&join.this).unwrap_or_default()); + } + names +} + /// Get all source tables from a lineage graph pub fn get_source_tables(node: &LineageNode) -> HashSet { let mut tables = HashSet::new(); @@ -3080,4 +3152,139 @@ mod tests { "Expected downstream nodes for subquery with SELECT *, got none" ); } + + #[test] + fn test_lineage_cte_star_with_schema_external_table() { + // CTE references an external table via SELECT * — schema enables expansion + let sql = r#"WITH orders AS (SELECT * FROM stg_orders) +SELECT * FROM orders"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + ("customer_id".to_string(), DataType::Unknown), + ("amount".to_string(), DataType::Unknown), + ]; + schema.add_table("stg_orders", &cols, None).unwrap(); + + let node = 
lineage_with_schema( + "order_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "order_id"); + } + + #[test] + fn test_lineage_cte_star_with_schema_three_part_name() { + // CTE references an external table with fully-qualified 3-part name + let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders") +SELECT * FROM orders"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + ("customer_id".to_string(), DataType::Unknown), + ]; + schema.add_table("db.schema.stg_orders", &cols, None).unwrap(); + + let node = lineage_with_schema( + "customer_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "customer_id"); + } + + #[test] + fn test_lineage_cte_star_with_schema_nested() { + // Nested CTEs: outer CTE references inner CTE with SELECT *, + // inner CTE references external table with SELECT * + let sql = r#"WITH + raw AS (SELECT * FROM external_table), + enriched AS (SELECT * FROM raw) + SELECT * FROM enriched"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("id".to_string(), DataType::Unknown), + ("name".to_string(), DataType::Unknown), + ]; + schema.add_table("external_table", &cols, None).unwrap(); + + let node = lineage_with_schema( + "name", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "name"); + } + + #[test] + fn test_lineage_cte_qualified_star_with_schema() { + // CTE uses qualified star (orders.*) from a CTE whose columns + // come from an external table via SELECT * + let sql = r#"WITH + orders AS (SELECT * FROM stg_orders), + enriched AS ( + SELECT orders.*, 'extra' AS extra + FROM orders + ) + SELECT * FROM enriched"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + 
("total".to_string(), DataType::Unknown), + ]; + schema.add_table("stg_orders", &cols, None).unwrap(); + + let node = lineage_with_schema( + "order_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "order_id"); + + // Also verify the extra column works + let extra = lineage_with_schema( + "extra", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(extra.name, "extra"); + } + + #[test] + fn test_lineage_cte_star_without_schema_still_works() { + // Without schema, CTE-to-CTE star expansion still works + let sql = r#"WITH + cte1 AS (SELECT id, name FROM raw_table), + cte2 AS (SELECT * FROM cte1) + SELECT * FROM cte2"#; + let expr = parse(sql); + + // No schema — should still resolve through CTE chain + let node = lineage("id", &expr, None, false).unwrap(); + assert_eq!(node.name, "id"); + } } From 64f10d1448d165ba6ed78cb1a82c0835abbefb60 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 16:29:11 +0000 Subject: [PATCH 07/13] fix: preserve table qualifiers when expanding CTE stars in lineage When expand_cte_stars expanded SELECT * expressions, the resulting column references lost their source table qualifier. This caused lineage resolution to fail for nested CTE chains (e.g., base -> with_payments (JOIN) -> final -> outer SELECT) where resolve_unqualified_column could not determine which source a column belonged to when there were multiple FROM sources. 
Changes: - expand_star_from_sources now returns (source_alias, column_name) pairs instead of just column names - rewrite_stars_in_select sets the table qualifier on expanded columns from unqualified stars, enabling proper lineage tracing - resolve_qualified_column now checks ancestor CTE scopes in addition to current scope's cte_sources for sibling CTE references Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 87 ++++++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index b790e08..bf678b8 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -295,8 +295,9 @@ fn rewrite_stars_in_select( schema, &source_fq_names, ) { - for col_name in &expanded { - new_expressions.push(make_column_expr(col_name, None)); + for (src_alias, col_name) in &expanded { + let table_id = Identifier::new(src_alias); + new_expressions.push(make_column_expr(col_name, Some(&table_id))); result_columns.push(col_name.clone()); } } else { @@ -313,7 +314,8 @@ fn rewrite_stars_in_select( schema, &source_fq_names, ) { - for col_name in &expanded { + for (_src_alias, col_name) in &expanded { + // Keep the original table qualifier for qualified stars (table.*) new_expressions.push(make_column_expr(col_name, c.table.as_ref())); result_columns.push(col_name.clone()); } @@ -337,6 +339,7 @@ fn rewrite_stars_in_select( /// Try to expand a star expression by looking up source columns from resolved CTEs, /// falling back to the schema for external tables. +/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers. /// `qualifier`: Optional table qualifier name (for `table.*`). If None, expand all sources. /// `source_fq_names`: Fully-qualified table names for schema lookup, parallel to `source_names`. 
fn expand_star_from_sources( @@ -345,7 +348,7 @@ fn expand_star_from_sources( resolved_ctes: &HashMap>, schema: Option<&dyn Schema>, source_fq_names: &[String], -) -> Option> { +) -> Option> { let mut expanded = Vec::new(); if let Some(qual) = qualifier { @@ -355,12 +358,12 @@ fn expand_star_from_sources( if alias.to_lowercase() == qual_lower || original.to_lowercase() == qual_lower { // Try CTE first if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { - expanded.extend(cols.iter().cloned()); + expanded.extend(cols.iter().map(|c| (alias.clone(), c.clone()))); return Some(expanded); } // Fall back to schema if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { - expanded.extend(cols); + expanded.extend(cols.into_iter().map(|c| (alias.clone(), c))); return Some(expanded); } } @@ -369,12 +372,12 @@ fn expand_star_from_sources( } else { // Unqualified star: expand all sources let mut any_expanded = false; - for (i, (_alias, original)) in source_names.iter().enumerate() { + for (i, (alias, original)) in source_names.iter().enumerate() { if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { - expanded.extend(cols.iter().cloned()); + expanded.extend(cols.iter().map(|c| (alias.clone(), c.clone()))); any_expanded = true; } else if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { - expanded.extend(cols); + expanded.extend(cols.into_iter().map(|c| (alias.clone(), c))); any_expanded = true; } else { // Source is not a resolved CTE and not in schema — can't fully expand @@ -756,8 +759,13 @@ fn resolve_qualified_column( trim_selects: bool, all_cte_scopes: &[&Scope], ) { - // Check if table is a CTE reference (cte_sources tracks CTE names) - if scope.cte_sources.contains_key(table) { + // Check if table is a CTE reference — check both the current scope's cte_sources + // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses). 
+ let is_cte = scope.cte_sources.contains_key(table) + || all_cte_scopes.iter().any(|s| { + matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == table) + }); + if is_cte { if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, table) { // Build ancestor CTE scopes from all_cte_scopes for the recursive call let ancestors: Vec = all_cte_scopes.iter().map(|s| (*s).clone()).collect(); @@ -3287,4 +3295,61 @@ SELECT * FROM orders"#; let node = lineage("id", &expr, None, false).unwrap(); assert_eq!(node.name, "id"); } + + #[test] + fn test_lineage_nested_cte_star_with_join_and_schema() { + // Reproduces dbt pattern: CTE chain with qualified star and JOIN + // base_orders -> with_payments (JOIN) -> final -> outer SELECT + let sql = r#"WITH +base_orders AS ( + SELECT * FROM stg_orders +), +with_payments AS ( + SELECT + base_orders.*, + p.amount + FROM base_orders + LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id +), +final_cte AS ( + SELECT * FROM with_payments +) +SELECT * FROM final_cte"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let order_cols = vec![ + ("order_id".to_string(), crate::expressions::DataType::Unknown), + ("customer_id".to_string(), crate::expressions::DataType::Unknown), + ("status".to_string(), crate::expressions::DataType::Unknown), + ]; + let pay_cols = vec![ + ("payment_id".to_string(), crate::expressions::DataType::Unknown), + ("order_id".to_string(), crate::expressions::DataType::Unknown), + ("amount".to_string(), crate::expressions::DataType::Unknown), + ]; + schema.add_table("stg_orders", &order_cols, None).unwrap(); + schema.add_table("stg_payments", &pay_cols, None).unwrap(); + + // order_id should trace back to stg_orders + let node = lineage_with_schema( + "order_id", &expr, Some(&schema as &dyn Schema), None, false, + ).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + + // The leaf should be "stg_orders.order_id" (not just "order_id") + 
let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("order_id")); + assert!(has_table_qualified, + "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}", all_names); + + // amount should trace back to stg_payments + let node = lineage_with_schema( + "amount", &expr, Some(&schema as &dyn Schema), None, false, + ).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + + let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("amount")); + assert!(has_table_qualified, + "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}", all_names); + } } From 264c33e7ee2e880dd42d3cc70eb3c07ee523b443 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Wed, 18 Mar 2026 17:48:45 +0000 Subject: [PATCH 08/13] fix: add recursion depth limit to lineage tracing Add a MAX_LINEAGE_DEPTH (64) counter to to_node_inner and all recursive callers (handle_set_operation, resolve_qualified_column, resolve_unqualified_column) to prevent stack overflow on circular or deeply nested CTE chains. Returns an error instead of crashing when the depth limit is exceeded. Fixes stack overflow on queries with CTE+SELECT*+JOIN+CASE patterns such as jaffle-shop's orders model. --- crates/polyglot-sql/src/lineage.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index bf678b8..5904506 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -518,6 +518,10 @@ pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet) { // Core recursive lineage builder // --------------------------------------------------------------------------- +/// Maximum recursion depth for lineage tracing to prevent stack overflow +/// on circular or deeply nested CTE chains. +const MAX_LINEAGE_DEPTH: usize = 64; + /// Recursively build a lineage node for a column in a scope. 
fn to_node( column: ColumnRef<'_>, @@ -537,6 +541,7 @@ fn to_node( reference_node_name, trim_selects, &[], + 0, ) } @@ -549,7 +554,13 @@ fn to_node_inner( reference_node_name: &str, trim_selects: bool, ancestor_cte_scopes: &[Scope], + depth: usize, ) -> Result { + if depth > MAX_LINEAGE_DEPTH { + return Err(Error::internal(format!( + "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'" + ))); + } let scope_expr = &scope.expression; // Build combined CTE scopes: current scope's cte_scopes + ancestors @@ -588,6 +599,7 @@ fn to_node_inner( reference_node_name, trim_selects, ancestor_cte_scopes, + depth, ); } return handle_set_operation( @@ -599,6 +611,7 @@ fn to_node_inner( reference_node_name, trim_selects, ancestor_cte_scopes, + depth, ); } @@ -654,6 +667,7 @@ fn to_node_inner( "", trim_selects, ancestor_cte_scopes, + depth + 1, ) { node.downstream.push(child); } @@ -678,6 +692,7 @@ fn to_node_inner( &column_name, trim_selects, &all_cte_scopes, + depth, ); } else { resolve_unqualified_column( @@ -688,6 +703,7 @@ fn to_node_inner( &column_name, trim_selects, &all_cte_scopes, + depth, ); } } @@ -708,6 +724,7 @@ fn handle_set_operation( reference_node_name: &str, trim_selects: bool, ancestor_cte_scopes: &[Scope], + depth: usize, ) -> Result { let scope_expr = &scope.expression; @@ -737,6 +754,7 @@ fn handle_set_operation( "", trim_selects, ancestor_cte_scopes, + depth + 1, ) { node.downstream.push(child); } @@ -758,6 +776,7 @@ fn resolve_qualified_column( parent_name: &str, trim_selects: bool, all_cte_scopes: &[&Scope], + depth: usize, ) { // Check if table is a CTE reference — check both the current scope's cte_sources // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses). 
@@ -778,6 +797,7 @@ fn resolve_qualified_column( parent_name, trim_selects, &ancestors, + depth + 1, ) { node.downstream.push(child); return; @@ -799,6 +819,7 @@ fn resolve_qualified_column( parent_name, trim_selects, &ancestors, + depth + 1, ) { node.downstream.push(child); return; @@ -833,6 +854,7 @@ fn resolve_unqualified_column( parent_name: &str, trim_selects: bool, all_cte_scopes: &[&Scope], + depth: usize, ) { // Try to find which source this column belongs to. // Build the source list from the actual FROM/JOIN clauses to avoid @@ -850,6 +872,7 @@ fn resolve_unqualified_column( parent_name, trim_selects, all_cte_scopes, + depth, ); return; } From 0da044c7086b5a9c88bc038a63a59bb9a20cd89c Mon Sep 17 00:00:00 2001 From: eitsupi Date: Thu, 19 Mar 2026 14:30:58 +0000 Subject: [PATCH 09/13] fix: resolve CTE aliases in lineage column tracing When a query uses `FROM my_cte AS alias`, the scope's `sources` map stores the alias name as the key, but `cte_sources` only contains the original CTE name. This caused `resolve_qualified_column` to fail the CTE check and fall through to a terminal node, stopping lineage tracing at the alias instead of tracing through the CTE body. Add `resolve_cte_alias()` to detect when a source name is an alias for a CTE by checking if the source's expression is a CTE expression, and extract the original CTE name for scope lookup. This mirrors the behavior of Python sqlglot where `scope.sources[alias]` directly returns the CTE Scope object, making alias resolution implicit. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 124 ++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 4 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 5904506..a09d9c3 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -778,14 +778,19 @@ fn resolve_qualified_column( all_cte_scopes: &[&Scope], depth: usize, ) { + // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`), + // resolve it to the actual CTE name so the CTE scope lookup succeeds. + let resolved_cte_name = resolve_cte_alias(scope, table); + let effective_table = resolved_cte_name.as_deref().unwrap_or(table); + // Check if table is a CTE reference — check both the current scope's cte_sources // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses). - let is_cte = scope.cte_sources.contains_key(table) + let is_cte = scope.cte_sources.contains_key(effective_table) || all_cte_scopes.iter().any(|s| { - matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == table) + matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table) }); if is_cte { - if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, table) { + if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) { // Build ancestor CTE scopes from all_cte_scopes for the recursive call let ancestors: Vec = all_cte_scopes.iter().map(|s| (*s).clone()).collect(); if let Ok(child) = to_node_inner( @@ -793,7 +798,7 @@ fn resolve_qualified_column( child_scope, dialect, parent_name, - table, + effective_table, parent_name, trim_selects, &ancestors, @@ -846,6 +851,30 @@ fn resolve_qualified_column( .push(make_table_column_node(table, col_name)); } +/// Resolve a FROM alias to the original CTE name. 
+/// +/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains +/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`. +/// This function checks if `name` is such an alias and returns the CTE name. +fn resolve_cte_alias(scope: &Scope, name: &str) -> Option { + // If it's already a known CTE name, no resolution needed + if scope.cte_sources.contains_key(name) { + return None; + } + // Check if the source's expression is a CTE — if so, extract the CTE name + if let Some(source_info) = scope.sources.get(name) { + if source_info.is_scope { + if let Expression::Cte(cte) = &source_info.expression { + let cte_name = &cte.alias.name; + if scope.cte_sources.contains_key(cte_name) { + return Some(cte_name.clone()); + } + } + } + } + None +} + fn resolve_unqualified_column( node: &mut LineageNode, scope: &Scope, @@ -3375,4 +3404,91 @@ SELECT * FROM final_cte"#; assert!(has_table_qualified, "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}", all_names); } + + #[test] + fn test_lineage_cte_alias_resolution() { + // FROM cte_name AS alias pattern: alias should resolve through CTE to source table + let sql = r#"WITH import_stg_items AS ( + SELECT item_id, name, status FROM stg_items +) +SELECT base.item_id, base.status +FROM import_stg_items AS base"#; + let expr = parse(sql); + + let node = lineage("item_id", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id" + assert!( + all_names.iter().any(|n| n == "stg_items.item_id"), + "Expected leaf 'stg_items.item_id', got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_alias_with_schema_and_star() { + // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body + let sql = r#"WITH import_stg AS ( + SELECT * FROM stg_items +) +SELECT base.item_id, base.status +FROM import_stg AS base"#; + let expr = parse(sql); 
+ + let mut schema = MappingSchema::new(); + schema + .add_table( + "stg_items", + &[ + ("item_id".to_string(), DataType::Unknown), + ("name".to_string(), DataType::Unknown), + ("status".to_string(), DataType::Unknown), + ], + None, + ) + .unwrap(); + + let node = lineage_with_schema( + "item_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "stg_items.item_id"), + "Expected leaf 'stg_items.item_id', got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_alias_with_join() { + // Multiple CTE aliases in a JOIN: each should resolve independently + let sql = r#"WITH + import_users AS (SELECT id, name FROM users), + import_orders AS (SELECT id, user_id, amount FROM orders) +SELECT u.name, o.amount +FROM import_users AS u +LEFT JOIN import_orders AS o ON u.id = o.user_id"#; + let expr = parse(sql); + + let node = lineage("name", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "users.name"), + "Expected leaf 'users.name', got: {:?}", + all_names + ); + + let node = lineage("amount", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "orders.amount"), + "Expected leaf 'orders.amount', got: {:?}", + all_names + ); + } } From 94926363e545f18d42fc781d9f2265e0640fcf50 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Fri, 20 Mar 2026 13:22:44 +0000 Subject: [PATCH 10/13] fix: address review findings for CTE star expansion - Skip unnecessary clone in lineage() when no WITH clause is present - Document intentional conservative behavior for partial star expansion - Add comments for UNION left-branch column resolution (SQL standard) - Note case-insensitive CTE matching as a known limitation - Explain dual Star AST representation (Star vs 
Column{name:"*"}) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index a09d9c3..fd01f11 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -119,6 +119,11 @@ pub fn lineage( dialect: Option, trim_selects: bool, ) -> Result { + // Fast path: skip clone when there are no CTEs to expand + let has_with = matches!(sql, Expression::Select(s) if s.with.is_some()); + if !has_with { + return lineage_from_expression(column, sql, dialect, trim_selects); + } let mut owned = sql.clone(); expand_cte_stars(&mut owned, None); lineage_from_expression(column, &owned, dialect, trim_selects) @@ -199,6 +204,10 @@ fn lineage_from_expression( /// When `schema` is provided, stars from external tables (not CTEs) are also resolved /// by looking up column names in the schema. This enables correct expansion of patterns /// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`. +/// +/// **Known limitation:** CTE name lookups use case-insensitive matching (`.to_lowercase()`). +/// This is correct for most SQL dialects but may be surprising for case-sensitive dialects +/// (e.g., Postgres or DuckDB with quoted identifiers). pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { let select = match expr { Expression::Select(s) => s, @@ -242,6 +251,9 @@ pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { } /// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT. +/// +/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT) +/// are determined by the left branch. This matches sqlglot's behavior. 
fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { match expr { Expression::Select(s) => Some(s), @@ -261,6 +273,10 @@ fn rewrite_stars_in_select( resolved_ctes: &HashMap>, schema: Option<&dyn Schema>, ) -> Vec { + // The AST represents star expressions in two forms depending on syntax: + // - `SELECT *` → Expression::Star (unqualified star) + // - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star) + // Both must be checked to handle all star patterns. let has_star = select .expressions .iter() @@ -380,7 +396,10 @@ fn expand_star_from_sources( expanded.extend(cols.into_iter().map(|c| (alias.clone(), c))); any_expanded = true; } else { - // Source is not a resolved CTE and not in schema — can't fully expand + // Source is not a resolved CTE and not in schema — can't fully expand. + // Intentionally conservative: partial expansion is not attempted because + // an incomplete column list would cause downstream lineage resolution to + // produce incorrect results (missing columns silently omitted). return None; } } From a627cee779fe96678faffb9261dcc1a55b93b52d Mon Sep 17 00:00:00 2001 From: eitsupi Date: Fri, 20 Mar 2026 13:48:05 +0000 Subject: [PATCH 11/13] fix: implement SQL identifier case semantics for CTE name matching Unquoted CTE names are compared case-insensitively (lowercased), quoted names preserve their original case. This matches sqlglot's normalize_identifiers behavior. 
- Add normalize_cte_name() helper that respects Identifier.quoted - Introduce SourceName struct to carry normalized names through star expansion pipeline - Update expand_star_from_sources to accept Identifier qualifiers - Add tests for quoted/unquoted/mixed CTE name matching Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/polyglot-sql/src/lineage.rs | 138 +++++++++++++++++++++++------ 1 file changed, 113 insertions(+), 25 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index fd01f11..938b3d3 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -197,6 +197,19 @@ fn lineage_from_expression( // CTE star expansion // --------------------------------------------------------------------------- +/// Normalize an identifier for CTE name matching. +/// +/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased), +/// quoted identifiers preserve their original case. This matches sqlglot's +/// `normalize_identifiers` behavior. +fn normalize_cte_name(ident: &Identifier) -> String { + if ident.quoted { + ident.name.clone() + } else { + ident.name.to_lowercase() + } +} + /// Expand SELECT * in CTEs by walking CTE definitions in order and propagating /// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1)) /// which qualify_columns cannot resolve because it processes each SELECT independently. @@ -205,9 +218,9 @@ fn lineage_from_expression( /// by looking up column names in the schema. This enables correct expansion of patterns /// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`. /// -/// **Known limitation:** CTE name lookups use case-insensitive matching (`.to_lowercase()`). -/// This is correct for most SQL dialects but may be surprising for case-sensitive dialects -/// (e.g., Postgres or DuckDB with quoted identifiers). 
+/// CTE name matching follows SQL identifier semantics: unquoted names are compared +/// case-insensitively (lowercased), while quoted names preserve their original case. +/// This matches sqlglot's `normalize_identifiers` behavior. pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { let select = match expr { Expression::Select(s) => s, @@ -227,7 +240,7 @@ pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { let mut resolved_cte_columns: HashMap> = HashMap::new(); for cte in &mut with.ctes { - let cte_name = cte.alias.name.to_lowercase(); + let cte_name = normalize_cte_name(&cte.alias); // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that if !cte.columns.is_empty() { @@ -303,7 +316,7 @@ fn rewrite_stars_in_select( for expr in &select.expressions { match expr { Expression::Star(star) => { - let qual = star.table.as_ref().map(|t| t.name.as_str()); + let qual = star.table.as_ref(); if let Some(expanded) = expand_star_from_sources( qual, &source_names, @@ -322,7 +335,7 @@ fn rewrite_stars_in_select( } } Expression::Column(c) if c.name.name == "*" => { - let qual = c.table.as_ref().map(|t| t.name.as_str()); + let qual = c.table.as_ref(); if let Some(expanded) = expand_star_from_sources( qual, &source_names, @@ -356,11 +369,11 @@ fn rewrite_stars_in_select( /// Try to expand a star expression by looking up source columns from resolved CTEs, /// falling back to the schema for external tables. /// Returns (source_alias, column_name) pairs so the caller can set table qualifiers. -/// `qualifier`: Optional table qualifier name (for `table.*`). If None, expand all sources. +/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources. /// `source_fq_names`: Fully-qualified table names for schema lookup, parallel to `source_names`. 
fn expand_star_from_sources( - qualifier: Option<&str>, - source_names: &[(String, String)], + qualifier: Option<&Identifier>, + source_names: &[SourceName], resolved_ctes: &HashMap>, schema: Option<&dyn Schema>, source_fq_names: &[String], @@ -369,17 +382,17 @@ fn expand_star_from_sources( if let Some(qual) = qualifier { // Qualified star: table.* - let qual_lower = qual.to_lowercase(); - for (i, (alias, original)) in source_names.iter().enumerate() { - if alias.to_lowercase() == qual_lower || original.to_lowercase() == qual_lower { + let qual_normalized = normalize_cte_name(qual); + for (i, src) in source_names.iter().enumerate() { + if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized { // Try CTE first - if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { - expanded.extend(cols.iter().map(|c| (alias.clone(), c.clone()))); + if let Some(cols) = resolved_ctes.get(&src.normalized) { + expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone()))); return Some(expanded); } // Fall back to schema if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { - expanded.extend(cols.into_iter().map(|c| (alias.clone(), c))); + expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c))); return Some(expanded); } } @@ -388,18 +401,20 @@ fn expand_star_from_sources( } else { // Unqualified star: expand all sources let mut any_expanded = false; - for (i, (alias, original)) in source_names.iter().enumerate() { - if let Some(cols) = resolved_ctes.get(&original.to_lowercase()) { - expanded.extend(cols.iter().map(|c| (alias.clone(), c.clone()))); + for (i, src) in source_names.iter().enumerate() { + if let Some(cols) = resolved_ctes.get(&src.normalized) { + expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone()))); any_expanded = true; } else if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { - expanded.extend(cols.into_iter().map(|c| (alias.clone(), c))); + 
expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c))); any_expanded = true; } else { // Source is not a resolved CTE and not in schema — can't fully expand. // Intentionally conservative: partial expansion is not attempted because // an incomplete column list would cause downstream lineage resolution to // produce incorrect results (missing columns silently omitted). + // This matches sqlglot's behavior: it also requires all sources to be + // resolvable and raises SqlglotError when schema is missing. return None; } } @@ -443,23 +458,31 @@ fn get_expression_output_name(expr: &Expression) -> Option { /// Extract source names from a SELECT's FROM and JOIN clauses. /// Returns (alias_or_name, original_name) pairs. -fn get_select_source_names(select: &Select) -> Vec<(String, String)> { +/// Source name info with normalized name for CTE lookup (respects quoted vs unquoted). +struct SourceName { + alias: String, + /// The normalized name for CTE lookup: unquoted → lowercased, quoted → as-is. 
+ normalized: String, +} + +fn get_select_source_names(select: &Select) -> Vec { let mut names = Vec::new(); - fn extract_source(expr: &Expression) -> Option<(String, String)> { + fn extract_source(expr: &Expression) -> Option { match expr { Expression::Table(t) => { - let original = t.name.name.clone(); + let normalized = normalize_cte_name(&t.name); let alias = t .alias .as_ref() .map(|a| a.name.clone()) - .unwrap_or_else(|| original.clone()); - Some((alias, original)) + .unwrap_or_else(|| t.name.name.clone()); + Some(SourceName { alias, normalized }) } Expression::Subquery(s) => { let alias = s.alias.as_ref()?.name.clone(); - Some((alias.clone(), alias)) + let normalized = alias.to_lowercase(); + Some(SourceName { alias: alias.clone(), normalized }) } Expression::Paren(p) => extract_source(&p.this), _ => None, @@ -3510,4 +3533,69 @@ LEFT JOIN import_orders AS o ON u.id = o.user_id"#; all_names ); } + + // ----------------------------------------------------------------------- + // Quoted CTE name tests — verifying SQL identifier case semantics + // ----------------------------------------------------------------------- + + #[test] + fn test_lineage_unquoted_cte_case_insensitive() { + // Unquoted CTE names are case-insensitive (both normalized to lowercase). + // MyCte and MYCTE should match. + let expr = parse( + "WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE", + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert_eq!(node.name, "col"); + assert!( + !node.downstream.is_empty(), + "Unquoted CTE should resolve case-insensitively" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_preserved() { + // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match. 
+ let expr = parse( + r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert_eq!(node.name, "col"); + assert!( + !node.downstream.is_empty(), + "Quoted CTE with matching case should resolve" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_mismatch_no_expansion() { + // Quoted CTE "MyCte" referenced as "mycte" — case mismatch. + // sqlglot treats this as a table reference, not a CTE match. + // Star expansion should NOT resolve through the CTE. + let expr = parse( + r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#, + ); + // lineage("col", ...) should fail because "mycte" is treated as an external + // table (not matching CTE "MyCte"), and SELECT * cannot be expanded. + let result = lineage("col", &expr, None, false); + assert!( + result.is_err(), + "Quoted CTE with case mismatch should not expand star: {:?}", + result + ); + } + + #[test] + fn test_lineage_mixed_quoted_unquoted_cte() { + // Mix of unquoted and quoted CTEs in a nested chain. + let expr = parse( + r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#, + ); + let node = lineage("a", &expr, None, false).unwrap(); + assert_eq!(node.name, "a"); + assert!( + !node.downstream.is_empty(), + "Mixed quoted/unquoted CTE chain should resolve" + ); + } } From 6ee6726737b1d7b04f11344d36e742cd1b7e4b1a Mon Sep 17 00:00:00 2001 From: eitsupi Date: Fri, 20 Mar 2026 14:05:49 +0000 Subject: [PATCH 12/13] test: add known-bug tests for quoted CTE case sensitivity in scope/lineage Add two tests that assert the current buggy behavior where scope.rs add_table_to_scope uses eq_ignore_ascii_case for all identifiers including quoted ones. Per SQL semantics (and sqlglot behavior), quoted identifiers should be case-sensitive: "mycte" should NOT match CTE "MyCte". 
These tests document the bug and will fail when it is fixed, prompting the assertions to be updated to correct behavior. The fix requires changes across scope.rs and lineage.rs CTE resolution, which is broader than the star expansion scope. --- crates/polyglot-sql/src/lineage.rs | 63 ++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 938b3d3..00fc187 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -3598,4 +3598,67 @@ LEFT JOIN import_orders AS o ON u.id = o.user_id"#; "Mixed quoted/unquoted CTE chain should resolve" ); } + + // ----------------------------------------------------------------------- + // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths + // ----------------------------------------------------------------------- + // + // expand_cte_stars correctly handles quoted vs unquoted CTE names via + // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope) + // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or + // direct string comparison for CTE name matching, ignoring the quoted status. + // + // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive + // and unquoted as case-insensitive. The scope system should do the same. + // + // Fixing these requires changes across scope.rs and lineage.rs CTE resolution, + // which is broader than the star expansion scope of this PR. + + #[test] + fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() { + // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for + // all identifiers including quoted ones, so quoted CTE "MyCte" referenced + // as "mycte" incorrectly resolves to the CTE. + // + // Per SQL semantics (and sqlglot behavior), quoted identifiers are + // case-sensitive: "mycte" should NOT match CTE "MyCte". 
+ // + // This test asserts the CURRENT BUGGY behavior. When the bug is fixed, + // this test should fail — update the assertion to match correct behavior: + // child.source_name should be "" (table ref), not "MyCte" (CTE ref). + let expr = parse( + r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert!(!node.downstream.is_empty()); + let child = &node.downstream[0]; + // BUG: "mycte" incorrectly resolves to CTE "MyCte" + assert_eq!( + child.source_name, "MyCte", + "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \ + If this fails, the bug may be fixed — update to assert source_name != \"MyCte\"" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() { + // Known bug: same as above but with qualified column reference ("mycte".col). + // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for + // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte". + // + // This test asserts the CURRENT BUGGY behavior. When the bug is fixed, + // this test should fail — update to assert source_name != "MyCte". + let expr = parse( + r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert!(!node.downstream.is_empty()); + let child = &node.downstream[0]; + // BUG: "mycte".col incorrectly resolves through CTE "MyCte" + assert_eq!( + child.source_name, "MyCte", + "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. 
\ + If this fails, the bug may be fixed — update to assert source_name != \"MyCte\"" + ); + } } From a5b0009279620d038c2575628ca308409c0f2f7d Mon Sep 17 00:00:00 2001 From: eitsupi Date: Fri, 20 Mar 2026 14:07:48 +0000 Subject: [PATCH 13/13] fix: convert get_leftmost_select_mut to iterative loop and fix doc comment placement - Convert recursive get_leftmost_select_mut to iterative loop with MAX_LINEAGE_DEPTH guard to prevent stack overflow on pathologically deep set operation nesting. - Move doc comment from SourceName struct to get_select_source_names function where it belongs. --- crates/polyglot-sql/src/lineage.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 00fc187..d61ba90 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -268,14 +268,18 @@ pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { /// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT) /// are determined by the left branch. This matches sqlglot's behavior. fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { - match expr { - Expression::Select(s) => Some(s), - Expression::Union(u) => get_leftmost_select_mut(&mut u.left), - Expression::Intersect(i) => get_leftmost_select_mut(&mut i.left), - Expression::Except(e) => get_leftmost_select_mut(&mut e.left), - Expression::Paren(p) => get_leftmost_select_mut(&mut p.this), - _ => None, + let mut current = expr; + for _ in 0..MAX_LINEAGE_DEPTH { + match current { + Expression::Select(s) => return Some(s), + Expression::Union(u) => current = &mut u.left, + Expression::Intersect(i) => current = &mut i.left, + Expression::Except(e) => current = &mut e.left, + Expression::Paren(p) => current = &mut p.this, + _ => return None, + } } + None } /// Rewrite star expressions in a SELECT using resolved CTE column lists. 
@@ -456,8 +460,6 @@ fn get_expression_output_name(expr: &Expression) -> Option { } } -/// Extract source names from a SELECT's FROM and JOIN clauses. -/// Returns (alias_or_name, original_name) pairs. /// Source name info with normalized name for CTE lookup (respects quoted vs unquoted). struct SourceName { alias: String, @@ -465,6 +467,7 @@ struct SourceName { normalized: String, } +/// Extract source names from a SELECT's FROM and JOIN clauses. fn get_select_source_names(select: &Select) -> Vec { let mut names = Vec::new();