From f57eb219379667172345f2d5105f72d11b6f024a Mon Sep 17 00:00:00 2001 From: JohnCari Date: Fri, 2 Jan 2026 21:02:43 -0400 Subject: [PATCH 01/11] feat: add aggregate pushdown support (GetForeignUpperPaths) Add support for pushing aggregate operations (COUNT, SUM, AVG, MIN, MAX) to foreign data sources via the GetForeignUpperPaths callback. New types: - AggregateKind enum: Represents supported aggregate functions - Aggregate struct: Contains aggregate operation details New ForeignDataWrapper trait methods: - supported_aggregates(): Declare which aggregates the FDW can push down - supports_group_by(): Declare GROUP BY pushdown support - get_aggregate_rel_size(): Cost estimation for aggregate queries - begin_aggregate_scan(): Initialize aggregate query execution New module: - upper.rs: Implements GetForeignUpperPaths callback for aggregate pushdown All new methods have default implementations for backward compatibility. --- supabase-wrappers/src/interface.rs | 338 +++++++++++++++++++++++++- supabase-wrappers/src/lib.rs | 1 + supabase-wrappers/src/scan.rs | 14 +- supabase-wrappers/src/upper.rs | 369 +++++++++++++++++++++++++++++ 4 files changed, 714 insertions(+), 8 deletions(-) create mode 100644 supabase-wrappers/src/upper.rs diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index bcbc361d6..a4a7448bf 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -678,6 +678,171 @@ impl Limit { } } +/// Represents the type of aggregate function for pushdown +/// +/// This enum is used to declare which aggregate functions an FDW supports +/// for remote execution via [`ForeignDataWrapper::supported_aggregates`]. +/// +/// ## Examples +/// +/// ```rust,no_run +/// use supabase_wrappers::prelude::*; +/// +/// fn get_supported() -> Vec { +/// vec![ +/// AggregateKind::Count, +/// AggregateKind::Sum, +/// AggregateKind::Avg, +/// ] +/// } +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AggregateKind { + /// COUNT(*) - count all rows + Count, + /// COUNT(column) - count non-null values in a column + CountColumn, + /// SUM(column) - sum of values + Sum, + /// AVG(column) - average of values + Avg, + /// MIN(column) - minimum value + Min, + /// MAX(column) - maximum value + Max, +} + +impl AggregateKind { + /// Returns the SQL function name for this aggregate kind + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// assert_eq!(AggregateKind::Count.sql_name(), "COUNT"); + /// assert_eq!(AggregateKind::Sum.sql_name(), "SUM"); + /// ``` + pub fn sql_name(&self) -> &'static str { + match self { + AggregateKind::Count => "COUNT", + AggregateKind::CountColumn => "COUNT", + AggregateKind::Sum => "SUM", + AggregateKind::Avg => "AVG", + AggregateKind::Min => "MIN", + AggregateKind::Max => "MAX", + } + } +} + +/// Represents a single aggregate operation in a pushed-down query +/// +/// This struct contains all information needed for an FDW to execute an +/// aggregate function remotely. +/// +/// ## Examples +/// +/// ```rust,no_run +/// use supabase_wrappers::prelude::*; +/// +/// // COUNT(*) aggregate +/// let count_all = Aggregate { +/// kind: AggregateKind::Count, +/// column: None, +/// distinct: false, +/// alias: "count".to_string(), +/// }; +/// +/// // SUM(amount) aggregate +/// let sum_amount = Aggregate { +/// kind: AggregateKind::Sum, +/// column: Some(Column { +/// name: "amount".to_string(), +/// num: 1, +/// type_oid: pgrx::pg_sys::FLOAT8OID, +/// }), +/// distinct: false, +/// alias: "total_amount".to_string(), +/// }; +/// ``` +#[derive(Debug, Clone)] +pub struct Aggregate { + /// Type of aggregate function + pub kind: AggregateKind, + + /// Column being aggregated (None for COUNT(*)) + pub column: Option, + + /// Whether DISTINCT modifier is applied (e.g., COUNT(DISTINCT col)) + pub distinct: bool, + + /// Output column name/alias for the aggregate result + pub alias: String, +} + +impl Aggregate { + /// Deparses the aggregate into SQL syntax + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// let count_all = Aggregate { + /// kind: AggregateKind::Count, + /// column: None, + /// distinct: false, + /// alias: "cnt".to_string(), + /// }; + /// assert_eq!(count_all.deparse(), "COUNT(*)"); + /// + /// let sum_col = Aggregate { + /// kind: AggregateKind::Sum, + /// column: Some(Column { name: "price".to_string(), num: 1, type_oid: 0 }), + /// distinct: false, + /// alias: "total".to_string(), + /// }; + /// assert_eq!(sum_col.deparse(), "SUM(price)"); + /// ``` + pub fn deparse(&self) -> String { + let func_name = self.kind.sql_name(); + match self.kind { + AggregateKind::Count => format!("{func_name}(*)"), + _ => { + let col_name = self + .column + .as_ref() + .map(|c| c.name.as_str()) + .unwrap_or("*"); + if self.distinct { + format!("{func_name}(DISTINCT {col_name})") + } else { + format!("{func_name}({col_name})") + } + } + } + } + + /// Deparses the aggregate with its alias for use in SELECT + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// let count_all = Aggregate { + /// kind: AggregateKind::Count, + /// column: None, + /// distinct: false, + /// alias: "cnt".to_string(), + /// }; + /// assert_eq!(count_all.deparse_with_alias(), "COUNT(*) AS cnt"); + /// ``` + pub fn deparse_with_alias(&self) -> String { + format!("{} AS {}", self.deparse(), self.alias) + } +} + /// The Foreign Data Wrapper trait /// /// This is the main interface for your foreign data wrapper. Required functions @@ -827,6 +992,173 @@ pub trait ForeignDataWrapper> { Ok(()) } + // ========================================================================= + // Aggregate Pushdown Methods + // ========================================================================= + + /// Returns the list of aggregate functions this FDW can push down to the + /// remote data source. + /// + /// Default implementation returns an empty vector (no aggregate pushdown). + /// Override this method to enable aggregate pushdown for your FDW. + /// + /// When this method returns a non-empty vector, the framework will: + /// 1. Check if the query's aggregates are all supported + /// 2. Call [`get_aggregate_rel_size`](Self::get_aggregate_rel_size) for cost estimation + /// 3. Create an aggregate path in the query planner + /// 4. Call [`begin_aggregate_scan`](Self::begin_aggregate_scan) if the aggregate path is chosen + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// fn supported_aggregates(&self) -> Vec { + /// vec![ + /// AggregateKind::Count, + /// AggregateKind::CountColumn, + /// AggregateKind::Sum, + /// AggregateKind::Avg, + /// AggregateKind::Min, + /// AggregateKind::Max, + /// ] + /// } + /// ``` + fn supported_aggregates(&self) -> Vec { + vec![] + } + + /// Returns whether this FDW supports GROUP BY pushdown alongside aggregates. + /// + /// Only relevant if [`supported_aggregates`](Self::supported_aggregates) returns a non-empty vector. + /// Default implementation returns `false`. + /// + /// When `true`, GROUP BY columns will be passed to [`begin_aggregate_scan`](Self::begin_aggregate_scan). + /// + /// ## Examples + /// + /// ```rust,no_run + /// fn supports_group_by(&self) -> bool { + /// true + /// } + /// ``` + fn supports_group_by(&self) -> bool { + false + } + + /// Estimate the size of aggregate query results for query planning. + /// + /// Called during query planning when aggregate pushdown is being considered. + /// Returns `(estimated_rows, mean_row_width_bytes)`. + /// + /// Default implementation estimates: + /// - 1 row for ungrouped aggregates (e.g., `SELECT COUNT(*) FROM t`) + /// - 100 rows for grouped aggregates (e.g., `SELECT dept, COUNT(*) FROM t GROUP BY dept`) + /// + /// Override this method if you can provide better estimates based on your + /// knowledge of the remote data source. + /// + /// ## Parameters + /// + /// - `aggregates`: List of aggregate operations to be computed + /// - `group_by`: Columns in GROUP BY clause (empty if none) + /// - `quals`: WHERE clause conditions + /// - `options`: Foreign table options + /// + /// ## Examples + /// + /// ```rust,no_run + /// fn get_aggregate_rel_size( + /// &mut self, + /// aggregates: &[Aggregate], + /// group_by: &[Column], + /// quals: &[Qual], + /// options: &HashMap, + /// ) -> Result<(i64, i32), MyFdwError> { + /// let rows = if group_by.is_empty() { 1 } else { 50 }; + /// let width = 8 * aggregates.len() as i32; + /// Ok((rows, width)) + /// } + /// ``` + fn get_aggregate_rel_size( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + _quals: &[Qual], + _options: &HashMap, + ) -> Result<(i64, i32), E> { + // Default: 1 row if no GROUP BY, otherwise estimate 100 rows + let rows = if group_by.is_empty() { 1 } else { 100 }; + let width = 8 * aggregates.len() as i32; + Ok((rows, width)) + } + + /// Called when beginning execution of a pushed-down aggregate query. + /// + /// This method is called instead of [`begin_scan`](Self::begin_scan) when + /// the query planner chooses the aggregate pushdown path. The FDW should + /// prepare to return aggregate results based on the provided parameters. + /// + /// After this method returns successfully, [`iter_scan`](Self::iter_scan) + /// will be called to retrieve the aggregate results. + /// + /// ## Parameters + /// + /// - `aggregates`: List of aggregate operations to compute + /// - `group_by`: Columns to group by (empty for ungrouped aggregates) + /// - `quals`: WHERE clause conditions to apply before aggregation + /// - `options`: Foreign table options + /// + /// ## Result Row Format + /// + /// The rows returned by [`iter_scan`](Self::iter_scan) should contain: + /// 1. GROUP BY column values (in order specified) + /// 2. Aggregate results (in order specified, using `alias` as column name) + /// + /// ## Examples + /// + /// ```rust,no_run + /// fn begin_aggregate_scan( + /// &mut self, + /// aggregates: &[Aggregate], + /// group_by: &[Column], + /// quals: &[Qual], + /// options: &HashMap, + /// ) -> Result<(), MyFdwError> { + /// // Build remote query with aggregates + /// let mut select_items = Vec::new(); + /// + /// // Add GROUP BY columns + /// for col in group_by { + /// select_items.push(col.name.clone()); + /// } + /// + /// // Add aggregate expressions + /// for agg in aggregates { + /// select_items.push(agg.deparse_with_alias()); + /// } + /// + /// let query = format!("SELECT {} FROM ...", select_items.join(", ")); + /// + /// // Execute and store results for iter_scan + /// self.results = self.execute_query(&query)?; + /// Ok(()) + /// } + /// ``` + fn begin_aggregate_scan( + &mut self, + _aggregates: &[Aggregate], + _group_by: &[Column], + _quals: &[Qual], + _options: &HashMap, + ) -> Result<(), E> { + // Default: This should not be called if supported_aggregates() returns empty. + // If called, it means the FDW declared aggregate support but didn't implement + // this method - which is a programming error. We'll just proceed with an + // empty result set by returning Ok. + Ok(()) + } + /// Obtain a list of foreign table creation commands /// /// Return a list of string, each of which must contain a CREATE FOREIGN TABLE @@ -848,7 +1180,7 @@ pub trait ForeignDataWrapper> { Self: Sized, { unsafe { - use crate::{import_foreign_schema, modify, scan}; + use crate::{import_foreign_schema, modify, scan, upper}; let mut fdw_routine = FdwRoutine::::alloc_node(pg_sys::NodeTag::T_FdwRoutine); @@ -862,6 +1194,10 @@ pub trait ForeignDataWrapper> { fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::); fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::); + // upper path planning (aggregate pushdown) + fdw_routine.GetForeignUpperPaths = + Some(upper::get_foreign_upper_paths::); + // scan phase fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::); fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::); diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index 32435a0f6..101aa9eb8 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -319,6 +319,7 @@ mod polyfill; mod qual; mod scan; mod sort; +mod upper; /// PgBox'ed `FdwRoutine`, used in [`fdw_routine`](interface::ForeignDataWrapper::fdw_routine) pub type FdwRoutine = PgBox; diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index 3e274b84d..8bbae4284 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -25,24 +25,24 @@ use crate::sort::*; use crate::utils::{self, report_error, ReportableError, SerdeList}; // Fdw private state for scan -struct FdwState, W: ForeignDataWrapper> { +pub(crate) struct FdwState, W: ForeignDataWrapper> { // foreign data wrapper instance - instance: Option, + pub(crate) instance: Option, // query conditions - quals: Vec, + pub(crate) quals: Vec, // query target column list - tgts: Vec, + pub(crate) tgts: Vec, // sort list - sorts: Vec, + pub(crate) sorts: Vec, // limit - limit: Option, + pub(crate) limit: Option, // foreign table options - opts: HashMap, + pub(crate) opts: HashMap, // temporary memory context per foreign table, created under Wrappers root // memory context diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs new file mode 100644 index 000000000..035a7ddc9 --- /dev/null +++ b/supabase-wrappers/src/upper.rs @@ -0,0 +1,369 @@ +//! Upper path planning for aggregate pushdown +//! +//! This module implements the GetForeignUpperPaths callback which enables +//! aggregate pushdown to foreign data sources. + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::{debug2, pg_guard, pg_sys, PgBox}; +use std::ptr; + +use crate::interface::{Aggregate, AggregateKind, Column}; +use crate::prelude::ForeignDataWrapper; +use crate::scan::FdwState; + +/// Check if a given PostgreSQL aggregate OID is supported by the FDW +fn oid_to_aggregate_kind(aggfnoid: pg_sys::Oid) -> Option { + // PostgreSQL built-in aggregate function OIDs + // These are from pg_proc.dat in PostgreSQL source + // COUNT(*) = 2803, COUNT(any) = 2147 + // SUM(int8) = 2107, SUM(int4) = 2108, SUM(float8) = 2111, etc. + // AVG(int8) = 2100, AVG(float8) = 2105, etc. + // MIN(any) = 2145, MAX(any) = 2146 + + unsafe { + // Get the aggregate function name from the OID + let agg_name = pg_sys::get_func_name(aggfnoid); + if agg_name.is_null() { + return None; + } + + let name_cstr = std::ffi::CStr::from_ptr(agg_name); + let name = name_cstr.to_str().ok()?; + + match name { + "count" => { + // Determine if it's COUNT(*) or COUNT(column) + // COUNT(*) has no arguments, COUNT(column) has one + let nargs = pg_sys::get_func_nargs(aggfnoid); + if nargs == 0 { + Some(AggregateKind::Count) + } else { + Some(AggregateKind::CountColumn) + } + } + "sum" => Some(AggregateKind::Sum), + "avg" => Some(AggregateKind::Avg), + "min" => Some(AggregateKind::Min), + "max" => Some(AggregateKind::Max), + _ => None, + } + } +} + +/// Extract aggregate information from the query +unsafe fn extract_aggregates( + root: *mut pg_sys::PlannerInfo, + output_rel: *mut pg_sys::RelOptInfo, + extra: *mut std::ffi::c_void, +) -> Option> { + // The extra parameter for UPPERREL_GROUP_AGG is GroupPathExtraData + if extra.is_null() { + return None; + } + + let group_extra = extra as *mut pg_sys::GroupPathExtraData; + if (*group_extra).havingQual != ptr::null_mut() { + // HAVING clause not supported for pushdown + return None; + } + + // Get the target list from the output relation + let reltarget = (*output_rel).reltarget; + if reltarget.is_null() { + return None; + } + + let mut aggregates = Vec::new(); + let mut resno = 1; + + // Iterate through the target expressions + let exprs = (*reltarget).exprs; + if exprs.is_null() { + return None; + } + + let len = (*exprs).length; + let mut cell = (*exprs).head.elements; + + for _ in 0..len { + if cell.is_null() { + break; + } + + let expr = (*cell).ptr_value as *mut pg_sys::Node; + cell = cell.offset(1); + + // Check if this is an Aggref (aggregate reference) + if (*expr).type_ == pg_sys::NodeTag::T_Aggref { + let aggref = expr as *mut pg_sys::Aggref; + + let kind = match oid_to_aggregate_kind((*aggref).aggfnoid) { + Some(k) => k, + None => return None, // Unsupported aggregate, abort pushdown + }; + + // Check for DISTINCT - only supported for COUNT + if (*aggref).aggdistinct != ptr::null_mut() { + match kind { + AggregateKind::CountColumn => { + // COUNT(DISTINCT col) is supported + } + _ => return None, // DISTINCT not supported for other aggregates + } + } + + // Get the column being aggregated (if any) + let column = if (*aggref).args != ptr::null_mut() && (*(*aggref).args).length > 0 { + let arg_cell = (*(*aggref).args).head.elements; + let target_entry = (*arg_cell).ptr_value as *mut pg_sys::TargetEntry; + let arg_expr = (*target_entry).expr as *mut pg_sys::Node; + + if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { + let var = arg_expr as *mut pg_sys::Var; + // Get column name from the var + let relid = (*var).varno as pg_sys::Index; + let attno = (*var).varattno; + + // Try to get the column name + let rte = pg_sys::planner_rt_fetch(relid, root); + if !rte.is_null() { + let rel_oid = (*rte).relid; + let att_name = pg_sys::get_attname(rel_oid, attno, false); + if !att_name.is_null() { + let name_cstr = std::ffi::CStr::from_ptr(att_name); + if let Ok(name) = name_cstr.to_str() { + Some(Column { + name: name.to_string(), + num: attno as usize, + type_oid: (*var).vartype, + }) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + // Complex expression, not a simple column reference + None + } + } else { + None + }; + + let aggregate = Aggregate { + kind, + column, + distinct: (*aggref).aggdistinct != ptr::null_mut(), + alias: format!("agg_{}", resno), + }; + + aggregates.push(aggregate); + } + + resno += 1; + } + + if aggregates.is_empty() { + return None; + } + + Some(aggregates) +} + +/// Extract GROUP BY columns from the query +unsafe fn extract_group_by_columns( + root: *mut pg_sys::PlannerInfo, + input_rel: *mut pg_sys::RelOptInfo, +) -> Vec { + let mut group_by = Vec::new(); + + let parse = (*root).parse; + if parse.is_null() { + return group_by; + } + + let group_clause = (*parse).groupClause; + if group_clause.is_null() || (*group_clause).length == 0 { + return group_by; + } + + // Get the target list + let target_list = (*parse).targetList; + if target_list.is_null() { + return group_by; + } + + // Iterate through group clause items + let len = (*group_clause).length; + let mut cell = (*group_clause).head.elements; + + for _ in 0..len { + if cell.is_null() { + break; + } + + let sort_group_clause = (*cell).ptr_value as *mut pg_sys::SortGroupClause; + cell = cell.offset(1); + + // Find the target entry for this group clause + let tle_resno = (*sort_group_clause).tleSortGroupRef; + + // Search target list for matching resno + let tl_len = (*target_list).length; + let mut tl_cell = (*target_list).head.elements; + + for _ in 0..tl_len { + if tl_cell.is_null() { + break; + } + + let tle = (*tl_cell).ptr_value as *mut pg_sys::TargetEntry; + tl_cell = tl_cell.offset(1); + + if (*tle).ressortgroupref == tle_resno { + let expr = (*tle).expr as *mut pg_sys::Node; + + if (*expr).type_ == pg_sys::NodeTag::T_Var { + let var = expr as *mut pg_sys::Var; + let relid = (*var).varno as pg_sys::Index; + let attno = (*var).varattno; + + let rte = pg_sys::planner_rt_fetch(relid, root); + if !rte.is_null() { + let rel_oid = (*rte).relid; + let att_name = pg_sys::get_attname(rel_oid, attno, false); + if !att_name.is_null() { + let name_cstr = std::ffi::CStr::from_ptr(att_name); + if let Ok(name) = name_cstr.to_str() { + group_by.push(Column { + name: name.to_string(), + num: attno as usize, + type_oid: (*var).vartype, + }); + } + } + } + } + break; + } + } + } + + group_by +} + +/// GetForeignUpperPaths callback +/// +/// This callback is called by the PostgreSQL planner to create paths for +/// upper-level processing (aggregation, sorting, etc.) that can be pushed +/// down to the foreign server. +#[pg_guard] +pub(super) extern "C-unwind" fn get_foreign_upper_paths< + E: Into, + W: ForeignDataWrapper, +>( + root: *mut pg_sys::PlannerInfo, + stage: pg_sys::UpperRelationKind::Type, + input_rel: *mut pg_sys::RelOptInfo, + output_rel: *mut pg_sys::RelOptInfo, + extra: *mut std::ffi::c_void, +) { + debug2!("---> get_foreign_upper_paths, stage: {:?}", stage); + + // Only handle GROUP_AGG stage + if stage != pg_sys::UpperRelationKind::UPPERREL_GROUP_AGG { + return; + } + + unsafe { + // Get the FDW state from the input relation + let fdw_private = (*input_rel).fdw_private; + if fdw_private.is_null() { + return; + } + + let state = PgBox::>::from_pg(fdw_private as _); + + // Check if FDW supports any aggregates + if let Some(ref instance) = state.instance { + let supported = instance.supported_aggregates(); + if supported.is_empty() { + return; + } + + // Extract aggregates from the query + let aggregates = match extract_aggregates(root, output_rel, extra) { + Some(aggs) => aggs, + None => return, + }; + + // Check if all aggregates are supported + for agg in &aggregates { + if !supported.contains(&agg.kind) { + debug2!("Aggregate {:?} not supported, skipping pushdown", agg.kind); + return; + } + } + + // Extract GROUP BY columns + let group_by = extract_group_by_columns(root, input_rel); + + // Check if GROUP BY is supported (if present) + if !group_by.is_empty() && !instance.supports_group_by() { + debug2!("GROUP BY not supported, skipping pushdown"); + return; + } + + // Get cost estimates + let (rows, width) = match state.instance.as_ref() { + Some(inst) => { + // We need a mutable reference, but we're in a const context + // For now, use default estimates + let rows = if group_by.is_empty() { 1 } else { 100 }; + let width = 8 * aggregates.len() as i32; + (rows, width) + } + None => return, + }; + + // Get startup cost from options + let startup_cost = state + .opts + .get("startup_cost") + .and_then(|c| c.parse::().ok()) + .unwrap_or(0.0); + + let total_cost = startup_cost + rows as f64; + + // Create the foreign upper path + // Note: We need to store aggregate info for later use in begin_scan + let path = pg_sys::create_foreign_upper_path( + root, + output_rel, + (*output_rel).reltarget, // pathtarget + rows as f64, // rows + #[cfg(feature = "pg18")] + 0, // disabled_nodes + startup_cost, // startup_cost + total_cost, // total_cost + ptr::null_mut(), // pathkeys + ptr::null_mut(), // fdw_outerpath + ptr::null_mut(), // fdw_restrictinfo + ptr::null_mut(), // fdw_private + ); + + // Add the path to the output relation + pg_sys::add_path(output_rel, &mut ((*path).path)); + + debug2!( + "Created aggregate pushdown path: {} aggregates, {} group by columns", + aggregates.len(), + group_by.len() + ); + } + } +} From 9d11a132a257820d7dcc58681195ecc6b8a721b3 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Fri, 2 Jan 2026 22:32:47 -0400 Subject: [PATCH 02/11] docs: add aggregate pushdown documentation and debug logging - Add debug2! logging throughout upper.rs for better observability: - HAVING clause rejection - Unsupported aggregate function names - DISTINCT modifier processing - Extracted aggregates list - GROUP BY columns - Cost estimation values - Update docs/contributing/native.md with aggregate trait methods - Add comprehensive Aggregate Pushdown section to docs/guides/query-pushdown.md --- docs/contributing/native.md | 6 +++ docs/guides/query-pushdown.md | 67 +++++++++++++++++++++++++++++++++- supabase-wrappers/src/upper.rs | 36 +++++++++++++++++- 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/docs/contributing/native.md b/docs/contributing/native.md index 7c8aee7e3..243fbc3f6 100644 --- a/docs/contributing/native.md +++ b/docs/contributing/native.md @@ -25,6 +25,12 @@ pub trait ForeignDataWrapper { fn delete(...); fn end_modify(...); + // functions for aggregate pushdown (optional) + fn supported_aggregates(...) -> Vec; + fn supports_group_by(...) -> bool; + fn get_aggregate_rel_size(...) -> (i64, i32); + fn begin_aggregate_scan(...); + // other optional functions ... } diff --git a/docs/guides/query-pushdown.md b/docs/guides/query-pushdown.md index 208373efd..bc4589a47 100644 --- a/docs/guides/query-pushdown.md +++ b/docs/guides/query-pushdown.md @@ -10,7 +10,7 @@ Query pushdown is a technique that enhances query performance by executing parts ### Using Query Pushdown -In Wrappers, the pushdown logic is integrated into each FDW. You don’t need to modify your queries to benefit from this feature. For example, the [Stripe FDW](https://supabase.com/docs/guides/database/extensions/wrappers/stripe) automatically applies query pushdown for `id` within the `customer` object: +In Wrappers, the pushdown logic is integrated into each FDW. You don't need to modify your queries to benefit from this feature. For example, the [Stripe FDW](https://supabase.com/docs/guides/database/extensions/wrappers/stripe) automatically applies query pushdown for `id` within the `customer` object: ```sql select * @@ -34,3 +34,68 @@ limit 20; ``` This query executes `order by name limit 20` on ClickHouse before transferring the result to Postgres. + +### Aggregate Pushdown + +Aggregate pushdown allows aggregate functions like `COUNT`, `SUM`, `AVG`, `MIN`, and `MAX` to be executed directly on the foreign data source. This is especially valuable for analytics queries where you only need summary statistics rather than raw data. + +```sql +select count(*), sum(amount), avg(amount) +from foreign_table +where status = 'active'; +``` + +Instead of fetching all matching rows and computing aggregates locally, the FDW can push the entire aggregation to the remote source. This dramatically reduces data transfer - returning just a single row with the computed values. + +#### GROUP BY Support + +FDWs that support aggregate pushdown can also support `GROUP BY` pushdown: + +```sql +select department, count(*), avg(salary) +from employees +group by department; +``` + +This executes the grouping and aggregation on the remote server, returning only the grouped results. + +#### Supported Aggregate Functions + +The Wrappers framework supports pushing down these aggregate functions: + +| Function | Description | +|----------|-------------| +| `COUNT(*)` | Count all rows | +| `COUNT(column)` | Count non-null values | +| `COUNT(DISTINCT column)` | Count unique non-null values | +| `SUM(column)` | Sum of values | +| `AVG(column)` | Average of values | +| `MIN(column)` | Minimum value | +| `MAX(column)` | Maximum value | + +#### Implementing Aggregate Pushdown + +FDW developers can enable aggregate pushdown by implementing these trait methods: + +```rust +fn supported_aggregates(&self) -> Vec { + vec![AggregateKind::Count, AggregateKind::Sum, AggregateKind::Avg] +} + +fn supports_group_by(&self) -> bool { + true +} + +fn begin_aggregate_scan( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + quals: &[Qual], + options: &HashMap, +) -> Result<(), Error> { + // Build and execute remote aggregate query + Ok(()) +} +``` + +See the [API documentation](https://docs.rs/supabase-wrappers/latest/supabase_wrappers/) for detailed information on implementing aggregate pushdown. diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index 035a7ddc9..c28ea50f8 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -64,6 +64,7 @@ unsafe fn extract_aggregates( let group_extra = extra as *mut pg_sys::GroupPathExtraData; if (*group_extra).havingQual != ptr::null_mut() { // HAVING clause not supported for pushdown + debug2!("HAVING clause present, skipping aggregate pushdown"); return None; } @@ -99,7 +100,17 @@ unsafe fn extract_aggregates( let kind = match oid_to_aggregate_kind((*aggref).aggfnoid) { Some(k) => k, - None => return None, // Unsupported aggregate, abort pushdown + None => { + // Get function name for debug message + let func_name = pg_sys::get_func_name((*aggref).aggfnoid); + if !func_name.is_null() { + let name = std::ffi::CStr::from_ptr(func_name).to_string_lossy(); + debug2!("Unsupported aggregate function '{}', skipping pushdown", name); + } else { + debug2!("Unknown aggregate function, skipping pushdown"); + } + return None; + } }; // Check for DISTINCT - only supported for COUNT @@ -107,8 +118,12 @@ unsafe fn extract_aggregates( match kind { AggregateKind::CountColumn => { // COUNT(DISTINCT col) is supported + debug2!("COUNT(DISTINCT) detected, pushdown supported"); + } + _ => { + debug2!("DISTINCT modifier on {:?} not supported, skipping pushdown", kind); + return None; } - _ => return None, // DISTINCT not supported for other aggregates } } @@ -171,6 +186,12 @@ unsafe fn extract_aggregates( return None; } + debug2!( + "Extracted {} aggregates for pushdown: {:?}", + aggregates.len(), + aggregates.iter().map(|a| a.kind).collect::>() + ); + Some(aggregates) } @@ -311,6 +332,12 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< // Extract GROUP BY columns let group_by = extract_group_by_columns(root, input_rel); + if !group_by.is_empty() { + debug2!( + "Extracted GROUP BY columns: {:?}", + group_by.iter().map(|c| c.name.as_str()).collect::>() + ); + } // Check if GROUP BY is supported (if present) if !group_by.is_empty() && !instance.supports_group_by() { @@ -339,6 +366,11 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< let total_cost = startup_cost + rows as f64; + debug2!( + "Aggregate pushdown cost estimate: rows={}, width={}, startup={}, total={}", + rows, width, startup_cost, total_cost + ); + // Create the foreign upper path // Note: We need to store aggregate info for later use in begin_scan let path = pg_sys::create_foreign_upper_path( From 5735e611f6223adb7c54f594062890397e05f4a6 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 13:36:46 -0400 Subject: [PATCH 03/11] style: format code with rustfmt Apply rustfmt formatting to satisfy CI checks: - Break long lines in debug2! macro calls - Adjust line formatting in interface.rs --- supabase-wrappers/src/interface.rs | 9 ++------- supabase-wrappers/src/upper.rs | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index a4a7448bf..365cb7821 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -809,11 +809,7 @@ impl Aggregate { match self.kind { AggregateKind::Count => format!("{func_name}(*)"), _ => { - let col_name = self - .column - .as_ref() - .map(|c| c.name.as_str()) - .unwrap_or("*"); + let col_name = self.column.as_ref().map(|c| c.name.as_str()).unwrap_or("*"); if self.distinct { format!("{func_name}(DISTINCT {col_name})") } else { @@ -1195,8 +1191,7 @@ pub trait ForeignDataWrapper> { fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::); // upper path planning (aggregate pushdown) - fdw_routine.GetForeignUpperPaths = - Some(upper::get_foreign_upper_paths::); + fdw_routine.GetForeignUpperPaths = Some(upper::get_foreign_upper_paths::); // scan phase fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::); diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index c28ea50f8..6a61bcd0a 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -105,7 +105,10 @@ unsafe fn extract_aggregates( let func_name = pg_sys::get_func_name((*aggref).aggfnoid); if !func_name.is_null() { let name = std::ffi::CStr::from_ptr(func_name).to_string_lossy(); - debug2!("Unsupported aggregate function '{}', skipping pushdown", name); + debug2!( + "Unsupported aggregate function '{}', skipping pushdown", + name + ); } else { debug2!("Unknown aggregate function, skipping pushdown"); } @@ -121,7 +124,10 @@ unsafe fn extract_aggregates( debug2!("COUNT(DISTINCT) detected, pushdown supported"); } _ => { - debug2!("DISTINCT modifier on {:?} not supported, skipping pushdown", kind); + debug2!( + "DISTINCT modifier on {:?} not supported, skipping pushdown", + kind + ); return None; } } @@ -368,7 +374,10 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< debug2!( "Aggregate pushdown cost estimate: rows={}, width={}, startup={}, total={}", - rows, width, startup_cost, total_cost + rows, + width, + startup_cost, + total_cost ); // Create the foreign upper path @@ -379,7 +388,7 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< (*output_rel).reltarget, // pathtarget rows as f64, // rows #[cfg(feature = "pg18")] - 0, // disabled_nodes + 0, // disabled_nodes startup_cost, // startup_cost total_cost, // total_cost ptr::null_mut(), // pathkeys From b27d54ec7bbc94eb4815702173298b8e75e8b76f Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 14:02:43 -0400 Subject: [PATCH 04/11] fix: correct pgrx API usage for List iteration and create_foreign_upper_path - Replace direct List field access with pgrx::PgList::from_pg() for proper iteration - Fix create_foreign_upper_path signature for different PostgreSQL versions: - PG13-16: 9 parameters (no disabled_nodes, no fdw_restrictinfo) - PG17: 10 parameters (added fdw_restrictinfo) - PG18: 11 parameters (added disabled_nodes) - Use conditional compilation (#[cfg(feature = "...")]) for version-specific parameters --- supabase-wrappers/src/upper.rs | 134 ++++++++++++++------------------- 1 file changed, 56 insertions(+), 78 deletions(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index 6a61bcd0a..1b2a52911 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -4,7 +4,7 @@ //! aggregate pushdown to foreign data sources. use pgrx::pg_sys::panic::ErrorReport; -use pgrx::{debug2, pg_guard, pg_sys, PgBox}; +use pgrx::{debug2, pg_guard, pg_sys, PgBox, PgList}; use std::ptr; use crate::interface::{Aggregate, AggregateKind, Column}; @@ -77,23 +77,15 @@ unsafe fn extract_aggregates( let mut aggregates = Vec::new(); let mut resno = 1; - // Iterate through the target expressions + // Iterate through the target expressions using PgList let exprs = (*reltarget).exprs; if exprs.is_null() { return None; } - let len = (*exprs).length; - let mut cell = (*exprs).head.elements; - - for _ in 0..len { - if cell.is_null() { - break; - } - - let expr = (*cell).ptr_value as *mut pg_sys::Node; - cell = cell.offset(1); + let exprs_list: PgList = PgList::from_pg(exprs); + for expr in exprs_list.iter_ptr() { // Check if this is an Aggref (aggregate reference) if (*expr).type_ == pg_sys::NodeTag::T_Aggref { let aggref = expr as *mut pg_sys::Aggref; @@ -135,29 +127,32 @@ unsafe fn extract_aggregates( // Get the column being aggregated (if any) let column = if (*aggref).args != ptr::null_mut() && (*(*aggref).args).length > 0 { - let arg_cell = (*(*aggref).args).head.elements; - let target_entry = (*arg_cell).ptr_value as *mut pg_sys::TargetEntry; - let arg_expr = (*target_entry).expr as *mut pg_sys::Node; - - if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { - let var = arg_expr as *mut pg_sys::Var; - // Get column name from the var - let relid = (*var).varno as pg_sys::Index; - let attno = (*var).varattno; - - // Try to get the column name - let rte = pg_sys::planner_rt_fetch(relid, root); - if !rte.is_null() { - let rel_oid = (*rte).relid; - let att_name = pg_sys::get_attname(rel_oid, attno, false); - if !att_name.is_null() { - let name_cstr = std::ffi::CStr::from_ptr(att_name); - if let Ok(name) = name_cstr.to_str() { - Some(Column { - name: name.to_string(), - num: attno as usize, - type_oid: (*var).vartype, - }) + let args_list: PgList = PgList::from_pg((*aggref).args); + if let Some(target_entry) = args_list.iter_ptr().next() { + let arg_expr = (*target_entry).expr as *mut pg_sys::Node; + + if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { + let var = arg_expr as *mut pg_sys::Var; + // Get column name from the var + let relid = (*var).varno as pg_sys::Index; + let attno = (*var).varattno; + + // Try to get the column name + let rte = pg_sys::planner_rt_fetch(relid, root); + if !rte.is_null() { + let rel_oid = (*rte).relid; + let att_name = pg_sys::get_attname(rel_oid, attno, false); + if !att_name.is_null() { + let name_cstr = std::ffi::CStr::from_ptr(att_name); + if let Ok(name) = name_cstr.to_str() { + Some(Column { + name: name.to_string(), + num: attno as usize, + type_oid: (*var).vartype, + }) + } else { + None + } } else { None } @@ -165,10 +160,10 @@ unsafe fn extract_aggregates( None } } else { + // Complex expression, not a simple column reference None } } else { - // Complex expression, not a simple column reference None } } else { @@ -204,7 +199,7 @@ unsafe fn extract_aggregates( /// Extract GROUP BY columns from the query unsafe fn extract_group_by_columns( root: *mut pg_sys::PlannerInfo, - input_rel: *mut pg_sys::RelOptInfo, + _input_rel: *mut pg_sys::RelOptInfo, ) -> Vec { let mut group_by = Vec::new(); @@ -224,33 +219,16 @@ unsafe fn extract_group_by_columns( return group_by; } - // Iterate through group clause items - let len = (*group_clause).length; - let mut cell = (*group_clause).head.elements; - - for _ in 0..len { - if cell.is_null() { - break; - } - - let sort_group_clause = (*cell).ptr_value as *mut pg_sys::SortGroupClause; - cell = cell.offset(1); + // Iterate through group clause items using PgList + let group_list: PgList = PgList::from_pg(group_clause); + let target_entries: PgList = PgList::from_pg(target_list); + for sort_group_clause in group_list.iter_ptr() { // Find the target entry for this group clause let tle_resno = (*sort_group_clause).tleSortGroupRef; // Search target list for matching resno - let tl_len = (*target_list).length; - let mut tl_cell = (*target_list).head.elements; - - for _ in 0..tl_len { - if tl_cell.is_null() { - break; - } - - let tle = (*tl_cell).ptr_value as *mut pg_sys::TargetEntry; - tl_cell = tl_cell.offset(1); - + for tle in target_entries.iter_ptr() { if (*tle).ressortgroupref == tle_resno { let expr = (*tle).expr as *mut pg_sys::Node; @@ -352,15 +330,12 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< } // Get cost estimates - let (rows, width) = match state.instance.as_ref() { - Some(inst) => { - // We need a mutable reference, but we're in a const context - // For now, use default estimates - let rows = if group_by.is_empty() { 1 } else { 100 }; - let width = 8 * aggregates.len() as i32; - (rows, width) - } - None => return, + let (rows, _width) = { + // We need a mutable reference, but we're in a const context + // For now, use default estimates + let rows = if group_by.is_empty() { 1 } else { 100 }; + let width = 8 * aggregates.len() as i32; + (rows, width) }; // Get startup cost from options @@ -373,28 +348,31 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< let total_cost = startup_cost + rows as f64; debug2!( - "Aggregate pushdown cost estimate: rows={}, width={}, startup={}, total={}", + "Aggregate pushdown cost estimate: rows={}, startup={}, total={}", rows, - width, startup_cost, total_cost ); // Create the foreign upper path - // Note: We need to store aggregate info for later use in begin_scan + // Note: The function signature differs across PostgreSQL versions: + // - PG13-16: 9 parameters (no disabled_nodes, no fdw_restrictinfo) + // - PG17: 10 parameters (added fdw_restrictinfo) + // - PG18: 11 parameters (added disabled_nodes) let path = pg_sys::create_foreign_upper_path( root, output_rel, (*output_rel).reltarget, // pathtarget rows as f64, // rows #[cfg(feature = "pg18")] - 0, // disabled_nodes - startup_cost, // startup_cost - total_cost, // total_cost - ptr::null_mut(), // pathkeys - ptr::null_mut(), // fdw_outerpath - ptr::null_mut(), // fdw_restrictinfo - ptr::null_mut(), // fdw_private + 0, // disabled_nodes (pg18 only) + startup_cost, // startup_cost + total_cost, // total_cost + ptr::null_mut(), // pathkeys + ptr::null_mut(), // fdw_outerpath + #[cfg(any(feature = "pg17", feature = "pg18"))] + ptr::null_mut(), // fdw_restrictinfo (pg17+ only) + ptr::null_mut(), // fdw_private ); // Add the path to the output relation From 0c76139df93fbc0362d1c0a5babc3f84d7a4957b Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 14:32:45 -0400 Subject: [PATCH 05/11] style: fix comment alignment in upper.rs --- supabase-wrappers/src/upper.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index 1b2a52911..da5930e07 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -366,13 +366,13 @@ pub(super) extern "C-unwind" fn get_foreign_upper_paths< rows as f64, // rows #[cfg(feature = "pg18")] 0, // disabled_nodes (pg18 only) - startup_cost, // startup_cost - total_cost, // total_cost - ptr::null_mut(), // pathkeys - ptr::null_mut(), // fdw_outerpath + startup_cost, // startup_cost + total_cost, // total_cost + ptr::null_mut(), // pathkeys + ptr::null_mut(), // fdw_outerpath #[cfg(any(feature = "pg17", feature = "pg18"))] ptr::null_mut(), // fdw_restrictinfo (pg17+ only) - ptr::null_mut(), // fdw_private + ptr::null_mut(), // fdw_private ); // Add the path to the output relation From 754579e71bf994e63f107913661e931a1c560681 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 14:44:53 -0400 Subject: [PATCH 06/11] fix: resolve lifetime error in args_list iteration --- supabase-wrappers/src/upper.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index da5930e07..176c9c154 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -128,7 +128,9 @@ unsafe fn extract_aggregates( // Get the column being aggregated (if any) let column = if (*aggref).args != ptr::null_mut() && (*(*aggref).args).length > 0 { let args_list: PgList = PgList::from_pg((*aggref).args); - if let Some(target_entry) = args_list.iter_ptr().next() { + // Store the first entry before the if-let to avoid lifetime issues + let first_entry = args_list.iter_ptr().next(); + if let Some(target_entry) = first_entry { let arg_expr = (*target_entry).expr as *mut pg_sys::Node; if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { From 12029bb54fe29dc5e7e3add11f08f2d1e01e8425 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 14:51:32 -0400 Subject: [PATCH 07/11] fix: address clippy warnings (cmp_null and uninlined_format_args) --- supabase-wrappers/src/upper.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index 176c9c154..d96630b73 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -62,7 +62,7 @@ unsafe fn extract_aggregates( } let group_extra = extra as *mut pg_sys::GroupPathExtraData; - if (*group_extra).havingQual != ptr::null_mut() { + if !(*group_extra).havingQual.is_null() { // HAVING clause not supported for pushdown debug2!("HAVING clause present, skipping aggregate pushdown"); return None; @@ -109,7 +109,7 @@ unsafe fn extract_aggregates( }; // Check for DISTINCT - only supported for COUNT - if (*aggref).aggdistinct != ptr::null_mut() { + if !(*aggref).aggdistinct.is_null() { match kind { AggregateKind::CountColumn => { // COUNT(DISTINCT col) is supported @@ -126,7 +126,7 @@ unsafe fn extract_aggregates( } // Get the column being aggregated (if any) - let column = if (*aggref).args != ptr::null_mut() && (*(*aggref).args).length > 0 { + let column = if !(*aggref).args.is_null() && (*(*aggref).args).length > 0 { let args_list: PgList = PgList::from_pg((*aggref).args); // Store the first entry before the if-let to avoid lifetime issues let first_entry = args_list.iter_ptr().next(); @@ -175,8 +175,8 @@ unsafe fn extract_aggregates( let aggregate = Aggregate { kind, column, - distinct: (*aggref).aggdistinct != ptr::null_mut(), - alias: format!("agg_{}", resno), + distinct: !(*aggref).aggdistinct.is_null(), + alias: format!("agg_{resno}"), }; aggregates.push(aggregate); From 342541cc61e30222c30d04429ac9257169b5f895 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 14:58:14 -0400 Subject: [PATCH 08/11] fix: use fully qualified pgrx::PgList path --- supabase-wrappers/src/upper.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index d96630b73..3fe29b838 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -4,7 +4,7 @@ //! aggregate pushdown to foreign data sources. use pgrx::pg_sys::panic::ErrorReport; -use pgrx::{debug2, pg_guard, pg_sys, PgBox, PgList}; +use pgrx::{debug2, pg_guard, pg_sys, PgBox}; use std::ptr; use crate::interface::{Aggregate, AggregateKind, Column}; @@ -83,7 +83,7 @@ unsafe fn extract_aggregates( return None; } - let exprs_list: PgList = PgList::from_pg(exprs); + let exprs_list: pgrx::PgList = pgrx::PgList::from_pg(exprs); for expr in exprs_list.iter_ptr() { // Check if this is an Aggref (aggregate reference) @@ -127,7 +127,8 @@ unsafe fn extract_aggregates( // Get the column being aggregated (if any) let column = if !(*aggref).args.is_null() && (*(*aggref).args).length > 0 { - let args_list: PgList = PgList::from_pg((*aggref).args); + let args_list: pgrx::PgList = + pgrx::PgList::from_pg((*aggref).args); // Store the first entry before the if-let to avoid lifetime issues let first_entry = args_list.iter_ptr().next(); if let Some(target_entry) = first_entry { @@ -222,8 +223,8 @@ unsafe fn extract_group_by_columns( } // Iterate through group clause items using PgList - let group_list: PgList = PgList::from_pg(group_clause); - let target_entries: PgList = PgList::from_pg(target_list); + let group_list: pgrx::PgList = pgrx::PgList::from_pg(group_clause); + let target_entries: pgrx::PgList = pgrx::PgList::from_pg(target_list); for sort_group_clause in group_list.iter_ptr() { // Find the target entry for this group clause From bab242363b3fa2e9c349c77969ded1085a2d0056 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 15:07:08 -0400 Subject: [PATCH 09/11] fix: add feature guard to upper module for cargo test --- supabase-wrappers/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index 101aa9eb8..2ff32d104 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -319,6 +319,16 @@ mod polyfill; mod qual; mod scan; mod sort; + +// The upper module uses pgrx::PgList which requires PG features +#[cfg(any( + feature = "pg13", + feature = "pg14", + feature = "pg15", + feature = "pg16", + feature = "pg17", + feature = "pg18" +))] mod upper; /// PgBox'ed `FdwRoutine`, used in [`fdw_routine`](interface::ForeignDataWrapper::fdw_routine) From 472cba89c54c1ff0b5fc24b83ff0fe31e562e32b Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 15:25:34 -0400 Subject: [PATCH 10/11] fix: use raw list iteration instead of PgList Replace pgrx::PgList usage with direct pg_sys::List element access. This avoids type availability issues during cargo test since PgList may not be exported at the pgrx crate level in all configurations. The new list_iter helper function iterates over List elements using raw pointer access to the elements array, which is always available as part of the pg_sys FFI bindings. --- supabase-wrappers/src/upper.rs | 40 +++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs index 3fe29b838..62c301883 100644 --- a/supabase-wrappers/src/upper.rs +++ b/supabase-wrappers/src/upper.rs @@ -11,6 +11,21 @@ use crate::interface::{Aggregate, AggregateKind, Column}; use crate::prelude::ForeignDataWrapper; use crate::scan::FdwState; +/// Helper to iterate over a pg_sys::List using raw pointer access +/// Returns an iterator over pointers to the list elements +unsafe fn list_iter(list: *mut pg_sys::List) -> impl Iterator { + let len = if list.is_null() { + 0 + } else { + (*list).length as usize + }; + + (0..len).map(move |i| { + let cell = (*list).elements.add(i); + (*cell).ptr_value as *mut T + }) +} + /// Check if a given PostgreSQL aggregate OID is supported by the FDW fn oid_to_aggregate_kind(aggfnoid: pg_sys::Oid) -> Option { // PostgreSQL built-in aggregate function OIDs @@ -77,15 +92,13 @@ unsafe fn extract_aggregates( let mut aggregates = Vec::new(); let mut resno = 1; - // Iterate through the target expressions using PgList + // Iterate through the target expressions let exprs = (*reltarget).exprs; if exprs.is_null() { return None; } - let exprs_list: pgrx::PgList = pgrx::PgList::from_pg(exprs); - - for expr in exprs_list.iter_ptr() { + for expr in list_iter::(exprs) { // Check if this is an Aggref (aggregate reference) if (*expr).type_ == pg_sys::NodeTag::T_Aggref { let aggref = expr as *mut pg_sys::Aggref; @@ -127,11 +140,11 @@ unsafe fn extract_aggregates( // Get the column being aggregated (if any) let column = if !(*aggref).args.is_null() && (*(*aggref).args).length > 0 { - let args_list: pgrx::PgList = - pgrx::PgList::from_pg((*aggref).args); - // Store the first entry before the if-let to avoid lifetime issues - let first_entry = args_list.iter_ptr().next(); - if let Some(target_entry) = first_entry { + // Get first argument from the args list + let first_cell = (*(*aggref).args).elements; + let target_entry = (*first_cell).ptr_value as *mut pg_sys::TargetEntry; + + if !target_entry.is_null() { let arg_expr = (*target_entry).expr as *mut pg_sys::Node; if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { @@ -222,16 +235,13 @@ unsafe fn extract_group_by_columns( return group_by; } - // Iterate through group clause items using PgList - let group_list: pgrx::PgList = pgrx::PgList::from_pg(group_clause); - let target_entries: pgrx::PgList = pgrx::PgList::from_pg(target_list); - - for sort_group_clause in group_list.iter_ptr() { + // Iterate through group clause items + for sort_group_clause in list_iter::(group_clause) { // Find the target entry for this group clause let tle_resno = (*sort_group_clause).tleSortGroupRef; // Search target list for matching resno - for tle in target_entries.iter_ptr() { + for tle in list_iter::(target_list) { if (*tle).ressortgroupref == tle_resno { let expr = (*tle).expr as *mut pg_sys::Node; From a3a04406088283b19939b1d04ecd05687ca32565 Mon Sep 17 00:00:00 2001 From: JohnCari Date: Thu, 8 Jan 2026 15:38:29 -0400 Subject: [PATCH 11/11] fix: resolve doctest compilation errors in interface.rs - Change type_oid: 0 to type_oid: pgrx::pg_sys::Oid::INVALID in Aggregate::deparse doctest - Change trait method doc examples to rust,ignore since they show implementation patterns with &self parameters that don't compile as standalone functions --- supabase-wrappers/src/interface.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index 365cb7821..91009123b 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -798,7 +798,7 @@ impl Aggregate { /// /// let sum_col = Aggregate { /// kind: AggregateKind::Sum, - /// column: Some(Column { name: "price".to_string(), num: 1, type_oid: 0 }), + /// column: Some(Column { name: "price".to_string(), num: 1, type_oid: pgrx::pg_sys::Oid::INVALID }), /// distinct: false, /// alias: "total".to_string(), /// }; @@ -1006,7 +1006,7 @@ pub trait ForeignDataWrapper> { /// /// ## Examples /// - /// ```rust,no_run + /// ```rust,ignore /// use supabase_wrappers::prelude::*; /// /// fn supported_aggregates(&self) -> Vec { @@ -1033,7 +1033,7 @@ pub trait ForeignDataWrapper> { /// /// ## Examples /// - /// ```rust,no_run + /// ```rust,ignore /// fn supports_group_by(&self) -> bool { /// true /// } @@ -1063,7 +1063,7 @@ pub trait ForeignDataWrapper> { /// /// ## Examples /// - /// ```rust,no_run + /// ```rust,ignore /// fn get_aggregate_rel_size( /// &mut self, /// aggregates: &[Aggregate], @@ -1113,7 +1113,7 @@ pub trait ForeignDataWrapper> { /// /// ## Examples /// - /// ```rust,no_run + /// ```rust,ignore /// fn begin_aggregate_scan( /// &mut self, /// aggregates: &[Aggregate],