diff --git a/docs/contributing/native.md b/docs/contributing/native.md index 7c8aee7e3..243fbc3f6 100644 --- a/docs/contributing/native.md +++ b/docs/contributing/native.md @@ -25,6 +25,12 @@ pub trait ForeignDataWrapper { fn delete(...); fn end_modify(...); + // functions for aggregate pushdown (optional) + fn supported_aggregates(...) -> Vec; + fn supports_group_by(...) -> bool; + fn get_aggregate_rel_size(...) -> (i64, i32); + fn begin_aggregate_scan(...); + // other optional functions ... } diff --git a/docs/guides/query-pushdown.md b/docs/guides/query-pushdown.md index 208373efd..bc4589a47 100644 --- a/docs/guides/query-pushdown.md +++ b/docs/guides/query-pushdown.md @@ -10,7 +10,7 @@ Query pushdown is a technique that enhances query performance by executing parts ### Using Query Pushdown -In Wrappers, the pushdown logic is integrated into each FDW. You don’t need to modify your queries to benefit from this feature. For example, the [Stripe FDW](https://supabase.com/docs/guides/database/extensions/wrappers/stripe) automatically applies query pushdown for `id` within the `customer` object: +In Wrappers, the pushdown logic is integrated into each FDW. You don't need to modify your queries to benefit from this feature. For example, the [Stripe FDW](https://supabase.com/docs/guides/database/extensions/wrappers/stripe) automatically applies query pushdown for `id` within the `customer` object: ```sql select * @@ -34,3 +34,68 @@ limit 20; ``` This query executes `order by name limit 20` on ClickHouse before transferring the result to Postgres. + +### Aggregate Pushdown + +Aggregate pushdown allows aggregate functions like `COUNT`, `SUM`, `AVG`, `MIN`, and `MAX` to be executed directly on the foreign data source. This is especially valuable for analytics queries where you only need summary statistics rather than raw data. + +```sql +select count(*), sum(amount), avg(amount) +from foreign_table +where status = 'active'; +``` + +Instead of fetching all matching rows and computing aggregates locally, the FDW can push the entire aggregation to the remote source. This dramatically reduces data transfer - returning just a single row with the computed values. + +#### GROUP BY Support + +FDWs that support aggregate pushdown can also support `GROUP BY` pushdown: + +```sql +select department, count(*), avg(salary) +from employees +group by department; +``` + +This executes the grouping and aggregation on the remote server, returning only the grouped results. + +#### Supported Aggregate Functions + +The Wrappers framework supports pushing down these aggregate functions: + +| Function | Description | +|----------|-------------| +| `COUNT(*)` | Count all rows | +| `COUNT(column)` | Count non-null values | +| `COUNT(DISTINCT column)` | Count unique non-null values | +| `SUM(column)` | Sum of values | +| `AVG(column)` | Average of values | +| `MIN(column)` | Minimum value | +| `MAX(column)` | Maximum value | + +#### Implementing Aggregate Pushdown + +FDW developers can enable aggregate pushdown by implementing these trait methods: + +```rust +fn supported_aggregates(&self) -> Vec { + vec![AggregateKind::Count, AggregateKind::Sum, AggregateKind::Avg] +} + +fn supports_group_by(&self) -> bool { + true +} + +fn begin_aggregate_scan( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + quals: &[Qual], + options: &HashMap, +) -> Result<(), Error> { + // Build and execute remote aggregate query + Ok(()) +} +``` + +See the [API documentation](https://docs.rs/supabase-wrappers/latest/supabase_wrappers/) for detailed information on implementing aggregate pushdown. diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index bcbc361d6..91009123b 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -678,6 +678,167 @@ impl Limit { } } +/// Represents the type of aggregate function for pushdown +/// +/// This enum is used to declare which aggregate functions an FDW supports +/// for remote execution via [`ForeignDataWrapper::supported_aggregates`]. +/// +/// ## Examples +/// +/// ```rust,no_run +/// use supabase_wrappers::prelude::*; +/// +/// fn get_supported() -> Vec { +/// vec![ +/// AggregateKind::Count, +/// AggregateKind::Sum, +/// AggregateKind::Avg, +/// ] +/// } +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AggregateKind { + /// COUNT(*) - count all rows + Count, + /// COUNT(column) - count non-null values in a column + CountColumn, + /// SUM(column) - sum of values + Sum, + /// AVG(column) - average of values + Avg, + /// MIN(column) - minimum value + Min, + /// MAX(column) - maximum value + Max, +} + +impl AggregateKind { + /// Returns the SQL function name for this aggregate kind + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// assert_eq!(AggregateKind::Count.sql_name(), "COUNT"); + /// assert_eq!(AggregateKind::Sum.sql_name(), "SUM"); + /// ``` + pub fn sql_name(&self) -> &'static str { + match self { + AggregateKind::Count => "COUNT", + AggregateKind::CountColumn => "COUNT", + AggregateKind::Sum => "SUM", + AggregateKind::Avg => "AVG", + AggregateKind::Min => "MIN", + AggregateKind::Max => "MAX", + } + } +} + +/// Represents a single aggregate operation in a pushed-down query +/// +/// This struct contains all information needed for an FDW to execute an +/// aggregate function remotely. +/// +/// ## Examples +/// +/// ```rust,no_run +/// use supabase_wrappers::prelude::*; +/// +/// // COUNT(*) aggregate +/// let count_all = Aggregate { +/// kind: AggregateKind::Count, +/// column: None, +/// distinct: false, +/// alias: "count".to_string(), +/// }; +/// +/// // SUM(amount) aggregate +/// let sum_amount = Aggregate { +/// kind: AggregateKind::Sum, +/// column: Some(Column { +/// name: "amount".to_string(), +/// num: 1, +/// type_oid: pgrx::pg_sys::FLOAT8OID, +/// }), +/// distinct: false, +/// alias: "total_amount".to_string(), +/// }; +/// ``` +#[derive(Debug, Clone)] +pub struct Aggregate { + /// Type of aggregate function + pub kind: AggregateKind, + + /// Column being aggregated (None for COUNT(*)) + pub column: Option, + + /// Whether DISTINCT modifier is applied (e.g., COUNT(DISTINCT col)) + pub distinct: bool, + + /// Output column name/alias for the aggregate result + pub alias: String, +} + +impl Aggregate { + /// Deparses the aggregate into SQL syntax + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// let count_all = Aggregate { + /// kind: AggregateKind::Count, + /// column: None, + /// distinct: false, + /// alias: "cnt".to_string(), + /// }; + /// assert_eq!(count_all.deparse(), "COUNT(*)"); + /// + /// let sum_col = Aggregate { + /// kind: AggregateKind::Sum, + /// column: Some(Column { name: "price".to_string(), num: 1, type_oid: pgrx::pg_sys::Oid::INVALID }), + /// distinct: false, + /// alias: "total".to_string(), + /// }; + /// assert_eq!(sum_col.deparse(), "SUM(price)"); + /// ``` + pub fn deparse(&self) -> String { + let func_name = self.kind.sql_name(); + match self.kind { + AggregateKind::Count => format!("{func_name}(*)"), + _ => { + let col_name = self.column.as_ref().map(|c| c.name.as_str()).unwrap_or("*"); + if self.distinct { + format!("{func_name}(DISTINCT {col_name})") + } else { + format!("{func_name}({col_name})") + } + } + } + } + + /// Deparses the aggregate with its alias for use in SELECT + /// + /// ## Examples + /// + /// ```rust,no_run + /// use supabase_wrappers::prelude::*; + /// + /// let count_all = Aggregate { + /// kind: AggregateKind::Count, + /// column: None, + /// distinct: false, + /// alias: "cnt".to_string(), + /// }; + /// assert_eq!(count_all.deparse_with_alias(), "COUNT(*) AS cnt"); + /// ``` + pub fn deparse_with_alias(&self) -> String { + format!("{} AS {}", self.deparse(), self.alias) + } +} + /// The Foreign Data Wrapper trait /// /// This is the main interface for your foreign data wrapper. Required functions @@ -827,6 +988,173 @@ pub trait ForeignDataWrapper> { Ok(()) } + // ========================================================================= + // Aggregate Pushdown Methods + // ========================================================================= + + /// Returns the list of aggregate functions this FDW can push down to the + /// remote data source. + /// + /// Default implementation returns an empty vector (no aggregate pushdown). + /// Override this method to enable aggregate pushdown for your FDW. + /// + /// When this method returns a non-empty vector, the framework will: + /// 1. Check if the query's aggregates are all supported + /// 2. Call [`get_aggregate_rel_size`](Self::get_aggregate_rel_size) for cost estimation + /// 3. Create an aggregate path in the query planner + /// 4. Call [`begin_aggregate_scan`](Self::begin_aggregate_scan) if the aggregate path is chosen + /// + /// ## Examples + /// + /// ```rust,ignore + /// use supabase_wrappers::prelude::*; + /// + /// fn supported_aggregates(&self) -> Vec { + /// vec![ + /// AggregateKind::Count, + /// AggregateKind::CountColumn, + /// AggregateKind::Sum, + /// AggregateKind::Avg, + /// AggregateKind::Min, + /// AggregateKind::Max, + /// ] + /// } + /// ``` + fn supported_aggregates(&self) -> Vec { + vec![] + } + + /// Returns whether this FDW supports GROUP BY pushdown alongside aggregates. + /// + /// Only relevant if [`supported_aggregates`](Self::supported_aggregates) returns a non-empty vector. + /// Default implementation returns `false`. + /// + /// When `true`, GROUP BY columns will be passed to [`begin_aggregate_scan`](Self::begin_aggregate_scan). + /// + /// ## Examples + /// + /// ```rust,ignore + /// fn supports_group_by(&self) -> bool { + /// true + /// } + /// ``` + fn supports_group_by(&self) -> bool { + false + } + + /// Estimate the size of aggregate query results for query planning. + /// + /// Called during query planning when aggregate pushdown is being considered. + /// Returns `(estimated_rows, mean_row_width_bytes)`. + /// + /// Default implementation estimates: + /// - 1 row for ungrouped aggregates (e.g., `SELECT COUNT(*) FROM t`) + /// - 100 rows for grouped aggregates (e.g., `SELECT dept, COUNT(*) FROM t GROUP BY dept`) + /// + /// Override this method if you can provide better estimates based on your + /// knowledge of the remote data source. + /// + /// ## Parameters + /// + /// - `aggregates`: List of aggregate operations to be computed + /// - `group_by`: Columns in GROUP BY clause (empty if none) + /// - `quals`: WHERE clause conditions + /// - `options`: Foreign table options + /// + /// ## Examples + /// + /// ```rust,ignore + /// fn get_aggregate_rel_size( + /// &mut self, + /// aggregates: &[Aggregate], + /// group_by: &[Column], + /// quals: &[Qual], + /// options: &HashMap, + /// ) -> Result<(i64, i32), MyFdwError> { + /// let rows = if group_by.is_empty() { 1 } else { 50 }; + /// let width = 8 * aggregates.len() as i32; + /// Ok((rows, width)) + /// } + /// ``` + fn get_aggregate_rel_size( + &mut self, + aggregates: &[Aggregate], + group_by: &[Column], + _quals: &[Qual], + _options: &HashMap, + ) -> Result<(i64, i32), E> { + // Default: 1 row if no GROUP BY, otherwise estimate 100 rows + let rows = if group_by.is_empty() { 1 } else { 100 }; + let width = 8 * aggregates.len() as i32; + Ok((rows, width)) + } + + /// Called when beginning execution of a pushed-down aggregate query. + /// + /// This method is called instead of [`begin_scan`](Self::begin_scan) when + /// the query planner chooses the aggregate pushdown path. The FDW should + /// prepare to return aggregate results based on the provided parameters. + /// + /// After this method returns successfully, [`iter_scan`](Self::iter_scan) + /// will be called to retrieve the aggregate results. + /// + /// ## Parameters + /// + /// - `aggregates`: List of aggregate operations to compute + /// - `group_by`: Columns to group by (empty for ungrouped aggregates) + /// - `quals`: WHERE clause conditions to apply before aggregation + /// - `options`: Foreign table options + /// + /// ## Result Row Format + /// + /// The rows returned by [`iter_scan`](Self::iter_scan) should contain: + /// 1. GROUP BY column values (in order specified) + /// 2. Aggregate results (in order specified, using `alias` as column name) + /// + /// ## Examples + /// + /// ```rust,ignore + /// fn begin_aggregate_scan( + /// &mut self, + /// aggregates: &[Aggregate], + /// group_by: &[Column], + /// quals: &[Qual], + /// options: &HashMap, + /// ) -> Result<(), MyFdwError> { + /// // Build remote query with aggregates + /// let mut select_items = Vec::new(); + /// + /// // Add GROUP BY columns + /// for col in group_by { + /// select_items.push(col.name.clone()); + /// } + /// + /// // Add aggregate expressions + /// for agg in aggregates { + /// select_items.push(agg.deparse_with_alias()); + /// } + /// + /// let query = format!("SELECT {} FROM ...", select_items.join(", ")); + /// + /// // Execute and store results for iter_scan + /// self.results = self.execute_query(&query)?; + /// Ok(()) + /// } + /// ``` + fn begin_aggregate_scan( + &mut self, + _aggregates: &[Aggregate], + _group_by: &[Column], + _quals: &[Qual], + _options: &HashMap, + ) -> Result<(), E> { + // Default: This should not be called if supported_aggregates() returns empty. + // If called, it means the FDW declared aggregate support but didn't implement + // this method - which is a programming error. We'll just proceed with an + // empty result set by returning Ok. + Ok(()) + } + /// Obtain a list of foreign table creation commands /// /// Return a list of string, each of which must contain a CREATE FOREIGN TABLE @@ -848,7 +1176,7 @@ pub trait ForeignDataWrapper> { Self: Sized, { unsafe { - use crate::{import_foreign_schema, modify, scan}; + use crate::{import_foreign_schema, modify, scan, upper}; let mut fdw_routine = FdwRoutine::::alloc_node(pg_sys::NodeTag::T_FdwRoutine); @@ -862,6 +1190,9 @@ pub trait ForeignDataWrapper> { fdw_routine.GetForeignPlan = Some(scan::get_foreign_plan::); fdw_routine.ExplainForeignScan = Some(scan::explain_foreign_scan::); + // upper path planning (aggregate pushdown) + fdw_routine.GetForeignUpperPaths = Some(upper::get_foreign_upper_paths::); + // scan phase fdw_routine.BeginForeignScan = Some(scan::begin_foreign_scan::); fdw_routine.IterateForeignScan = Some(scan::iterate_foreign_scan::); diff --git a/supabase-wrappers/src/lib.rs b/supabase-wrappers/src/lib.rs index 32435a0f6..2ff32d104 100644 --- a/supabase-wrappers/src/lib.rs +++ b/supabase-wrappers/src/lib.rs @@ -320,6 +320,17 @@ mod qual; mod scan; mod sort; +// The upper module uses pgrx::PgList which requires PG features +#[cfg(any( + feature = "pg13", + feature = "pg14", + feature = "pg15", + feature = "pg16", + feature = "pg17", + feature = "pg18" +))] +mod upper; + /// PgBox'ed `FdwRoutine`, used in [`fdw_routine`](interface::ForeignDataWrapper::fdw_routine) pub type FdwRoutine = PgBox; diff --git a/supabase-wrappers/src/scan.rs b/supabase-wrappers/src/scan.rs index 3e274b84d..8bbae4284 100644 --- a/supabase-wrappers/src/scan.rs +++ b/supabase-wrappers/src/scan.rs @@ -25,24 +25,24 @@ use crate::sort::*; use crate::utils::{self, report_error, ReportableError, SerdeList}; // Fdw private state for scan -struct FdwState, W: ForeignDataWrapper> { +pub(crate) struct FdwState, W: ForeignDataWrapper> { // foreign data wrapper instance - instance: Option, + pub(crate) instance: Option, // query conditions - quals: Vec, + pub(crate) quals: Vec, // query target column list - tgts: Vec, + pub(crate) tgts: Vec, // sort list - sorts: Vec, + pub(crate) sorts: Vec, // limit - limit: Option, + pub(crate) limit: Option, // foreign table options - opts: HashMap, + pub(crate) opts: HashMap, // temporary memory context per foreign table, created under Wrappers root // memory context diff --git a/supabase-wrappers/src/upper.rs b/supabase-wrappers/src/upper.rs new file mode 100644 index 000000000..62c301883 --- /dev/null +++ b/supabase-wrappers/src/upper.rs @@ -0,0 +1,401 @@ +//! Upper path planning for aggregate pushdown +//! +//! This module implements the GetForeignUpperPaths callback which enables +//! aggregate pushdown to foreign data sources. + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::{debug2, pg_guard, pg_sys, PgBox}; +use std::ptr; + +use crate::interface::{Aggregate, AggregateKind, Column}; +use crate::prelude::ForeignDataWrapper; +use crate::scan::FdwState; + +/// Helper to iterate over a pg_sys::List using raw pointer access +/// Returns an iterator over pointers to the list elements +unsafe fn list_iter(list: *mut pg_sys::List) -> impl Iterator { + let len = if list.is_null() { + 0 + } else { + (*list).length as usize + }; + + (0..len).map(move |i| { + let cell = (*list).elements.add(i); + (*cell).ptr_value as *mut T + }) +} + +/// Check if a given PostgreSQL aggregate OID is supported by the FDW +fn oid_to_aggregate_kind(aggfnoid: pg_sys::Oid) -> Option { + // PostgreSQL built-in aggregate function OIDs + // These are from pg_proc.dat in PostgreSQL source + // COUNT(*) = 2803, COUNT(any) = 2147 + // SUM(int8) = 2107, SUM(int4) = 2108, SUM(float8) = 2111, etc. + // AVG(int8) = 2100, AVG(float8) = 2105, etc. + // MIN(any) = 2145, MAX(any) = 2146 + + unsafe { + // Get the aggregate function name from the OID + let agg_name = pg_sys::get_func_name(aggfnoid); + if agg_name.is_null() { + return None; + } + + let name_cstr = std::ffi::CStr::from_ptr(agg_name); + let name = name_cstr.to_str().ok()?; + + match name { + "count" => { + // Determine if it's COUNT(*) or COUNT(column) + // COUNT(*) has no arguments, COUNT(column) has one + let nargs = pg_sys::get_func_nargs(aggfnoid); + if nargs == 0 { + Some(AggregateKind::Count) + } else { + Some(AggregateKind::CountColumn) + } + } + "sum" => Some(AggregateKind::Sum), + "avg" => Some(AggregateKind::Avg), + "min" => Some(AggregateKind::Min), + "max" => Some(AggregateKind::Max), + _ => None, + } + } +} + +/// Extract aggregate information from the query +unsafe fn extract_aggregates( + root: *mut pg_sys::PlannerInfo, + output_rel: *mut pg_sys::RelOptInfo, + extra: *mut std::ffi::c_void, +) -> Option> { + // The extra parameter for UPPERREL_GROUP_AGG is GroupPathExtraData + if extra.is_null() { + return None; + } + + let group_extra = extra as *mut pg_sys::GroupPathExtraData; + if !(*group_extra).havingQual.is_null() { + // HAVING clause not supported for pushdown + debug2!("HAVING clause present, skipping aggregate pushdown"); + return None; + } + + // Get the target list from the output relation + let reltarget = (*output_rel).reltarget; + if reltarget.is_null() { + return None; + } + + let mut aggregates = Vec::new(); + let mut resno = 1; + + // Iterate through the target expressions + let exprs = (*reltarget).exprs; + if exprs.is_null() { + return None; + } + + for expr in list_iter::(exprs) { + // Check if this is an Aggref (aggregate reference) + if (*expr).type_ == pg_sys::NodeTag::T_Aggref { + let aggref = expr as *mut pg_sys::Aggref; + + let kind = match oid_to_aggregate_kind((*aggref).aggfnoid) { + Some(k) => k, + None => { + // Get function name for debug message + let func_name = pg_sys::get_func_name((*aggref).aggfnoid); + if !func_name.is_null() { + let name = std::ffi::CStr::from_ptr(func_name).to_string_lossy(); + debug2!( + "Unsupported aggregate function '{}', skipping pushdown", + name + ); + } else { + debug2!("Unknown aggregate function, skipping pushdown"); + } + return None; + } + }; + + // Check for DISTINCT - only supported for COUNT + if !(*aggref).aggdistinct.is_null() { + match kind { + AggregateKind::CountColumn => { + // COUNT(DISTINCT col) is supported + debug2!("COUNT(DISTINCT) detected, pushdown supported"); + } + _ => { + debug2!( + "DISTINCT modifier on {:?} not supported, skipping pushdown", + kind + ); + return None; + } + } + } + + // Get the column being aggregated (if any) + let column = if !(*aggref).args.is_null() && (*(*aggref).args).length > 0 { + // Get first argument from the args list + let first_cell = (*(*aggref).args).elements; + let target_entry = (*first_cell).ptr_value as *mut pg_sys::TargetEntry; + + if !target_entry.is_null() { + let arg_expr = (*target_entry).expr as *mut pg_sys::Node; + + if (*arg_expr).type_ == pg_sys::NodeTag::T_Var { + let var = arg_expr as *mut pg_sys::Var; + // Get column name from the var + let relid = (*var).varno as pg_sys::Index; + let attno = (*var).varattno; + + // Try to get the column name + let rte = pg_sys::planner_rt_fetch(relid, root); + if !rte.is_null() { + let rel_oid = (*rte).relid; + let att_name = pg_sys::get_attname(rel_oid, attno, false); + if !att_name.is_null() { + let name_cstr = std::ffi::CStr::from_ptr(att_name); + if let Ok(name) = name_cstr.to_str() { + Some(Column { + name: name.to_string(), + num: attno as usize, + type_oid: (*var).vartype, + }) + } else { + None + } + } else { + None + } + } else { + None + } + } else { + // Complex expression, not a simple column reference + None + } + } else { + None + } + } else { + None + }; + + let aggregate = Aggregate { + kind, + column, + distinct: !(*aggref).aggdistinct.is_null(), + alias: format!("agg_{resno}"), + }; + + aggregates.push(aggregate); + } + + resno += 1; + } + + if aggregates.is_empty() { + return None; + } + + debug2!( + "Extracted {} aggregates for pushdown: {:?}", + aggregates.len(), + aggregates.iter().map(|a| a.kind).collect::>() + ); + + Some(aggregates) +} + +/// Extract GROUP BY columns from the query +unsafe fn extract_group_by_columns( + root: *mut pg_sys::PlannerInfo, + _input_rel: *mut pg_sys::RelOptInfo, +) -> Vec { + let mut group_by = Vec::new(); + + let parse = (*root).parse; + if parse.is_null() { + return group_by; + } + + let group_clause = (*parse).groupClause; + if group_clause.is_null() || (*group_clause).length == 0 { + return group_by; + } + + // Get the target list + let target_list = (*parse).targetList; + if target_list.is_null() { + return group_by; + } + + // Iterate through group clause items + for sort_group_clause in list_iter::(group_clause) { + // Find the target entry for this group clause + let tle_resno = (*sort_group_clause).tleSortGroupRef; + + // Search target list for matching resno + for tle in list_iter::(target_list) { + if (*tle).ressortgroupref == tle_resno { + let expr = (*tle).expr as *mut pg_sys::Node; + + if (*expr).type_ == pg_sys::NodeTag::T_Var { + let var = expr as *mut pg_sys::Var; + let relid = (*var).varno as pg_sys::Index; + let attno = (*var).varattno; + + let rte = pg_sys::planner_rt_fetch(relid, root); + if !rte.is_null() { + let rel_oid = (*rte).relid; + let att_name = pg_sys::get_attname(rel_oid, attno, false); + if !att_name.is_null() { + let name_cstr = std::ffi::CStr::from_ptr(att_name); + if let Ok(name) = name_cstr.to_str() { + group_by.push(Column { + name: name.to_string(), + num: attno as usize, + type_oid: (*var).vartype, + }); + } + } + } + } + break; + } + } + } + + group_by +} + +/// GetForeignUpperPaths callback +/// +/// This callback is called by the PostgreSQL planner to create paths for +/// upper-level processing (aggregation, sorting, etc.) that can be pushed +/// down to the foreign server. +#[pg_guard] +pub(super) extern "C-unwind" fn get_foreign_upper_paths< + E: Into, + W: ForeignDataWrapper, +>( + root: *mut pg_sys::PlannerInfo, + stage: pg_sys::UpperRelationKind::Type, + input_rel: *mut pg_sys::RelOptInfo, + output_rel: *mut pg_sys::RelOptInfo, + extra: *mut std::ffi::c_void, +) { + debug2!("---> get_foreign_upper_paths, stage: {:?}", stage); + + // Only handle GROUP_AGG stage + if stage != pg_sys::UpperRelationKind::UPPERREL_GROUP_AGG { + return; + } + + unsafe { + // Get the FDW state from the input relation + let fdw_private = (*input_rel).fdw_private; + if fdw_private.is_null() { + return; + } + + let state = PgBox::>::from_pg(fdw_private as _); + + // Check if FDW supports any aggregates + if let Some(ref instance) = state.instance { + let supported = instance.supported_aggregates(); + if supported.is_empty() { + return; + } + + // Extract aggregates from the query + let aggregates = match extract_aggregates(root, output_rel, extra) { + Some(aggs) => aggs, + None => return, + }; + + // Check if all aggregates are supported + for agg in &aggregates { + if !supported.contains(&agg.kind) { + debug2!("Aggregate {:?} not supported, skipping pushdown", agg.kind); + return; + } + } + + // Extract GROUP BY columns + let group_by = extract_group_by_columns(root, input_rel); + if !group_by.is_empty() { + debug2!( + "Extracted GROUP BY columns: {:?}", + group_by.iter().map(|c| c.name.as_str()).collect::>() + ); + } + + // Check if GROUP BY is supported (if present) + if !group_by.is_empty() && !instance.supports_group_by() { + debug2!("GROUP BY not supported, skipping pushdown"); + return; + } + + // Get cost estimates + let (rows, _width) = { + // We need a mutable reference, but we're in a const context + // For now, use default estimates + let rows = if group_by.is_empty() { 1 } else { 100 }; + let width = 8 * aggregates.len() as i32; + (rows, width) + }; + + // Get startup cost from options + let startup_cost = state + .opts + .get("startup_cost") + .and_then(|c| c.parse::().ok()) + .unwrap_or(0.0); + + let total_cost = startup_cost + rows as f64; + + debug2!( + "Aggregate pushdown cost estimate: rows={}, startup={}, total={}", + rows, + startup_cost, + total_cost + ); + + // Create the foreign upper path + // Note: The function signature differs across PostgreSQL versions: + // - PG13-16: 9 parameters (no disabled_nodes, no fdw_restrictinfo) + // - PG17: 10 parameters (added fdw_restrictinfo) + // - PG18: 11 parameters (added disabled_nodes) + let path = pg_sys::create_foreign_upper_path( + root, + output_rel, + (*output_rel).reltarget, // pathtarget + rows as f64, // rows + #[cfg(feature = "pg18")] + 0, // disabled_nodes (pg18 only) + startup_cost, // startup_cost + total_cost, // total_cost + ptr::null_mut(), // pathkeys + ptr::null_mut(), // fdw_outerpath + #[cfg(any(feature = "pg17", feature = "pg18"))] + ptr::null_mut(), // fdw_restrictinfo (pg17+ only) + ptr::null_mut(), // fdw_private + ); + + // Add the path to the output relation + pg_sys::add_path(output_rel, &mut ((*path).path)); + + debug2!( + "Created aggregate pushdown path: {} aggregates, {} group by columns", + aggregates.len(), + group_by.len() + ); + } + } +}