From 13c660b19f9d509fd58cf1f2485c2c2265e17984 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Wed, 6 May 2026 20:13:22 +0200 Subject: [PATCH 1/5] chore: switched to pg_introspect --- Cargo.lock | 12 + Cargo.toml | 1 + crates/dry_run_core/Cargo.toml | 1 + .../src/schema/from_pg_introspect.rs | 317 ++++++++++++++++++ .../src/schema/introspect/catalog.rs | 121 ------- .../src/schema/introspect/comments.rs | 62 ---- .../src/schema/introspect/indexes.rs | 78 ----- .../dry_run_core/src/schema/introspect/mod.rs | 302 +++-------------- .../src/schema/introspect/objects.rs | 137 -------- .../src/schema/introspect/partitions.rs | 61 ---- .../src/schema/introspect/policies.rs | 78 ----- .../src/schema/introspect/raw_types.rs | 85 ----- .../src/schema/introspect/tables.rs | 150 --------- crates/dry_run_core/src/schema/mod.rs | 1 + 14 files changed, 383 insertions(+), 1023 deletions(-) create mode 100644 crates/dry_run_core/src/schema/from_pg_introspect.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/catalog.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/comments.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/indexes.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/objects.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/partitions.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/policies.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/raw_types.rs delete mode 100644 crates/dry_run_core/src/schema/introspect/tables.rs diff --git a/Cargo.lock b/Cargo.lock index b75da00..d6a50c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,7 @@ version = "0.6.1" dependencies = [ "async-trait", "chrono", + "pg_introspect", "pg_query", "regex", "rusqlite", @@ -1528,6 +1529,17 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pg_introspect" +version = "0.1.0" +dependencies = [ + "indexmap", + "serde", + "sqlx", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = 
"pg_query" version = "6.1.1" diff --git a/Cargo.toml b/Cargo.toml index 65a1012..3e3396e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ dbg_macro = "deny" [workspace.dependencies] dry_run_core = { path = "crates/dry_run_core" } +pg_introspect = { path = "../pg_introspect" } async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } diff --git a/crates/dry_run_core/Cargo.toml b/crates/dry_run_core/Cargo.toml index 6a3330f..37dd5e4 100644 --- a/crates/dry_run_core/Cargo.toml +++ b/crates/dry_run_core/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] async-trait = { workspace = true } chrono = { workspace = true } +pg_introspect = { workspace = true } pg_query = { workspace = true } regex = { workspace = true } rusqlite = { workspace = true } diff --git a/crates/dry_run_core/src/schema/from_pg_introspect.rs b/crates/dry_run_core/src/schema/from_pg_introspect.rs new file mode 100644 index 0000000..9694c1f --- /dev/null +++ b/crates/dry_run_core/src/schema/from_pg_introspect.rs @@ -0,0 +1,317 @@ +use pg_introspect::{ + Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn, + CompositeType as PgComposite, DomainType as PgDomain, EnumType as PgEnum, + ExclusionConstraint as PgExclusion, Extension as PgExtension, ForeignKey as PgFk, + Function as PgFunction, GeneratedKind, IdentityKind, Index as PgIndex, + PartitionChild as PgPartChild, PartitionInfo as PgPartInfo, PartitionStrategy as PgPartStrat, + PolicyCommand, PrimaryKey as PgPrimaryKey, RlsPolicy as PgPolicy, Table as PgTable, + Trigger as PgTrigger, UniqueConstraint as PgUnique, View as PgView, ViewKind, + Volatility as PgVol, +}; + +use super::types::*; + +// envelope (pg_version, database, gucs, content_hash, ...) 
is the caller's job +pub fn catalog_to_snapshot_parts(cat: PgCatalog) -> SnapshotParts { + let mut out = SnapshotParts::default(); + + for (_, t) in cat.tables { + out.tables.push(convert_table(t)); + } + for (_, v) in cat.views { + out.views.push(convert_view(v)); + } + for e in cat.enums { + out.enums.push(convert_enum(e)); + } + for d in cat.domains { + out.domains.push(convert_domain(d)); + } + for c in cat.composites { + out.composites.push(convert_composite(c)); + } + for f in cat.functions { + out.functions.push(convert_function(f)); + } + for e in cat.extensions { + out.extensions.push(convert_extension(e)); + } + + out +} + +#[derive(Default)] +pub struct SnapshotParts { + pub tables: Vec<Table>, + pub enums: Vec<EnumType>, + pub domains: Vec<DomainType>, + pub composites: Vec<CompositeType>, + pub views: Vec<View>, + pub functions: Vec<Function>, + pub extensions: Vec<Extension>, +} + +fn convert_table(t: PgTable) -> Table { + let mut constraints: Vec<Constraint> = Vec::new(); + if let Some(pk) = t.primary_key { + constraints.push(convert_pk(pk)); + } + for fk in t.foreign_keys { + constraints.push(convert_fk(fk)); + } + for u in t.unique_constraints { + constraints.push(convert_unique(u)); + } + for c in t.check_constraints { + constraints.push(convert_check(c)); + } + for x in t.exclusion_constraints { + constraints.push(convert_exclusion(x)); + } + + let mut cols: Vec<Column> = Vec::with_capacity(t.columns.len()); + for (_, c) in t.columns { + cols.push(convert_column(c)); + } + + Table { + oid: t.oid, + schema: t.name.schema, + name: t.name.name, + columns: cols, + constraints, + indexes: t.indexes.into_iter().map(convert_index).collect(), + comment: t.comment, + partition_info: t.partition_info.map(convert_partition_info), + policies: t.policies.into_iter().map(convert_policy).collect(), + triggers: t.triggers.into_iter().map(convert_trigger).collect(), + reloptions: t.reloptions, + rls_enabled: t.rls_enabled, + } +} + +fn convert_column(c: PgColumn) -> Column { + // dryrun keeps these as the raw pg_attribute char codes + let identity =
c.identity.map(|k| match k { + IdentityKind::Always => "a", + IdentityKind::ByDefault => "d", + }); + let generated = c.generated.map(|g| match g { + GeneratedKind::Stored => "s", + GeneratedKind::Virtual => "v", + }); + + Column { + name: c.name, + ordinal: c.ordinal, + type_name: c.type_name, + nullable: c.is_nullable, + default: c.default, + identity: identity.map(String::from), + generated: generated.map(String::from), + comment: c.comment, + statistics_target: c.statistics_target, + } +} + +fn convert_pk(pk: PgPrimaryKey) -> Constraint { + Constraint { + name: pk.name, + kind: ConstraintKind::PrimaryKey, + columns: pk.columns, + definition: Some(pk.definition), + fk_table: None, + fk_columns: vec![], + backing_index: None, + comment: None, + } +} + +fn convert_fk(fk: PgFk) -> Constraint { + let target = format!("{}.{}", fk.references.schema, fk.references.name); + Constraint { + name: fk.constraint_name, + kind: ConstraintKind::ForeignKey, + columns: fk.columns, + definition: Some(fk.definition), + fk_table: Some(target), + fk_columns: fk.references_columns, + backing_index: None, + comment: None, + } +} + +fn convert_unique(u: PgUnique) -> Constraint { + Constraint { + name: u.name, + kind: ConstraintKind::Unique, + columns: u.columns, + definition: Some(u.definition), + fk_table: None, + fk_columns: vec![], + backing_index: Some(u.index_name), + comment: None, + } +} + +fn convert_check(c: PgCheck) -> Constraint { + Constraint { + name: c.name, + kind: ConstraintKind::Check, + columns: c.columns, + definition: Some(c.definition), + fk_table: None, + fk_columns: vec![], + backing_index: None, + comment: None, + } +} + +fn convert_exclusion(x: PgExclusion) -> Constraint { + Constraint { + name: x.name, + kind: ConstraintKind::Exclusion, + columns: x.columns, + definition: Some(x.definition), + fk_table: None, + fk_columns: vec![], + backing_index: Some(x.index_name), + comment: None, + } +} + +fn convert_index(i: PgIndex) -> Index { + Index { + name: i.name, + 
columns: i.columns, + include_columns: i.included_columns, + index_type: i.method, + is_unique: i.is_unique, + is_primary: i.is_primary, + predicate: i.predicate, + definition: i.definition, + is_valid: i.is_valid, + backs_constraint: i.backs_constraint, + } +} + +fn convert_partition_info(p: PgPartInfo) -> PartitionInfo { + PartitionInfo { + strategy: match p.strategy { + PgPartStrat::Range => PartitionStrategy::Range, + PgPartStrat::List => PartitionStrategy::List, + PgPartStrat::Hash => PartitionStrategy::Hash, + }, + key: p.key, + children: p + .children + .into_iter() + .map(convert_partition_child) + .collect(), + } +} + +fn convert_partition_child(c: PgPartChild) -> PartitionChild { + PartitionChild { + schema: c.name.schema, + name: c.name.name, + bound: c.bound, + } +} + +fn convert_policy(p: PgPolicy) -> RlsPolicy { + let cmd = match p.command { + PolicyCommand::All => "ALL", + PolicyCommand::Select => "SELECT", + PolicyCommand::Insert => "INSERT", + PolicyCommand::Update => "UPDATE", + PolicyCommand::Delete => "DELETE", + }; + RlsPolicy { + name: p.name, + command: cmd.to_string(), + permissive: p.permissive, + roles: p.roles, + using_expr: p.using_expr, + with_check_expr: p.with_check_expr, + } +} + +fn convert_trigger(t: PgTrigger) -> Trigger { + // pg_introspect carries timing/events/orientation separately, but dryrun + // only stores the rendered definition. drop the rest for now. 
+ Trigger { + name: t.name, + definition: t.definition, + } +} + +fn convert_view(v: PgView) -> View { + View { + schema: v.name.schema, + name: v.name.name, + definition: v.definition, + is_materialized: matches!(v.kind, ViewKind::Materialized), + comment: v.comment, + } +} + +fn convert_enum(e: PgEnum) -> EnumType { + EnumType { + schema: e.name.schema, + name: e.name.name, + labels: e.labels, + } +} + +fn convert_domain(d: PgDomain) -> DomainType { + DomainType { + schema: d.name.schema, + name: d.name.name, + base_type: d.base_type, + nullable: d.is_nullable, + default: d.default, + check_constraints: d.constraints, + } +} + +fn convert_composite(c: PgComposite) -> CompositeType { + let mut fields: Vec<CompositeField> = Vec::with_capacity(c.attributes.len()); + for a in c.attributes { + fields.push(CompositeField { + name: a.name, + type_name: a.type_name, + }); + } + CompositeType { + schema: c.name.schema, + name: c.name.name, + fields, + } +} + +fn convert_function(f: PgFunction) -> Function { + let volatility = match f.volatility { + PgVol::Immutable => Volatility::Immutable, + PgVol::Stable => Volatility::Stable, + PgVol::Volatile => Volatility::Volatile, + }; + Function { + schema: f.name.schema, + name: f.name.name, + identity_args: f.identity_arguments, + return_type: f.return_type, + language: f.language, + volatility, + security_definer: f.security_definer, + comment: f.comment, + } +} + +fn convert_extension(e: PgExtension) -> Extension { + Extension { + name: e.name, + version: e.version, + schema: e.schema, + } +} diff --git a/crates/dry_run_core/src/schema/introspect/catalog.rs b/crates/dry_run_core/src/schema/introspect/catalog.rs deleted file mode 100644 index cc0800b..0000000 --- a/crates/dry_run_core/src/schema/introspect/catalog.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::collections::HashMap; - -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use super::super::types::*; - -use crate::error::Result; - -pub(super) async fn fetch_enums(pool: &PgPool) ->
Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - (SELECT array_agg(e.enumlabel ORDER BY e.enumsortorder) - FROM pg_catalog.pg_enum e - WHERE e.enumtypid = t.oid - ) AS labels - FROM pg_catalog.pg_type t - JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - WHERE t.typtype = 'e' - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| EnumType { - schema: r.get("schema_name"), - name: r.get("type_name"), - labels: r - .get::>, _>("labels") - .unwrap_or_default(), - }) - .collect()) -} - -pub(super) async fn fetch_domains(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - pg_catalog.format_type(t.typbasetype, t.typtypmod) AS base_type, - t.typnotnull AS notnull, - pg_catalog.pg_get_expr(t.typdefaultbin, 0) AS default_expr, - (SELECT array_agg(pg_catalog.pg_get_constraintdef(con.oid) ORDER BY con.conname) - FROM pg_catalog.pg_constraint con - WHERE con.contypid = t.oid - ) AS check_constraints - FROM pg_catalog.pg_type t - JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - WHERE t.typtype = 'd' - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| DomainType { - schema: r.get("schema_name"), - name: r.get("type_name"), - base_type: r.get("base_type"), - nullable: !r.get::("notnull"), - default: r.get("default_expr"), - check_constraints: r - .get::>, _>("check_constraints") - .unwrap_or_default(), - }) - .collect()) -} - -pub(super) async fn fetch_composites(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - t.typname AS type_name, - a.attname AS field_name, - pg_catalog.format_type(a.atttypid, a.atttypmod) AS field_type - FROM pg_catalog.pg_type t - 
JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace - JOIN pg_catalog.pg_class c ON c.oid = t.typrelid - JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid - WHERE t.typtype = 'c' - AND c.relkind = 'c' - AND a.attnum > 0 - AND NOT a.attisdropped - AND n.nspname NOT IN ('pg_catalog', 'information_schema') - ORDER BY n.nspname, t.typname, a.attnum - "#, - ) - .fetch_all(pool) - .await?; - - let mut map: HashMap<(String, String), Vec> = HashMap::new(); - for r in &rows { - let key = ( - r.get::("schema_name"), - r.get::("type_name"), - ); - map.entry(key).or_default().push(CompositeField { - name: r.get("field_name"), - type_name: r.get("field_type"), - }); - } - - let mut composites: Vec = map - .into_iter() - .map(|((schema, name), fields)| CompositeType { - schema, - name, - fields, - }) - .collect(); - composites.sort_by(|a, b| (&a.schema, &a.name).cmp(&(&b.schema, &b.name))); - Ok(composites) -} diff --git a/crates/dry_run_core/src/schema/introspect/comments.rs b/crates/dry_run_core/src/schema/introspect/comments.rs deleted file mode 100644 index a4be1fb..0000000 --- a/crates/dry_run_core/src/schema/introspect/comments.rs +++ /dev/null @@ -1,62 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_table_comments(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT d.objoid::int4 AS table_oid, - d.description AS comment - FROM pg_catalog.pg_description d - JOIN pg_catalog.pg_class c ON c.oid = d.objoid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE d.objsubid = 0 - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTableComment { - table_oid: r.get::("table_oid") as u32, - comment: r.get("comment"), - }) - .collect()) -} - -pub(super) async fn 
fetch_column_comments(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT d.objoid::int4 AS table_oid, - a.attname AS column_name, - d.description AS comment - FROM pg_catalog.pg_description d - JOIN pg_catalog.pg_class c ON c.oid = d.objoid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - JOIN pg_catalog.pg_attribute a - ON a.attrelid = d.objoid AND a.attnum = d.objsubid - WHERE d.objsubid > 0 - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawColumnComment { - table_oid: r.get::("table_oid") as u32, - column_name: r.get("column_name"), - comment: r.get("comment"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/indexes.rs b/crates/dry_run_core/src/schema/introspect/indexes.rs deleted file mode 100644 index 1188e93..0000000 --- a/crates/dry_run_core/src/schema/introspect/indexes.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_indexes(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT i.indrelid::int4 AS table_oid, - ci.relname AS index_name, - am.amname AS index_type, - i.indisunique AS is_unique, - i.indisprimary AS is_primary, - pg_catalog.pg_get_expr(i.indpred, i.indrelid) AS predicate, - pg_catalog.pg_get_indexdef(i.indexrelid) AS definition, - i.indisvalid AS is_valid, - i.indnkeyatts AS n_key_atts, - -- check when index backs a UNIQUE/PK/EXCLUSION constraint - EXISTS ( - SELECT 1 FROM pg_catalog.pg_constraint con - WHERE con.conindid = i.indexrelid - ) AS backs_constraint, - -- All column names (key + include) - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(i.indkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = i.indrelid AND a.attnum = 
ord.attnum - WHERE ord.attnum > 0 - ) AS all_col_names, - array_length(i.indkey, 1) AS total_cols - FROM pg_catalog.pg_index i - JOIN pg_catalog.pg_class ci ON ci.oid = i.indexrelid - JOIN pg_catalog.pg_class ct ON ct.oid = i.indrelid - JOIN pg_catalog.pg_namespace n ON n.oid = ct.relnamespace - JOIN pg_catalog.pg_am am ON am.oid = ci.relam - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - AND NOT EXISTS (SELECT 1 FROM pg_inherits inh WHERE inh.inhrelid = i.indexrelid) - ORDER BY i.indrelid, ci.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| { - let all_cols: Vec = r - .get::>, _>("all_col_names") - .unwrap_or_default(); - let n_key_atts = r.get::("n_key_atts") as usize; - let (key_cols, include_cols) = if n_key_atts > 0 && n_key_atts <= all_cols.len() { - ( - all_cols[..n_key_atts].to_vec(), - all_cols[n_key_atts..].to_vec(), - ) - } else { - (all_cols, vec![]) - }; - - RawIndex { - table_oid: r.get::("table_oid") as u32, - name: r.get("index_name"), - columns: key_cols, - include_columns: include_cols, - index_type: r.get("index_type"), - is_unique: r.get("is_unique"), - is_primary: r.get("is_primary"), - predicate: r.get("predicate"), - definition: r.get("definition"), - is_valid: r.get("is_valid"), - backs_constraint: r.get("backs_constraint"), - } - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/mod.rs b/crates/dry_run_core/src/schema/introspect/mod.rs index 599853c..7a8b44e 100644 --- a/crates/dry_run_core/src/schema/introspect/mod.rs +++ b/crates/dry_run_core/src/schema/introspect/mod.rs @@ -1,22 +1,13 @@ -mod catalog; -mod comments; -mod indexes; -mod objects; -mod partitions; -mod policies; -mod raw_types; mod stats; -mod tables; - -use std::collections::HashMap; use chrono::Utc; +use pg_introspect::IntrospectOptions; +use sha2::{Digest, Sha256}; use sqlx::postgres::PgRow; use sqlx::{PgPool, Row}; use tracing::info; -use 
sha2::{Digest, Sha256}; - +use super::from_pg_introspect::catalog_to_snapshot_parts; use super::hash::{HashInput, compute_content_hash}; use super::snapshot::*; use super::types::*; @@ -26,71 +17,26 @@ pub async fn introspect_schema(pool: &PgPool) -> Result { let pg_version: String = sqlx::query_scalar("SELECT version()") .fetch_one(pool) .await?; - let database: String = sqlx::query_scalar("SELECT current_database()") .fetch_one(pool) .await?; - // Group 1: table-centric data. Stats now live in PlannerStatsSnapshot / - // ActivityStatsSnapshot; introspect_schema is DDL-only. - let ( - raw_tables, - raw_columns, - raw_constraints, - table_comments, - column_comments, - raw_indexes, - raw_partitions, - raw_partition_children, - raw_policies, - raw_triggers, - ) = tokio::try_join!( - tables::fetch_tables(pool), - tables::fetch_columns(pool), - tables::fetch_constraints(pool), - comments::fetch_table_comments(pool), - comments::fetch_column_comments(pool), - indexes::fetch_indexes(pool), - partitions::fetch_partition_info(pool), - partitions::fetch_partition_children(pool), - policies::fetch_policies(pool), - policies::fetch_triggers(pool), - )?; + let cat = pg_introspect::introspect(pool, &IntrospectOptions::default()) + .await + .map_err(|e| Error::Introspection(format!("pg_introspect: {e}")))?; + let parts = catalog_to_snapshot_parts(cat); - // Group 2: top-level objects. 
- let (enums, domains, composites, views, functions, extensions, gucs, _is_standby) = tokio::try_join!( - catalog::fetch_enums(pool), - catalog::fetch_domains(pool), - catalog::fetch_composites(pool), - objects::fetch_views(pool), - objects::fetch_functions(pool), - objects::fetch_extensions(pool), - objects::fetch_gucs(pool), - fetch_is_standby(pool), - )?; - - let tables = assemble_tables( - raw_tables, - raw_columns, - raw_constraints, - table_comments, - column_comments, - raw_indexes, - raw_partitions, - raw_partition_children, - raw_policies, - raw_triggers, - ); + let gucs = fetch_gucs(pool).await?; let content_hash = compute_content_hash(&HashInput { pg_version: &pg_version, - tables: &tables, - enums: &enums, - domains: &domains, - composites: &composites, - views: &views, - functions: &functions, - extensions: &extensions, + tables: &parts.tables, + enums: &parts.enums, + domains: &parts.domains, + composites: &parts.composites, + views: &parts.views, + functions: &parts.functions, + extensions: &parts.extensions, }); let snapshot = SchemaSnapshot { @@ -99,13 +45,13 @@ pub async fn introspect_schema(pool: &PgPool) -> Result<SchemaSnapshot> { timestamp: Utc::now(), content_hash, source: None, - tables, - enums, - domains, - composites, - views, - functions, - extensions, + tables: parts.tables, + enums: parts.enums, + domains: parts.domains, + composites: parts.composites, + views: parts.views, + functions: parts.functions, + extensions: parts.extensions, gucs, }; @@ -124,6 +70,35 @@ pub async fn introspect_schema(pool: &PgPool) -> Result<SchemaSnapshot> { Ok(snapshot) } +async fn fetch_gucs(pool: &PgPool) -> Result<Vec<GucSetting>> { + let rows: Vec<PgRow> = sqlx::query( + r#" + SELECT name, setting, unit + FROM pg_catalog.pg_settings + WHERE name IN ( + 'work_mem', 'effective_cache_size', 'random_page_cost', + 'seq_page_cost', 'effective_io_concurrency', 'shared_buffers', + 'maintenance_work_mem', 'default_statistics_target', + 'autovacuum', 'autovacuum_vacuum_threshold', + 'autovacuum_vacuum_scale_factor',
'autovacuum_analyze_threshold', + 'autovacuum_analyze_scale_factor' + ) + ORDER BY name + "#, + ) + .fetch_all(pool) + .await?; + + Ok(rows + .iter() + .map(|r| GucSetting { + name: r.get("name"), + setting: r.get("setting"), + unit: r.get("unit"), + }) + .collect()) +} + pub async fn fetch_is_standby(pool: &PgPool) -> Result { let row: PgRow = sqlx::query("SELECT pg_catalog.pg_is_in_recovery() AS is_standby") .fetch_one(pool) @@ -131,8 +106,6 @@ pub async fn fetch_is_standby(pool: &PgPool) -> Result { Ok(row.get("is_standby")) } -// Snapshot split: planner-only and per-node activity captures - pub async fn introspect_planner_stats( pool: &PgPool, schema_ref_hash: &str, @@ -261,179 +234,6 @@ fn hash_payload(value: &T) -> Result { Ok(format!("{digest:x}")) } -// --------------------------------------------------------------------------- -// Assembly: merge parts into Table structs -// --------------------------------------------------------------------------- - -use raw_types::*; - -#[allow(clippy::too_many_arguments)] -fn assemble_tables( - raw_tables: Vec, - raw_columns: Vec, - raw_constraints: Vec, - table_comments: Vec, - column_comments: Vec, - raw_indexes: Vec, - raw_partitions: Vec, - raw_partition_children: Vec, - raw_policies: Vec, - raw_triggers: Vec, -) -> Vec
{ - // --- Columns --- - let mut columns_by_oid: HashMap> = HashMap::new(); - for rc in raw_columns { - columns_by_oid - .entry(rc.table_oid) - .or_default() - .push(Column { - name: rc.name, - ordinal: rc.ordinal, - type_name: rc.type_name, - nullable: rc.nullable, - default: rc.default, - identity: rc.identity, - generated: rc.generated, - comment: None, - statistics_target: rc.statistics_target, - }); - } - - // --- Constraints --- - let mut constraints_by_oid: HashMap> = HashMap::new(); - for rc in raw_constraints { - let kind = match ConstraintKind::from_pg_contype(&rc.contype) { - Some(k) => k, - None => continue, - }; - constraints_by_oid - .entry(rc.table_oid) - .or_default() - .push(Constraint { - name: rc.name, - kind, - columns: rc.columns, - definition: rc.definition, - fk_table: rc.fk_table, - fk_columns: rc.fk_columns, - backing_index: rc.backing_index, - comment: rc.comment, - }); - } - - // --- Table comments --- - let table_comment_map: HashMap = table_comments - .into_iter() - .map(|tc| (tc.table_oid, tc.comment)) - .collect(); - - // --- Column comments --- - let col_comment_map: HashMap<(u32, String), String> = column_comments - .into_iter() - .map(|cc| ((cc.table_oid, cc.column_name), cc.comment)) - .collect(); - - for (oid, cols) in &mut columns_by_oid { - for col in cols.iter_mut() { - if let Some(comment) = col_comment_map.get(&(*oid, col.name.clone())) { - col.comment = Some(comment.clone()); - } - } - } - - // --- Indexes --- - let mut indexes_by_oid: HashMap> = HashMap::new(); - for ri in raw_indexes { - indexes_by_oid.entry(ri.table_oid).or_default().push(Index { - name: ri.name, - columns: ri.columns, - include_columns: ri.include_columns, - index_type: ri.index_type, - is_unique: ri.is_unique, - is_primary: ri.is_primary, - predicate: ri.predicate, - definition: ri.definition, - is_valid: ri.is_valid, - backs_constraint: ri.backs_constraint, - }); - } - - // --- Partition info --- - let mut children_by_parent: HashMap> = 
HashMap::new(); - for pc in raw_partition_children { - children_by_parent - .entry(pc.parent_oid) - .or_default() - .push(PartitionChild { - schema: pc.schema, - name: pc.name, - bound: pc.bound, - }); - } - - let partition_info_by_oid: HashMap = raw_partitions - .into_iter() - .filter_map(|rp| { - let strategy = PartitionStrategy::from_pg_partstrat(&rp.strategy)?; - Some(( - rp.table_oid, - PartitionInfo { - strategy, - key: rp.key, - children: children_by_parent.remove(&rp.table_oid).unwrap_or_default(), - }, - )) - }) - .collect(); - - // --- Policies --- - let mut policies_by_oid: HashMap> = HashMap::new(); - for rp in raw_policies { - policies_by_oid - .entry(rp.table_oid) - .or_default() - .push(RlsPolicy { - name: rp.name, - command: rp.command, - permissive: rp.permissive, - roles: rp.roles, - using_expr: rp.using_expr, - with_check_expr: rp.with_check_expr, - }); - } - - // --- Triggers --- - let mut triggers_by_oid: HashMap> = HashMap::new(); - for rt in raw_triggers { - triggers_by_oid - .entry(rt.table_oid) - .or_default() - .push(Trigger { - name: rt.name, - definition: rt.definition, - }); - } - - // --- Assemble --- - raw_tables - .into_iter() - .map(|rt| Table { - oid: rt.oid, - schema: rt.schema, - name: rt.name, - columns: columns_by_oid.remove(&rt.oid).unwrap_or_default(), - constraints: constraints_by_oid.remove(&rt.oid).unwrap_or_default(), - indexes: indexes_by_oid.remove(&rt.oid).unwrap_or_default(), - comment: table_comment_map.get(&rt.oid).cloned(), - partition_info: partition_info_by_oid.get(&rt.oid).cloned(), - policies: policies_by_oid.remove(&rt.oid).unwrap_or_default(), - triggers: triggers_by_oid.remove(&rt.oid).unwrap_or_default(), - reloptions: rt.reloptions, - rls_enabled: rt.rls_enabled, - }) - .collect() -} - #[cfg(test)] mod tests { use chrono::TimeZone; diff --git a/crates/dry_run_core/src/schema/introspect/objects.rs b/crates/dry_run_core/src/schema/introspect/objects.rs deleted file mode 100644 index 0a12676..0000000 --- 
a/crates/dry_run_core/src/schema/introspect/objects.rs +++ /dev/null @@ -1,137 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use super::super::types::*; -use crate::error::Result; - -pub(super) async fn fetch_views(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - c.relname AS view_name, - c.relkind = 'm' AS is_materialized, - pg_catalog.pg_get_viewdef(c.oid, true) AS definition, - d.description AS comment - FROM pg_catalog.pg_class c - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = c.oid AND d.objsubid = 0 - WHERE c.relkind IN ('v', 'm') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| View { - schema: r.get("schema_name"), - name: r.get("view_name"), - definition: r.get::, _>("definition").unwrap_or_default(), - is_materialized: r.get("is_materialized"), - comment: r.get("comment"), - }) - .collect()) -} - -pub(super) async fn fetch_functions(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT n.nspname AS schema_name, - p.proname AS func_name, - pg_catalog.pg_get_function_identity_arguments(p.oid) AS identity_args, - pg_catalog.pg_get_function_result(p.oid) AS return_type, - l.lanname AS language, - p.provolatile::text AS volatility, - p.prosecdef AS security_definer, - d.description AS comment - FROM pg_catalog.pg_proc p - JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace - JOIN pg_catalog.pg_language l ON l.oid = p.prolang - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = p.oid AND d.objsubid = 0 - WHERE p.prokind IN ('f', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, p.proname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - 
.map(|r| { - let vol_str: String = r.get("volatility"); - Function { - schema: r.get("schema_name"), - name: r.get("func_name"), - identity_args: r.get("identity_args"), - return_type: r - .get::, _>("return_type") - .unwrap_or_default(), - language: r.get("language"), - volatility: Volatility::from_pg_provolatile(&vol_str) - .unwrap_or(Volatility::Volatile), - security_definer: r.get("security_definer"), - comment: r.get("comment"), - } - }) - .collect()) -} - -pub(super) async fn fetch_extensions(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT e.extname AS ext_name, - e.extversion AS ext_version, - n.nspname AS schema_name - FROM pg_catalog.pg_extension e - JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace - ORDER BY e.extname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| Extension { - name: r.get("ext_name"), - version: r.get("ext_version"), - schema: r.get("schema_name"), - }) - .collect()) -} - -pub(super) async fn fetch_gucs(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT name, setting, unit - FROM pg_catalog.pg_settings - WHERE name IN ( - 'work_mem', 'effective_cache_size', 'random_page_cost', - 'seq_page_cost', 'effective_io_concurrency', 'shared_buffers', - 'maintenance_work_mem', 'default_statistics_target', - 'autovacuum', 'autovacuum_vacuum_threshold', - 'autovacuum_vacuum_scale_factor', 'autovacuum_analyze_threshold', - 'autovacuum_analyze_scale_factor' - ) - ORDER BY name - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| GucSetting { - name: r.get("name"), - setting: r.get("setting"), - unit: r.get("unit"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/partitions.rs b/crates/dry_run_core/src/schema/introspect/partitions.rs deleted file mode 100644 index e6e7f64..0000000 --- a/crates/dry_run_core/src/schema/introspect/partitions.rs +++ /dev/null @@ -1,61 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, 
Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_partition_info(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT pt.partrelid::int4 AS table_oid, - pt.partstrat::text AS strategy, - pg_catalog.pg_get_partkeydef(pt.partrelid) AS part_key - FROM pg_catalog.pg_partitioned_table pt - JOIN pg_catalog.pg_class c ON c.oid = pt.partrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPartitionInfo { - table_oid: r.get::("table_oid") as u32, - strategy: r.get("strategy"), - key: r.get("part_key"), - }) - .collect()) -} - -pub(super) async fn fetch_partition_children(pool: &PgPool) -> Result> { - let rows: Vec = sqlx::query( - r#" - SELECT inh.inhparent::int4 AS parent_oid, - n.nspname AS schema_name, - c.relname AS table_name, - pg_catalog.pg_get_expr(c.relpartbound, c.oid) AS bound - FROM pg_catalog.pg_inherits inh - JOIN pg_catalog.pg_class c ON c.oid = inh.inhrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE c.relispartition - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY inh.inhparent, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPartitionChild { - parent_oid: r.get::("parent_oid") as u32, - schema: r.get("schema_name"), - name: r.get("table_name"), - bound: r.get::, _>("bound").unwrap_or_default(), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/policies.rs b/crates/dry_run_core/src/schema/introspect/policies.rs deleted file mode 100644 index 760a5b1..0000000 --- a/crates/dry_run_core/src/schema/introspect/policies.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn 
fetch_policies(pool: &PgPool) -> Result<Vec<RawPolicy>> { - let rows: Vec<PgRow> = sqlx::query( - r#" - SELECT pol.polrelid::int4 AS table_oid, - pol.polname AS policy_name, - CASE pol.polcmd - WHEN 'r' THEN 'SELECT' - WHEN 'a' THEN 'INSERT' - WHEN 'w' THEN 'UPDATE' - WHEN 'd' THEN 'DELETE' - WHEN '*' THEN 'ALL' - ELSE pol.polcmd::text - END AS command, - pol.polpermissive AS permissive, - (SELECT array_agg(r.rolname) - FROM unnest(pol.polroles) AS rid(oid) - JOIN pg_catalog.pg_roles r ON r.oid = rid.oid - ) AS roles, - pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS using_expr, - pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS with_check_expr - FROM pg_catalog.pg_policy pol - JOIN pg_catalog.pg_class c ON c.oid = pol.polrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - ORDER BY pol.polrelid, pol.polname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawPolicy { - table_oid: r.get::<i32, _>("table_oid") as u32, - name: r.get("policy_name"), - command: r.get("command"), - permissive: r.get("permissive"), - roles: r.get::<Option<Vec<String>>, _>("roles").unwrap_or_default(), - using_expr: r.get("using_expr"), - with_check_expr: r.get("with_check_expr"), - }) - .collect()) -} - -pub(super) async fn fetch_triggers(pool: &PgPool) -> Result<Vec<RawTrigger>> { - let rows: Vec<PgRow> = sqlx::query( - r#" - SELECT t.tgrelid::int4 AS table_oid, - t.tgname AS trigger_name, - pg_catalog.pg_get_triggerdef(t.oid) AS definition - FROM pg_catalog.pg_trigger t - JOIN pg_catalog.pg_class c ON c.oid = t.tgrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE NOT t.tgisinternal - AND t.tgparentid = 0 - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - ORDER BY t.tgrelid, t.tgname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTrigger { - table_oid: r.get::<i32, _>("table_oid") as u32, - name: r.get("trigger_name"), - definition: r.get("definition"), - }) -
.collect()) -} diff --git a/crates/dry_run_core/src/schema/introspect/raw_types.rs b/crates/dry_run_core/src/schema/introspect/raw_types.rs deleted file mode 100644 index 51b4a20..0000000 --- a/crates/dry_run_core/src/schema/introspect/raw_types.rs +++ /dev/null @@ -1,85 +0,0 @@ -pub(super) struct RawTable { - pub oid: u32, - pub schema: String, - pub name: String, - pub rls_enabled: bool, - pub reloptions: Vec<String>, -} - -pub(super) struct RawColumn { - pub table_oid: u32, - pub name: String, - pub ordinal: i16, - pub type_name: String, - pub nullable: bool, - pub default: Option<String>, - pub identity: Option<String>, - pub generated: Option<String>, - pub statistics_target: Option<i16>, -} - -pub(super) struct RawConstraint { - pub table_oid: u32, - pub name: String, - pub contype: String, - pub columns: Vec<String>, - pub definition: Option<String>, - pub fk_table: Option<String>, - pub fk_columns: Vec<String>, - pub backing_index: Option<String>, - pub comment: Option<String>, -} - -pub(super) struct RawTableComment { - pub table_oid: u32, - pub comment: String, -} - -pub(super) struct RawColumnComment { - pub table_oid: u32, - pub column_name: String, - pub comment: String, -} - -pub(super) struct RawIndex { - pub table_oid: u32, - pub name: String, - pub columns: Vec<String>, - pub include_columns: Vec<String>, - pub index_type: String, - pub is_unique: bool, - pub is_primary: bool, - pub predicate: Option<String>, - pub definition: String, - pub is_valid: bool, - pub backs_constraint: bool, -} - -pub(super) struct RawPartitionInfo { - pub table_oid: u32, - pub strategy: String, - pub key: String, -} - -pub(super) struct RawPartitionChild { - pub parent_oid: u32, - pub schema: String, - pub name: String, - pub bound: String, -} - -pub(super) struct RawPolicy { - pub table_oid: u32, - pub name: String, - pub command: String, - pub permissive: bool, - pub roles: Vec<String>, - pub using_expr: Option<String>, - pub with_check_expr: Option<String>, -} - -pub(super) struct RawTrigger { - pub table_oid: u32, - pub name: String, - pub definition: String, -} diff --git
a/crates/dry_run_core/src/schema/introspect/tables.rs b/crates/dry_run_core/src/schema/introspect/tables.rs deleted file mode 100644 index 80e1f2a..0000000 --- a/crates/dry_run_core/src/schema/introspect/tables.rs +++ /dev/null @@ -1,150 +0,0 @@ -use sqlx::postgres::PgRow; -use sqlx::{PgPool, Row}; - -use crate::error::Result; - -use super::raw_types::*; - -pub(super) async fn fetch_tables(pool: &PgPool) -> Result<Vec<RawTable>> { - let rows: Vec<PgRow> = sqlx::query( - r#" - SELECT c.oid::int4 AS oid, - n.nspname AS schema_name, - c.relname AS table_name, - c.relrowsecurity AS rls_enabled, - COALESCE(c.reloptions, '{}') AS reloptions - FROM pg_catalog.pg_class c - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - WHERE c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY n.nspname, c.relname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawTable { - oid: r.get::<i32, _>("oid") as u32, - schema: r.get("schema_name"), - name: r.get("table_name"), - rls_enabled: r.get("rls_enabled"), - reloptions: r.get::<Vec<String>, _>("reloptions"), - }) - .collect()) -} - -pub(super) async fn fetch_columns(pool: &PgPool) -> Result<Vec<RawColumn>> { - let rows: Vec<PgRow> = sqlx::query( - r#" - SELECT a.attrelid::int4 AS table_oid, - a.attname AS column_name, - a.attnum AS ordinal, - pg_catalog.format_type(a.atttypid, a.atttypmod) AS type_name, - NOT a.attnotnull AS nullable, - pg_catalog.pg_get_expr(d.adbin, d.adrelid) AS default_expr, - CASE a.attidentity - WHEN 'a' THEN 'always' - WHEN 'd' THEN 'by_default' - ELSE NULL - END AS identity, - NULLIF(a.attstattarget, -1)::int2 AS statistics_target, - CASE a.attgenerated - WHEN 's' THEN 'stored' - ELSE NULL - END AS generated - FROM pg_catalog.pg_attribute a - JOIN pg_catalog.pg_class c ON c.oid = a.attrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_attrdef d ON d.adrelid = a.attrelid AND d.adnum = a.attnum - WHERE a.attnum > 0
- AND NOT a.attisdropped - AND c.relkind IN ('r', 'p') - AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - ORDER BY a.attrelid, a.attnum - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawColumn { - table_oid: r.get::<i32, _>("table_oid") as u32, - name: r.get("column_name"), - ordinal: r.get("ordinal"), - type_name: r.get("type_name"), - nullable: r.get("nullable"), - default: r.get("default_expr"), - identity: r.get("identity"), - generated: r.get("generated"), - statistics_target: r.get("statistics_target"), - }) - .collect()) -} - -pub(super) async fn fetch_constraints(pool: &PgPool) -> Result<Vec<RawConstraint>> { - let rows: Vec<PgRow> = sqlx::query( - r#" - SELECT con.conrelid::int4 AS table_oid, - con.conname AS constraint_name, - con.contype::text AS contype, - pg_catalog.pg_get_constraintdef(con.oid) AS definition, - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(con.conkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = con.conrelid AND a.attnum = ord.attnum - ) AS col_names, - CASE WHEN con.contype = 'f' THEN - (SELECT n2.nspname || '.'
|| c2.relname - FROM pg_catalog.pg_class c2 - JOIN pg_catalog.pg_namespace n2 ON n2.oid = c2.relnamespace - WHERE c2.oid = con.confrelid) - END AS fk_table, - CASE WHEN con.contype = 'f' THEN - (SELECT array_agg(a.attname ORDER BY ord.n) - FROM unnest(con.confkey) WITH ORDINALITY AS ord(attnum, n) - JOIN pg_catalog.pg_attribute a - ON a.attrelid = con.confrelid AND a.attnum = ord.attnum - ) - END AS fk_col_names, - ci.relname::text AS backing_index, - d.description AS comment - FROM pg_catalog.pg_constraint con - JOIN pg_catalog.pg_class c ON c.oid = con.conrelid - JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace - LEFT JOIN pg_catalog.pg_class ci - ON ci.oid = con.conindid - LEFT JOIN pg_catalog.pg_description d - ON d.objoid = con.oid AND d.objsubid = 0 - WHERE n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') - AND n.nspname NOT LIKE 'pg_temp_%' - AND con.conislocal - ORDER BY con.conrelid, con.conname - "#, - ) - .fetch_all(pool) - .await?; - - Ok(rows - .iter() - .map(|r| RawConstraint { - table_oid: r.get::<i32, _>("table_oid") as u32, - name: r.get("constraint_name"), - contype: r.get("contype"), - columns: r - .get::<Option<Vec<String>>, _>("col_names") - .unwrap_or_default(), - definition: r.get("definition"), - fk_table: r.get("fk_table"), - fk_columns: r - .get::<Option<Vec<String>>, _>("fk_col_names") - .unwrap_or_default(), - backing_index: r.get("backing_index"), - comment: r.get("comment"), - }) - .collect()) -} diff --git a/crates/dry_run_core/src/schema/mod.rs b/crates/dry_run_core/src/schema/mod.rs index 0d6366a..9c17165 100644 --- a/crates/dry_run_core/src/schema/mod.rs +++ b/crates/dry_run_core/src/schema/mod.rs @@ -1,4 +1,5 @@ pub mod bloat; +mod from_pg_introspect; mod hash; mod introspect; pub mod profile; From 764415326123c2704a4dabd644f227858b1e2284 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Wed, 6 May 2026 21:54:59 +0200 Subject: [PATCH 2/5] fix: order by conname for 100% snapshot compatibility --- crates/dry_run_core/src/schema/from_pg_introspect.rs | 4
+++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/dry_run_core/src/schema/from_pg_introspect.rs b/crates/dry_run_core/src/schema/from_pg_introspect.rs index 9694c1f..9523bfc 100644 --- a/crates/dry_run_core/src/schema/from_pg_introspect.rs +++ b/crates/dry_run_core/src/schema/from_pg_introspect.rs @@ -1,4 +1,4 @@ -use pg_introspect::{ +crates/dry_run_core/src/schema/from_pg_introspect.rsuse pg_introspect::{ Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn, CompositeType as PgComposite, DomainType as PgDomain, EnumType as PgEnum, ExclusionConstraint as PgExclusion, Extension as PgExtension, ForeignKey as PgFk, @@ -68,6 +68,8 @@ fn convert_table(t: PgTable) -> Table { for x in t.exclusion_constraints { constraints.push(convert_exclusion(x)); } + // match the old ORDER BY conname so content_hash stays stable + constraints.sort_by(|a, b| a.name.cmp(&b.name)); let mut cols: Vec<Column> = Vec::with_capacity(t.columns.len()); for (_, c) in t.columns { From 588a8b47d5513207eef9cdb7f4c864736025baa9 Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Wed, 6 May 2026 21:56:10 +0200 Subject: [PATCH 3/5] fix: typo --- crates/dry_run_core/src/schema/from_pg_introspect.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/dry_run_core/src/schema/from_pg_introspect.rs b/crates/dry_run_core/src/schema/from_pg_introspect.rs index 9523bfc..10fb4ac 100644 --- a/crates/dry_run_core/src/schema/from_pg_introspect.rs +++ b/crates/dry_run_core/src/schema/from_pg_introspect.rs @@ -1,4 +1,4 @@ -crates/dry_run_core/src/schema/from_pg_introspect.rsuse pg_introspect::{ +use pg_introspect::{ Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn, CompositeType as PgComposite, DomainType as PgDomain, EnumType as PgEnum, ExclusionConstraint as PgExclusion, Extension as PgExtension, ForeignKey as PgFk, From 4251f4c4fc73678a9c1ad12c0ceea18db845a22f Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Wed, 6 May 2026 22:03:38 +0200
Subject: [PATCH 4/5] chore: lock in pg_introspect v0.2.0 --- Cargo.lock | 8 +++++--- Cargo.toml | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6a50c7..5137e50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1531,7 +1531,9 @@ dependencies = [ [[package]] name = "pg_introspect" -version = "0.1.0" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f7c26d6c97143ecd713d32e06aaadc9aa37b0df5be8cadcb64241a33a824dfb" dependencies = [ "indexmap", "serde", @@ -2803,9 +2805,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28f0d049ccfaa566e14e9663d304d8577427b368cb4710a20528690287a738b" +checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" dependencies = [ "bitflags", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 3e3396e..5f3b066 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ dbg_macro = "deny" [workspace.dependencies] dry_run_core = { path = "crates/dry_run_core" } -pg_introspect = { path = "../pg_introspect" } +pg_introspect = "0.2.0" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } From 9a9c8f16ae95bb6606a009656a9ba0269ae322fc Mon Sep 17 00:00:00 2001 From: Radim Marek Date: Wed, 6 May 2026 22:28:18 +0200 Subject: [PATCH 5/5] =?UTF-8?q?test:=20cover=20pg=5Fintrospect=20=E2=86=92?= =?UTF-8?q?=20snapshot=20conversion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds enum-mapping tests and a golden fixture catalog with a frozen content_hash to guard against silent drift in field ordering or upstream pg_introspect changes that would invalidate stored snapshots. Also runs cargo fmt across the workspace (server.rs). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + crates/dry_run_cli/src/mcp/server.rs | 6 +- crates/dry_run_core/Cargo.toml | 1 + .../src/schema/from_pg_introspect.rs | 4 + .../src/schema/from_pg_introspect_tests.rs | 420 ++++++++++++++++++ 5 files changed, 431 insertions(+), 1 deletion(-) create mode 100644 crates/dry_run_core/src/schema/from_pg_introspect_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 5137e50..9ae2835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,7 @@ version = "0.6.1" dependencies = [ "async-trait", "chrono", + "indexmap", "pg_introspect", "pg_query", "regex", diff --git a/crates/dry_run_cli/src/mcp/server.rs b/crates/dry_run_cli/src/mcp/server.rs index c89fa64..956a9d0 100644 --- a/crates/dry_run_cli/src/mcp/server.rs +++ b/crates/dry_run_cli/src/mcp/server.rs @@ -1554,7 +1554,11 @@ impl DryRunServer { annotated.schema.tables.len(), annotated.schema.views.len(), annotated.schema.functions.len(), - if annotated.planner.is_some() { "yes" } else { "no" }, + if annotated.planner.is_some() { + "yes" + } else { + "no" + }, annotated.activity_by_node.len(), ); *self.schema.write().await = Some(annotated); diff --git a/crates/dry_run_core/Cargo.toml b/crates/dry_run_core/Cargo.toml index 37dd5e4..44595f6 100644 --- a/crates/dry_run_core/Cargo.toml +++ b/crates/dry_run_core/Cargo.toml @@ -21,5 +21,6 @@ tracing = { workspace = true } zstd = { workspace = true } [dev-dependencies] +indexmap = "2" tokio = { workspace = true } tempfile = "3" diff --git a/crates/dry_run_core/src/schema/from_pg_introspect.rs b/crates/dry_run_core/src/schema/from_pg_introspect.rs index 10fb4ac..2a6f71a 100644 --- a/crates/dry_run_core/src/schema/from_pg_introspect.rs +++ b/crates/dry_run_core/src/schema/from_pg_introspect.rs @@ -317,3 +317,7 @@ fn convert_extension(e: PgExtension) -> Extension { schema: e.schema, } } + +#[cfg(test)] +#[path = "from_pg_introspect_tests.rs"] +mod tests; diff --git 
a/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs b/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs new file mode 100644 index 0000000..53552d8 --- /dev/null +++ b/crates/dry_run_core/src/schema/from_pg_introspect_tests.rs @@ -0,0 +1,420 @@ +use indexmap::IndexMap; +use pg_introspect::{ + Catalog as PgCatalog, CheckConstraint as PgCheck, Column as PgColumn, + CompositeAttribute as PgCompAttr, CompositeType as PgComposite, DomainType as PgDomain, + EnumType as PgEnum, ExclusionConstraint as PgExclusion, Extension as PgExtension, FkAction, + FkMatch, ForeignKey as PgFk, Function as PgFunction, FunctionKind, GeneratedKind, IdentityKind, + Index as PgIndex, PartitionChild as PgPartChild, PartitionInfo as PgPartInfo, + PartitionStrategy as PgPartStrat, PolicyCommand, PrimaryKey as PgPrimaryKey, QualifiedName, + RlsPolicy as PgPolicy, Table as PgTable, Trigger as PgTrigger, TriggerEvent, + TriggerOrientation, TriggerTiming, UniqueConstraint as PgUnique, View as PgView, ViewKind, + Volatility as PgVol, +}; + +use super::super::hash::{HashInput, compute_content_hash}; +use super::super::types::{ConstraintKind, PartitionStrategy, Volatility}; +use super::*; + +fn qn(schema: &str, name: &str) -> QualifiedName { + QualifiedName { + schema: schema.into(), + name: name.into(), + } +} + +fn col(name: &str, ordinal: i16, type_name: &str) -> PgColumn { + PgColumn { + name: name.into(), + type_name: type_name.into(), + ordinal, + is_nullable: false, + is_primary_key: false, + is_foreign_key: false, + is_unique: false, + identity: None, + generated: None, + statistics_target: None, + default: None, + comment: None, + } +} + +// ── enum / variant mappings ─────────────────────────────────────────────── + +#[test] +fn identity_kind_maps_to_pg_attribute_codes() { + let cases: &[(IdentityKind, &str)] = + &[(IdentityKind::Always, "a"), (IdentityKind::ByDefault, "d")]; + for (kind, expected) in cases { + let mut c = col("c", 1, "int"); + c.identity = Some(*kind); + 
assert_eq!(convert_column(c).identity.as_deref(), Some(*expected)); + } +} + +#[test] +fn generated_kind_maps_to_pg_attribute_codes() { + let cases: &[(GeneratedKind, &str)] = + &[(GeneratedKind::Stored, "s"), (GeneratedKind::Virtual, "v")]; + for (kind, expected) in cases { + let mut c = col("c", 1, "int"); + c.generated = Some(*kind); + assert_eq!(convert_column(c).generated.as_deref(), Some(*expected)); + } +} + +#[test] +fn column_without_identity_or_generated_stays_none() { + let c = convert_column(col("c", 1, "int")); + assert!(c.identity.is_none()); + assert!(c.generated.is_none()); +} + +#[test] +fn policy_command_maps_to_uppercase_strings() { + let cases: &[(PolicyCommand, &str)] = &[ + (PolicyCommand::All, "ALL"), + (PolicyCommand::Select, "SELECT"), + (PolicyCommand::Insert, "INSERT"), + (PolicyCommand::Update, "UPDATE"), + (PolicyCommand::Delete, "DELETE"), + ]; + for (cmd, expected) in cases { + let p = PgPolicy { + name: "p".into(), + command: *cmd, + permissive: true, + roles: vec!["public".into()], + using_expr: None, + with_check_expr: None, + }; + assert_eq!(convert_policy(p).command, *expected); + } +} + +#[test] +fn volatility_maps_to_internal_enum() { + let cases: &[(PgVol, Volatility)] = &[ + (PgVol::Immutable, Volatility::Immutable), + (PgVol::Stable, Volatility::Stable), + (PgVol::Volatile, Volatility::Volatile), + ]; + for (pg_vol, expected) in cases { + let f = PgFunction { + name: qn("public", "f"), + kind: FunctionKind::Function, + language: "sql".into(), + volatility: *pg_vol, + security_definer: false, + arguments: String::new(), + identity_arguments: String::new(), + return_type: "int".into(), + comment: None, + }; + assert_eq!(convert_function(f).volatility, *expected); + } +} + +#[test] +fn partition_strategy_maps_to_internal_enum() { + let cases: &[(PgPartStrat, PartitionStrategy)] = &[ + (PgPartStrat::Range, PartitionStrategy::Range), + (PgPartStrat::List, PartitionStrategy::List), + (PgPartStrat::Hash, PartitionStrategy::Hash), + 
]; + for (pg_strat, expected) in cases { + let p = PgPartInfo { + strategy: *pg_strat, + key: "k".into(), + children: vec![], + }; + assert_eq!(convert_partition_info(p).strategy, *expected); + } +} + +#[test] +fn view_kind_materialized_sets_flag() { + let mat = PgView { + oid: 1, + name: qn("public", "v"), + kind: ViewKind::Materialized, + columns: IndexMap::new(), + definition: "SELECT 1".into(), + is_updatable: false, + comment: None, + }; + assert!(convert_view(mat).is_materialized); + + let plain = PgView { + oid: 1, + name: qn("public", "v"), + kind: ViewKind::View, + columns: IndexMap::new(), + definition: "SELECT 1".into(), + is_updatable: false, + comment: None, + }; + assert!(!convert_view(plain).is_materialized); +} + +// ── golden fixture catalog ──────────────────────────────────────────────── + +fn fixture_catalog() -> PgCatalog { + let mut columns = IndexMap::new(); + let mut id_col = col("id", 1, "int8"); + id_col.identity = Some(IdentityKind::Always); + columns.insert("id".into(), id_col); + + let mut amount = col("amount", 2, "numeric"); + amount.is_nullable = true; + columns.insert("amount".into(), amount); + + let mut full_name = col("full_name", 3, "text"); + full_name.generated = Some(GeneratedKind::Stored); + full_name.default = Some("''".into()); + columns.insert("full_name".into(), full_name); + + let table = PgTable { + oid: 16384, + name: qn("public", "orders"), + columns, + primary_key: Some(PgPrimaryKey { + name: "orders_pkey".into(), + columns: vec!["id".into()], + definition: "PRIMARY KEY (id)".into(), + }), + foreign_keys: vec![PgFk { + constraint_name: "orders_customer_fk".into(), + columns: vec!["customer_id".into()], + references: qn("public", "customers"), + references_columns: vec!["id".into()], + is_validated: true, + is_enforced: true, + is_deferrable: false, + is_deferred: false, + on_update: FkAction::NoAction, + on_delete: FkAction::Cascade, + match_type: FkMatch::Simple, + definition: "FOREIGN KEY (customer_id) REFERENCES 
public.customers(id) ON DELETE CASCADE".into(), + }], + indexes: vec![PgIndex { + name: "orders_pkey".into(), + columns: vec!["id".into()], + included_columns: vec![], + is_unique: true, + is_primary: true, + is_partial: false, + predicate: None, + method: "btree".into(), + definition: "CREATE UNIQUE INDEX orders_pkey ON public.orders (id)".into(), + is_valid: true, + backs_constraint: true, + }], + unique_constraints: vec![PgUnique { + name: "orders_external_id_key".into(), + columns: vec!["external_id".into()], + index_name: "orders_external_id_key".into(), + is_validated: true, + is_deferrable: false, + is_deferred: false, + nulls_not_distinct: false, + definition: "UNIQUE (external_id)".into(), + }], + exclusion_constraints: vec![PgExclusion { + name: "orders_no_overlap".into(), + columns: vec!["during".into()], + index_name: "orders_no_overlap".into(), + definition: "EXCLUDE USING gist (during WITH &&)".into(), + }], + check_constraints: vec![PgCheck { + name: "orders_amount_check".into(), + definition: "CHECK ((amount > 0))".into(), + columns: vec!["amount".into()], + is_no_inherit: false, + }], + not_null_constraints: vec![], + comment: Some("order rows".into()), + is_partitioned: true, + is_partition: false, + partition_info: Some(PgPartInfo { + strategy: PgPartStrat::Range, + key: "RANGE (created_at)".into(), + children: vec![PgPartChild { + name: qn("public", "orders_2026"), + bound: "FOR VALUES FROM ('2026-01-01') TO ('2027-01-01')".into(), + }], + }), + reloptions: vec!["fillfactor=80".into()], + rls_enabled: true, + policies: vec![PgPolicy { + name: "orders_owner".into(), + command: PolicyCommand::Select, + permissive: true, + roles: vec!["app".into()], + using_expr: Some("(owner = current_user)".into()), + with_check_expr: None, + }], + triggers: vec![PgTrigger { + name: "orders_audit".into(), + timing: TriggerTiming::After, + events: vec![TriggerEvent::Insert], + orientation: TriggerOrientation::Row, + is_constraint: false, + is_enabled: true, + 
function: qn("public", "audit_log"), + definition: "CREATE TRIGGER orders_audit AFTER INSERT ON public.orders FOR EACH ROW EXECUTE FUNCTION public.audit_log()".into(), + }], + }; + + let mat_view = PgView { + oid: 16500, + name: qn("public", "orders_summary"), + kind: ViewKind::Materialized, + columns: IndexMap::new(), + definition: "SELECT count(*) FROM orders".into(), + is_updatable: false, + comment: None, + }; + + let mut tables = IndexMap::new(); + tables.insert(table.name.clone(), table); + let mut views = IndexMap::new(); + views.insert(mat_view.name.clone(), mat_view); + + PgCatalog { + tables, + views, + partition_roots: Default::default(), + dependencies: vec![], + extensions: vec![PgExtension { + name: "pgcrypto".into(), + schema: "public".into(), + version: "1.3".into(), + }], + functions: vec![PgFunction { + name: qn("public", "audit_log"), + kind: FunctionKind::Function, + language: "plpgsql".into(), + volatility: PgVol::Volatile, + security_definer: true, + arguments: String::new(), + identity_arguments: String::new(), + return_type: "trigger".into(), + comment: Some("audit trigger".into()), + }], + enums: vec![PgEnum { + name: qn("public", "order_status"), + labels: vec!["new".into(), "shipped".into()], + }], + domains: vec![PgDomain { + name: qn("public", "positive_amount"), + base_type: "numeric".into(), + is_nullable: false, + default: None, + constraints: vec!["CHECK (VALUE > 0)".into()], + }], + composites: vec![PgComposite { + name: qn("public", "address"), + attributes: vec![ + PgCompAttr { + name: "street".into(), + type_name: "text".into(), + }, + PgCompAttr { + name: "zip".into(), + type_name: "text".into(), + }, + ], + }], + } +} + +#[test] +fn fixture_catalog_converts_to_expected_snapshot_parts() { + let parts = catalog_to_snapshot_parts(fixture_catalog()); + + assert_eq!(parts.tables.len(), 1); + let t = &parts.tables[0]; + assert_eq!(t.schema, "public"); + assert_eq!(t.name, "orders"); + assert_eq!(t.oid, 16384); + 
assert_eq!(t.columns.len(), 3); + assert_eq!(t.columns[0].identity.as_deref(), Some("a")); + assert_eq!(t.columns[2].generated.as_deref(), Some("s")); + assert!(t.rls_enabled); + assert_eq!(t.reloptions, vec!["fillfactor=80".to_string()]); + + // PK + FK + unique + check + exclusion, sorted by name (matches old ORDER BY conname) + assert_eq!(t.constraints.len(), 5); + let names: Vec<&str> = t.constraints.iter().map(|c| c.name.as_str()).collect(); + let mut sorted = names.clone(); + sorted.sort(); + assert_eq!(names, sorted, "constraints must be sorted by name"); + + let fk = t + .constraints + .iter() + .find(|c| c.kind == ConstraintKind::ForeignKey) + .expect("fk present"); + assert_eq!(fk.fk_table.as_deref(), Some("public.customers")); + assert_eq!(fk.fk_columns, vec!["id".to_string()]); + + let unique = t + .constraints + .iter() + .find(|c| c.kind == ConstraintKind::Unique) + .expect("unique present"); + assert_eq!( + unique.backing_index.as_deref(), + Some("orders_external_id_key") + ); + + let p = t.partition_info.as_ref().expect("partition info"); + assert_eq!(p.strategy, PartitionStrategy::Range); + assert_eq!(p.children.len(), 1); + assert_eq!(p.children[0].schema, "public"); + assert_eq!(p.children[0].name, "orders_2026"); + + assert_eq!(t.policies.len(), 1); + assert_eq!(t.policies[0].command, "SELECT"); + assert_eq!(t.triggers.len(), 1); + assert_eq!(t.triggers[0].name, "orders_audit"); + + assert_eq!(parts.views.len(), 1); + assert!(parts.views[0].is_materialized); + + assert_eq!(parts.enums.len(), 1); + assert_eq!(parts.enums[0].labels, vec!["new", "shipped"]); + assert_eq!(parts.domains.len(), 1); + assert_eq!(parts.domains[0].check_constraints.len(), 1); + assert_eq!(parts.composites.len(), 1); + assert_eq!(parts.composites[0].fields.len(), 2); + assert_eq!(parts.functions.len(), 1); + assert_eq!(parts.functions[0].volatility, Volatility::Volatile); + assert!(parts.functions[0].security_definer); + assert_eq!(parts.extensions.len(), 1); + 
assert_eq!(parts.extensions[0].name, "pgcrypto"); +} + +// guards against silent regressions in field ordering, default values, or +// upstream pg_introspect changes that would invalidate snapshots stored in +// users' history.db. update EXPECTED only on intentional snapshot-format changes. +#[test] +fn fixture_catalog_content_hash_is_stable() { + let parts = catalog_to_snapshot_parts(fixture_catalog()); + let hash = compute_content_hash(&HashInput { + pg_version: "PostgreSQL 17.0", + tables: &parts.tables, + enums: &parts.enums, + domains: &parts.domains, + composites: &parts.composites, + views: &parts.views, + functions: &parts.functions, + extensions: &parts.extensions, + }); + const EXPECTED: &str = "ef118e31e0004baa508665111e32a9c2da964b60b24a900a6a1c654629d32fd6"; + assert_eq!( + hash, EXPECTED, + "content_hash drifted; if intentional, update EXPECTED" + ); +}