diff --git a/crates/polyglot-sql/src/lineage.rs b/crates/polyglot-sql/src/lineage.rs index 75e2dd2..d61ba90 100644 --- a/crates/polyglot-sql/src/lineage.rs +++ b/crates/polyglot-sql/src/lineage.rs @@ -6,7 +6,7 @@ //! use crate::dialects::DialectType; -use crate::expressions::Expression; +use crate::expressions::{Expression, Identifier, Select}; use crate::optimizer::annotate_types::annotate_types; use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions}; use crate::schema::{normalize_name, Schema}; @@ -14,7 +14,7 @@ use crate::scope::{build_scope, Scope}; use crate::traversal::ExpressionWalk; use crate::{Error, Result}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; /// A node in the column lineage graph #[derive(Debug, Clone, Serialize, Deserialize)] @@ -119,7 +119,14 @@ pub fn lineage( dialect: Option, trim_selects: bool, ) -> Result { - lineage_from_expression(column, sql, dialect, trim_selects) + // Fast path: skip clone when there are no CTEs to expand + let has_with = matches!(sql, Expression::Select(s) if s.with.is_some()); + if !has_with { + return lineage_from_expression(column, sql, dialect, trim_selects); + } + let mut owned = sql.clone(); + expand_cte_stars(&mut owned, None); + lineage_from_expression(column, &owned, dialect, trim_selects) } /// Build the lineage graph for a column in a SQL query using optional schema metadata. @@ -161,6 +168,10 @@ pub fn lineage_with_schema( // Annotate types in-place so lineage nodes carry type information annotate_types(&mut qualified_expression, schema, dialect); + // Expand CTE stars on the already-owned expression (no extra clone). + // Pass schema so that stars from external tables can also be resolved. + expand_cte_stars(&mut qualified_expression, schema); + lineage_from_expression(column, &qualified_expression, dialect, trim_selects) } @@ -182,6 +193,355 @@ fn lineage_from_expression( ) } +// --------------------------------------------------------------------------- +// CTE star expansion +// --------------------------------------------------------------------------- + +/// Normalize an identifier for CTE name matching. +/// +/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased), +/// quoted identifiers preserve their original case. This matches sqlglot's +/// `normalize_identifiers` behavior. +fn normalize_cte_name(ident: &Identifier) -> String { + if ident.quoted { + ident.name.clone() + } else { + ident.name.to_lowercase() + } +} + +/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating +/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1)) +/// which qualify_columns cannot resolve because it processes each SELECT independently. +/// +/// When `schema` is provided, stars from external tables (not CTEs) are also resolved +/// by looking up column names in the schema. This enables correct expansion of patterns +/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`. +/// +/// CTE name matching follows SQL identifier semantics: unquoted names are compared +/// case-insensitively (lowercased), while quoted names preserve their original case. +/// This matches sqlglot's `normalize_identifiers` behavior. +pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) { + let select = match expr { + Expression::Select(s) => s, + _ => return, + }; + + let with = match &mut select.with { + Some(w) => w, + None => return, + }; + + // Skip recursive CTEs — column resolution is complex and not needed for dbt patterns + if with.recursive { + return; + } + + let mut resolved_cte_columns: HashMap> = HashMap::new(); + + for cte in &mut with.ctes { + let cte_name = normalize_cte_name(&cte.alias); + + // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that + if !cte.columns.is_empty() { + let cols: Vec = cte.columns.iter().map(|c| c.name.clone()).collect(); + resolved_cte_columns.insert(cte_name, cols); + continue; + } + + // Get the SELECT from the CTE body (handle UNION by taking left branch) + let body_select = match get_leftmost_select_mut(&mut cte.this) { + Some(s) => s, + None => continue, + }; + + let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema); + resolved_cte_columns.insert(cte_name, columns); + } + + // Also expand stars in the outer SELECT itself + rewrite_stars_in_select(select, &resolved_cte_columns, schema); +} + +/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT. +/// +/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT) +/// are determined by the left branch. This matches sqlglot's behavior. +fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> { + let mut current = expr; + for _ in 0..MAX_LINEAGE_DEPTH { + match current { + Expression::Select(s) => return Some(s), + Expression::Union(u) => current = &mut u.left, + Expression::Intersect(i) => current = &mut i.left, + Expression::Except(e) => current = &mut e.left, + Expression::Paren(p) => current = &mut p.this, + _ => return None, + } + } + None +} + +/// Rewrite star expressions in a SELECT using resolved CTE column lists. +/// Falls back to `schema` for external table column lookup. +/// Returns the list of output column names after expansion. +fn rewrite_stars_in_select( + select: &mut Select, + resolved_ctes: &HashMap>, + schema: Option<&dyn Schema>, +) -> Vec { + // The AST represents star expressions in two forms depending on syntax: + // - `SELECT *` → Expression::Star (unqualified star) + // - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star) + // Both must be checked to handle all star patterns. + let has_star = select + .expressions + .iter() + .any(|e| matches!(e, Expression::Star(_))); + let has_qualified_star = select + .expressions + .iter() + .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*")); + + if !has_star && !has_qualified_star { + // No stars — just extract column names without rewriting + return select + .expressions + .iter() + .filter_map(get_expression_output_name) + .collect(); + } + + let source_names = get_select_source_names(select); + let source_fq_names = get_select_source_fq_names(select); + let mut new_expressions = Vec::new(); + let mut result_columns = Vec::new(); + + for expr in &select.expressions { + match expr { + Expression::Star(star) => { + let qual = star.table.as_ref(); + if let Some(expanded) = expand_star_from_sources( + qual, + &source_names, + resolved_ctes, + schema, + &source_fq_names, + ) { + for (src_alias, col_name) in &expanded { + let table_id = Identifier::new(src_alias); + new_expressions.push(make_column_expr(col_name, Some(&table_id))); + result_columns.push(col_name.clone()); + } + } else { + new_expressions.push(expr.clone()); + result_columns.push("*".to_string()); + } + } + Expression::Column(c) if c.name.name == "*" => { + let qual = c.table.as_ref(); + if let Some(expanded) = expand_star_from_sources( + qual, + &source_names, + resolved_ctes, + schema, + &source_fq_names, + ) { + for (_src_alias, col_name) in &expanded { + // Keep the original table qualifier for qualified stars (table.*) + new_expressions.push(make_column_expr(col_name, c.table.as_ref())); + result_columns.push(col_name.clone()); + } + } else { + new_expressions.push(expr.clone()); + result_columns.push("*".to_string()); + } + } + _ => { + new_expressions.push(expr.clone()); + if let Some(name) = get_expression_output_name(expr) { + result_columns.push(name); + } + } + } + } + + select.expressions = new_expressions; + result_columns +} + +/// Try to expand a star expression by looking up source columns from resolved CTEs, +/// falling back to the schema for external tables. +/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers. +/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources. +/// `source_fq_names`: Fully-qualified table names for schema lookup, parallel to `source_names`. +fn expand_star_from_sources( + qualifier: Option<&Identifier>, + source_names: &[SourceName], + resolved_ctes: &HashMap>, + schema: Option<&dyn Schema>, + source_fq_names: &[String], +) -> Option> { + let mut expanded = Vec::new(); + + if let Some(qual) = qualifier { + // Qualified star: table.* + let qual_normalized = normalize_cte_name(qual); + for (i, src) in source_names.iter().enumerate() { + if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized { + // Try CTE first + if let Some(cols) = resolved_ctes.get(&src.normalized) { + expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone()))); + return Some(expanded); + } + // Fall back to schema + if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { + expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c))); + return Some(expanded); + } + } + } + None + } else { + // Unqualified star: expand all sources + let mut any_expanded = false; + for (i, src) in source_names.iter().enumerate() { + if let Some(cols) = resolved_ctes.get(&src.normalized) { + expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone()))); + any_expanded = true; + } else if let Some(cols) = lookup_schema_columns(schema, source_fq_names.get(i)) { + expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c))); + any_expanded = true; + } else { + // Source is not a resolved CTE and not in schema — can't fully expand. + // Intentionally conservative: partial expansion is not attempted because + // an incomplete column list would cause downstream lineage resolution to + // produce incorrect results (missing columns silently omitted). + // This matches sqlglot's behavior: it also requires all sources to be + // resolvable and raises SqlglotError when schema is missing. + return None; + } + } + if any_expanded { + Some(expanded) + } else { + None + } + } +} + +/// Look up column names for a table from the schema. +fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: Option<&String>) -> Option> { + let schema = schema?; + let name = fq_name?; + schema.column_names(name).ok().filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string())) +} + +/// Create a Column expression with the given name and optional table qualifier. +fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression { + Expression::Column(Box::new(crate::expressions::Column { + name: Identifier::new(name), + table: table.cloned(), + join_mark: false, + trailing_comments: Vec::new(), + span: None, + inferred_type: None, + })) +} + +/// Extract the output name of a SELECT expression. +fn get_expression_output_name(expr: &Expression) -> Option { + match expr { + Expression::Alias(a) => Some(a.alias.name.clone()), + Expression::Column(c) => Some(c.name.name.clone()), + Expression::Identifier(id) => Some(id.name.clone()), + Expression::Star(_) => Some("*".to_string()), + _ => None, + } +} + +/// Source name info with normalized name for CTE lookup (respects quoted vs unquoted). +struct SourceName { + alias: String, + /// The normalized name for CTE lookup: unquoted → lowercased, quoted → as-is. + normalized: String, +} + +/// Extract source names from a SELECT's FROM and JOIN clauses. +fn get_select_source_names(select: &Select) -> Vec { + let mut names = Vec::new(); + + fn extract_source(expr: &Expression) -> Option { + match expr { + Expression::Table(t) => { + let normalized = normalize_cte_name(&t.name); + let alias = t + .alias + .as_ref() + .map(|a| a.name.clone()) + .unwrap_or_else(|| t.name.name.clone()); + Some(SourceName { alias, normalized }) + } + Expression::Subquery(s) => { + let alias = s.alias.as_ref()?.name.clone(); + let normalized = alias.to_lowercase(); + Some(SourceName { alias: alias.clone(), normalized }) + } + Expression::Paren(p) => extract_source(&p.this), + _ => None, + } + } + + if let Some(from) = &select.from { + for expr in &from.expressions { + if let Some(pair) = extract_source(expr) { + names.push(pair); + } + } + } + for join in &select.joins { + if let Some(pair) = extract_source(&join.this) { + names.push(pair); + } + } + names +} + +/// Extract fully-qualified source names from a SELECT's FROM and JOIN clauses. +/// Returns names in the same order as `get_select_source_names`. +fn get_select_source_fq_names(select: &Select) -> Vec { + let mut names = Vec::new(); + + fn extract_fq_name(expr: &Expression) -> Option { + match expr { + Expression::Table(t) => { + let mut parts = Vec::new(); + if let Some(catalog) = &t.catalog { + parts.push(catalog.name.clone()); + } + if let Some(schema) = &t.schema { + parts.push(schema.name.clone()); + } + parts.push(t.name.name.clone()); + Some(parts.join(".")) + } + Expression::Subquery(s) => s.alias.as_ref().map(|a| a.name.clone()), + Expression::Paren(p) => extract_fq_name(&p.this), + _ => None, + } + } + + if let Some(from) = &select.from { + for expr in &from.expressions { + names.push(extract_fq_name(expr).unwrap_or_default()); + } + } + for join in &select.joins { + names.push(extract_fq_name(&join.this).unwrap_or_default()); + } + names +} + /// Get all source tables from a lineage graph pub fn get_source_tables(node: &LineageNode) -> HashSet { let mut tables = HashSet::new(); @@ -203,6 +563,10 @@ pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet) { // Core recursive lineage builder // --------------------------------------------------------------------------- +/// Maximum recursion depth for lineage tracing to prevent stack overflow +/// on circular or deeply nested CTE chains. +const MAX_LINEAGE_DEPTH: usize = 64; + /// Recursively build a lineage node for a column in a scope. fn to_node( column: ColumnRef<'_>, @@ -222,6 +586,7 @@ fn to_node( reference_node_name, trim_selects, &[], + 0, ) } @@ -234,7 +599,13 @@ fn to_node_inner( reference_node_name: &str, trim_selects: bool, ancestor_cte_scopes: &[Scope], + depth: usize, ) -> Result { + if depth > MAX_LINEAGE_DEPTH { + return Err(Error::internal(format!( + "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'" + ))); + } let scope_expr = &scope.expression; // Build combined CTE scopes: current scope's cte_scopes + ancestors @@ -273,6 +644,7 @@ fn to_node_inner( reference_node_name, trim_selects, ancestor_cte_scopes, + depth, ); } return handle_set_operation( @@ -284,6 +656,7 @@ fn to_node_inner( reference_node_name, trim_selects, ancestor_cte_scopes, + depth, ); } @@ -339,6 +712,7 @@ fn to_node_inner( "", trim_selects, ancestor_cte_scopes, + depth + 1, ) { node.downstream.push(child); } @@ -363,6 +737,7 @@ fn to_node_inner( &column_name, trim_selects, &all_cte_scopes, + depth, ); } else { resolve_unqualified_column( @@ -373,6 +748,7 @@ fn to_node_inner( &column_name, trim_selects, &all_cte_scopes, + depth, ); } } @@ -393,6 +769,7 @@ fn handle_set_operation( reference_node_name: &str, trim_selects: bool, ancestor_cte_scopes: &[Scope], + depth: usize, ) -> Result { let scope_expr = &scope.expression; @@ -422,6 +799,7 @@ fn handle_set_operation( "", trim_selects, ancestor_cte_scopes, + depth + 1, ) { node.downstream.push(child); } @@ -443,10 +821,21 @@ fn resolve_qualified_column( parent_name: &str, trim_selects: bool, all_cte_scopes: &[&Scope], + depth: usize, ) { - // Check if table is a CTE reference (cte_sources tracks CTE names) - if scope.cte_sources.contains_key(table) { - if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, table) { + // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`), + // resolve it to the actual CTE name so the CTE scope lookup succeeds. + let resolved_cte_name = resolve_cte_alias(scope, table); + let effective_table = resolved_cte_name.as_deref().unwrap_or(table); + + // Check if table is a CTE reference — check both the current scope's cte_sources + // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses). + let is_cte = scope.cte_sources.contains_key(effective_table) + || all_cte_scopes.iter().any(|s| { + matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table) + }); + if is_cte { + if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) { // Build ancestor CTE scopes from all_cte_scopes for the recursive call let ancestors: Vec = all_cte_scopes.iter().map(|s| (*s).clone()).collect(); if let Ok(child) = to_node_inner( @@ -454,10 +843,11 @@ fn resolve_qualified_column( child_scope, dialect, parent_name, - table, + effective_table, parent_name, trim_selects, &ancestors, + depth + 1, ) { node.downstream.push(child); return; @@ -479,6 +869,7 @@ fn resolve_qualified_column( parent_name, trim_selects, &ancestors, + depth + 1, ) { node.downstream.push(child); return; @@ -505,6 +896,30 @@ fn resolve_qualified_column( .push(make_table_column_node(table, col_name)); } +/// Resolve a FROM alias to the original CTE name. +/// +/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains +/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`. +/// This function checks if `name` is such an alias and returns the CTE name. +fn resolve_cte_alias(scope: &Scope, name: &str) -> Option { + // If it's already a known CTE name, no resolution needed + if scope.cte_sources.contains_key(name) { + return None; + } + // Check if the source's expression is a CTE — if so, extract the CTE name + if let Some(source_info) = scope.sources.get(name) { + if source_info.is_scope { + if let Expression::Cte(cte) = &source_info.expression { + let cte_name = &cte.alias.name; + if scope.cte_sources.contains_key(cte_name) { + return Some(cte_name.clone()); + } + } + } + } + None +} + fn resolve_unqualified_column( node: &mut LineageNode, scope: &Scope, @@ -513,6 +928,7 @@ fn resolve_unqualified_column( parent_name: &str, trim_selects: bool, all_cte_scopes: &[&Scope], + depth: usize, ) { // Try to find which source this column belongs to. // Build the source list from the actual FROM/JOIN clauses to avoid @@ -530,6 +946,7 @@ fn resolve_unqualified_column( parent_name, trim_selects, all_cte_scopes, + depth, ); return; } @@ -2669,4 +3086,582 @@ mod tests { names ); } + + // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) --- + + #[test] + fn test_lineage_cte_select_star() { + // Ported from sqlglot: test_lineage_source_with_star + // WITH y AS (SELECT * FROM x) SELECT a FROM y + // After star expansion: SELECT y.a AS a FROM y + let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y"); + let node = lineage("a", &expr, None, false).unwrap(); + + assert_eq!(node.name, "a"); + // Should successfully resolve column 'a' through the CTE + // (previously failed with "Cannot find column 'a' in query") + assert!( + !node.downstream.is_empty(), + "Expected downstream nodes tracing through CTE, got none" + ); + } + + #[test] + fn test_lineage_cte_select_star_renamed_column() { + // dbt standard pattern: CTE with column rename + outer SELECT * + // This is the primary use case for dbt projects (jaffle-shop etc.) + let expr = + parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed"); + let node = lineage("customer_id", &expr, None, false).unwrap(); + + assert_eq!(node.name, "customer_id"); + // Should trace customer_id → renamed CTE → source.id + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes (customer_id → source), got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_select_star_multiple_columns() { + // CTE exposes multiple columns, outer SELECT * should resolve each + let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte"); + + for col in &["a", "b", "c"] { + let node = lineage(col, &expr, None, false).unwrap(); + assert_eq!(node.name, *col); + // Verify lineage resolves without error (star expanded to explicit columns) + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for column {}, got: {:?}", + col, + all_names + ); + } + } + + #[test] + fn test_lineage_nested_cte_select_star() { + // Nested CTE star expansion: cte2 references cte1 via SELECT * + let expr = parse( + "WITH cte1 AS (SELECT a FROM t), \ + cte2 AS (SELECT * FROM cte1) \ + SELECT * FROM cte2", + ); + let node = lineage("a", &expr, None, false).unwrap(); + + assert_eq!(node.name, "a"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 3, + "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_three_level_nested_cte_star() { + // Three-level nested CTE: cte3 → cte2 → cte1 → t + let expr = parse( + "WITH cte1 AS (SELECT x FROM t), \ + cte2 AS (SELECT * FROM cte1), \ + cte3 AS (SELECT * FROM cte2) \ + SELECT * FROM cte3", + ); + let node = lineage("x", &expr, None, false).unwrap(); + + assert_eq!(node.name, "x"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 4, + "Expected at least 4 nodes through 3-level CTE chain, got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_union_star() { + // CTE with UNION body, outer SELECT * should resolve from left branch + let expr = parse( + "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \ + SELECT * FROM cte", + ); + let node = lineage("a", &expr, None, false).unwrap(); + + assert_eq!(node.name, "a"); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for CTE union star, got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_star_unknown_table() { + // When CTE references an unknown table, star expansion is skipped gracefully + // and lineage falls back to normal resolution (which may fail) + let expr = parse( + "WITH cte AS (SELECT * FROM unknown_table) \ + SELECT * FROM cte", + ); + // This should not panic — it may succeed or fail depending on resolution, + // but should not crash + let _result = lineage("x", &expr, None, false); + } + + #[test] + fn test_lineage_cte_explicit_columns() { + // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t) + let expr = parse( + "WITH cte(x, y) AS (SELECT a, b FROM t) \ + SELECT * FROM cte", + ); + let node = lineage("x", &expr, None, false).unwrap(); + assert_eq!(node.name, "x"); + } + + #[test] + fn test_lineage_cte_qualified_star() { + // Qualified star: SELECT cte.* FROM cte + let expr = parse( + "WITH cte AS (SELECT a, b FROM t) \ + SELECT cte.* FROM cte", + ); + for col in &["a", "b"] { + let node = lineage(col, &expr, None, false).unwrap(); + assert_eq!(node.name, *col); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.len() >= 2, + "Expected at least 2 nodes for qualified star column {}, got: {:?}", + col, + all_names + ); + } + } + + #[test] + fn test_lineage_subquery_select_star() { + // Ported from sqlglot: test_select_star + // SELECT x FROM (SELECT * FROM table_a) + let expr = parse("SELECT x FROM (SELECT * FROM table_a)"); + let node = lineage("x", &expr, None, false).unwrap(); + + assert_eq!(node.name, "x"); + assert!( + !node.downstream.is_empty(), + "Expected downstream nodes for subquery with SELECT *, got none" + ); + } + + #[test] + fn test_lineage_cte_star_with_schema_external_table() { + // CTE references an external table via SELECT * — schema enables expansion + let sql = r#"WITH orders AS (SELECT * FROM stg_orders) +SELECT * FROM orders"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + ("customer_id".to_string(), DataType::Unknown), + ("amount".to_string(), DataType::Unknown), + ]; + schema.add_table("stg_orders", &cols, None).unwrap(); + + let node = lineage_with_schema( + "order_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "order_id"); + } + + #[test] + fn test_lineage_cte_star_with_schema_three_part_name() { + // CTE references an external table with fully-qualified 3-part name + let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders") +SELECT * FROM orders"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + ("customer_id".to_string(), DataType::Unknown), + ]; + schema.add_table("db.schema.stg_orders", &cols, None).unwrap(); + + let node = lineage_with_schema( + "customer_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "customer_id"); + } + + #[test] + fn test_lineage_cte_star_with_schema_nested() { + // Nested CTEs: outer CTE references inner CTE with SELECT *, + // inner CTE references external table with SELECT * + let sql = r#"WITH + raw AS (SELECT * FROM external_table), + enriched AS (SELECT * FROM raw) + SELECT * FROM enriched"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("id".to_string(), DataType::Unknown), + ("name".to_string(), DataType::Unknown), + ]; + schema.add_table("external_table", &cols, None).unwrap(); + + let node = lineage_with_schema( + "name", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "name"); + } + + #[test] + fn test_lineage_cte_qualified_star_with_schema() { + // CTE uses qualified star (orders.*) from a CTE whose columns + // come from an external table via SELECT * + let sql = r#"WITH + orders AS (SELECT * FROM stg_orders), + enriched AS ( + SELECT orders.*, 'extra' AS extra + FROM orders + ) + SELECT * FROM enriched"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let cols = vec![ + ("order_id".to_string(), DataType::Unknown), + ("total".to_string(), DataType::Unknown), + ]; + schema.add_table("stg_orders", &cols, None).unwrap(); + + let node = lineage_with_schema( + "order_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(node.name, "order_id"); + + // Also verify the extra column works + let extra = lineage_with_schema( + "extra", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + assert_eq!(extra.name, "extra"); + } + + #[test] + fn test_lineage_cte_star_without_schema_still_works() { + // Without schema, CTE-to-CTE star expansion still works + let sql = r#"WITH + cte1 AS (SELECT id, name FROM raw_table), + cte2 AS (SELECT * FROM cte1) + SELECT * FROM cte2"#; + let expr = parse(sql); + + // No schema — should still resolve through CTE chain + let node = lineage("id", &expr, None, false).unwrap(); + assert_eq!(node.name, "id"); + } + + #[test] + fn test_lineage_nested_cte_star_with_join_and_schema() { + // Reproduces dbt pattern: CTE chain with qualified star and JOIN + // base_orders -> with_payments (JOIN) -> final -> outer SELECT + let sql = r#"WITH +base_orders AS ( + SELECT * FROM stg_orders +), +with_payments AS ( + SELECT + base_orders.*, + p.amount + FROM base_orders + LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id +), +final_cte AS ( + SELECT * FROM with_payments +) +SELECT * FROM final_cte"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + let order_cols = vec![ + ("order_id".to_string(), crate::expressions::DataType::Unknown), + ("customer_id".to_string(), crate::expressions::DataType::Unknown), + ("status".to_string(), crate::expressions::DataType::Unknown), + ]; + let pay_cols = vec![ + ("payment_id".to_string(), crate::expressions::DataType::Unknown), + ("order_id".to_string(), crate::expressions::DataType::Unknown), + ("amount".to_string(), crate::expressions::DataType::Unknown), + ]; + schema.add_table("stg_orders", &order_cols, None).unwrap(); + schema.add_table("stg_payments", &pay_cols, None).unwrap(); + + // order_id should trace back to stg_orders + let node = lineage_with_schema( + "order_id", &expr, Some(&schema as &dyn Schema), None, false, + ).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + + // The leaf should be "stg_orders.order_id" (not just "order_id") + let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("order_id")); + assert!(has_table_qualified, + "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}", all_names); + + // amount should trace back to stg_payments + let node = lineage_with_schema( + "amount", &expr, Some(&schema as &dyn Schema), None, false, + ).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + + let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("amount")); + assert!(has_table_qualified, + "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}", all_names); + } + + #[test] + fn test_lineage_cte_alias_resolution() { + // FROM cte_name AS alias pattern: alias should resolve through CTE to source table + let sql = r#"WITH import_stg_items AS ( + SELECT item_id, name, status FROM stg_items +) +SELECT base.item_id, base.status +FROM import_stg_items AS base"#; + let expr = parse(sql); + + let node = lineage("item_id", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id" + assert!( + all_names.iter().any(|n| n == "stg_items.item_id"), + "Expected leaf 'stg_items.item_id', got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_alias_with_schema_and_star() { + // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body + let sql = r#"WITH import_stg AS ( + SELECT * FROM stg_items +) +SELECT base.item_id, base.status +FROM import_stg AS base"#; + let expr = parse(sql); + + let mut schema = MappingSchema::new(); + schema + .add_table( + "stg_items", + &[ + ("item_id".to_string(), DataType::Unknown), + ("name".to_string(), DataType::Unknown), + ("status".to_string(), DataType::Unknown), + ], + None, + ) + .unwrap(); + + let node = lineage_with_schema( + "item_id", + &expr, + Some(&schema as &dyn Schema), + None, + false, + ) + .unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "stg_items.item_id"), + "Expected leaf 'stg_items.item_id', got: {:?}", + all_names + ); + } + + #[test] + fn test_lineage_cte_alias_with_join() { + // Multiple CTE aliases in a JOIN: each should resolve independently + let sql = r#"WITH + import_users AS (SELECT id, name FROM users), + import_orders AS (SELECT id, user_id, amount FROM orders) +SELECT u.name, o.amount +FROM import_users AS u +LEFT JOIN import_orders AS o ON u.id = o.user_id"#; + let expr = parse(sql); + + let node = lineage("name", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "users.name"), + "Expected leaf 'users.name', got: {:?}", + all_names + ); + + let node = lineage("amount", &expr, None, false).unwrap(); + let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect(); + assert!( + all_names.iter().any(|n| n == "orders.amount"), + "Expected leaf 'orders.amount', got: {:?}", + all_names + ); + } + + // ----------------------------------------------------------------------- + // Quoted CTE name tests — verifying SQL identifier case semantics + // ----------------------------------------------------------------------- + + #[test] + fn test_lineage_unquoted_cte_case_insensitive() { + // Unquoted CTE names are case-insensitive (both normalized to lowercase). + // MyCte and MYCTE should match. + let expr = parse( + "WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE", + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert_eq!(node.name, "col"); + assert!( + !node.downstream.is_empty(), + "Unquoted CTE should resolve case-insensitively" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_preserved() { + // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match. + let expr = parse( + r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert_eq!(node.name, "col"); + assert!( + !node.downstream.is_empty(), + "Quoted CTE with matching case should resolve" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_mismatch_no_expansion() { + // Quoted CTE "MyCte" referenced as "mycte" — case mismatch. + // sqlglot treats this as a table reference, not a CTE match. + // Star expansion should NOT resolve through the CTE. + let expr = parse( + r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#, + ); + // lineage("col", ...) should fail because "mycte" is treated as an external + // table (not matching CTE "MyCte"), and SELECT * cannot be expanded. + let result = lineage("col", &expr, None, false); + assert!( + result.is_err(), + "Quoted CTE with case mismatch should not expand star: {:?}", + result + ); + } + + #[test] + fn test_lineage_mixed_quoted_unquoted_cte() { + // Mix of unquoted and quoted CTEs in a nested chain. + let expr = parse( + r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#, + ); + let node = lineage("a", &expr, None, false).unwrap(); + assert_eq!(node.name, "a"); + assert!( + !node.downstream.is_empty(), + "Mixed quoted/unquoted CTE chain should resolve" + ); + } + + // ----------------------------------------------------------------------- + // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths + // ----------------------------------------------------------------------- + // + // expand_cte_stars correctly handles quoted vs unquoted CTE names via + // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope) + // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or + // direct string comparison for CTE name matching, ignoring the quoted status. + // + // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive + // and unquoted as case-insensitive. The scope system should do the same. + // + // Fixing these requires changes across scope.rs and lineage.rs CTE resolution, + // which is broader than the star expansion scope of this PR. + + #[test] + fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() { + // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for + // all identifiers including quoted ones, so quoted CTE "MyCte" referenced + // as "mycte" incorrectly resolves to the CTE. + // + // Per SQL semantics (and sqlglot behavior), quoted identifiers are + // case-sensitive: "mycte" should NOT match CTE "MyCte". + // + // This test asserts the CURRENT BUGGY behavior. When the bug is fixed, + // this test should fail — update the assertion to match correct behavior: + // child.source_name should be "" (table ref), not "MyCte" (CTE ref). + let expr = parse( + r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert!(!node.downstream.is_empty()); + let child = &node.downstream[0]; + // BUG: "mycte" incorrectly resolves to CTE "MyCte" + assert_eq!( + child.source_name, "MyCte", + "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \ + If this fails, the bug may be fixed — update to assert source_name != \"MyCte\"" + ); + } + + #[test] + fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() { + // Known bug: same as above but with qualified column reference ("mycte".col). + // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for + // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte". + // + // This test asserts the CURRENT BUGGY behavior. When the bug is fixed, + // this test should fail — update to assert source_name != "MyCte". + let expr = parse( + r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#, + ); + let node = lineage("col", &expr, None, false).unwrap(); + assert!(!node.downstream.is_empty()); + let child = &node.downstream[0]; + // BUG: "mycte".col incorrectly resolves through CTE "MyCte" + assert_eq!( + child.source_name, "MyCte", + "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \ + If this fails, the bug may be fixed — update to assert source_name != \"MyCte\"" + ); + } }