diff --git a/.opencode/skills/data-validate/SKILL.md b/.opencode/skills/data-validate/SKILL.md new file mode 100644 index 0000000000..4e1e87c29e --- /dev/null +++ b/.opencode/skills/data-validate/SKILL.md @@ -0,0 +1,183 @@ +--- +name: data-validate +description: Compare data between two tables across any warehouses using progressive validation — row counts, column profiles, segment checksums, and row-level drill-down. +--- + +# Data Validate + +## Requirements +**Agent:** data-diff or migrator (requires sql_execute on both source and target) +**Tools used:** sql_execute, warehouse_list, warehouse_test, schema_inspect, read, glob + +Cross-database data validation using a progressive, multi-level approach. Each level provides increasing confidence with increasing query cost — stop as soon as you have enough evidence. + +## Validation Levels + +### Level 1: Row Count (seconds, near-zero cost) +Compare total row counts between source and target. If counts match exactly, proceed to Level 2. If they differ, report the delta immediately — no deeper checks needed. + +```sql +-- Run on source warehouse +SELECT COUNT(*) AS row_count FROM {source_table} [WHERE ...] + +-- Run on target warehouse +SELECT COUNT(*) AS row_count FROM {target_table} [WHERE ...] +``` + +### Level 2: Column Profile (seconds, low cost) +For each column, compare aggregate statistics. This catches type coercion bugs, NULL handling differences, and truncation issues without scanning every row. + +```sql +SELECT + COUNT(*) AS total_rows, + COUNT({col}) AS non_null_count, + COUNT(DISTINCT {col}) AS distinct_count, + MIN({col}) AS min_val, + MAX({col}) AS max_val, + -- Numeric columns only: + AVG(CAST({col} AS DOUBLE)) AS avg_val, + SUM(CAST({col} AS DOUBLE)) AS sum_val +FROM {table} [WHERE ...] +``` + +Run this for each column (or the key columns + any columns the user cares about). 
Compare results side by side: + +``` +Column Profile Comparison +========================= +Column | Source | Target | Match +----------------|-----------------|-----------------|------ +total_rows | 1,234,567 | 1,234,567 | OK +user_id.distinct| 500,000 | 500,000 | OK +email.nulls | 0 | 1,204 | MISMATCH +amount.sum | 45,678,901.23 | 45,678,901.23 | OK +amount.avg | 37.01 | 37.01 | OK +created_at.min | 2020-01-01 | 2020-01-01 | OK +created_at.max | 2024-12-31 | 2024-12-31 | OK +``` + +If all profiles match, tables are equivalent with high confidence. Report and stop. + +### Level 3: Segment Checksums (moderate cost) +If profiles match but the user wants stronger guarantees, or if you need to locate WHERE the differences are, split the key space into segments and compare checksums. + +Requires: a sortable key column (integer PK, timestamp, etc.) + +```sql +-- Get key range +SELECT MIN({key_col}) AS min_key, MAX({key_col}) AS max_key FROM {table} + +-- Segment checksum (dialect-specific hash aggregation) +-- Snowflake: +SELECT + FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BITXOR_AGG(HASH({columns})) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- Postgres: +SELECT + FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(('x' || SUBSTR(MD5(CONCAT({columns}::text)), 1, 12))::bit(48)::bigint) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- BigQuery: +SELECT + CAST(FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS INT64) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(FARM_FINGERPRINT(CONCAT({columns}))) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- DuckDB: +SELECT + FLOOR(({key_col} - {min_key}) * 
{num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(md5_number_lower64(CONCAT({columns}::text))) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket +``` + +Compare bucket-by-bucket. Matching checksums = identical data in that segment. Mismatched buckets narrow down where differences live. + +### Level 4: Row-Level Diff (targeted, on mismatched segments only) +For any mismatched segments from Level 3, download the actual rows and diff them locally. Only fetch rows in the mismatched key range. + +```sql +SELECT {key_col}, {columns} +FROM {table} +WHERE {key_col} >= {segment_min} AND {key_col} < {segment_max} +ORDER BY {key_col} +``` + +Compare row by row. Report additions, deletions, and value changes. + +## Workflow + +1. **Identify source and target** — Ask the user or infer from context: + - Which warehouse connections? (use `warehouse_list` to show available) + - Which tables to compare? + - Any WHERE clause filters? (date range, partition, etc.) + - Which columns matter? (all, or specific subset) + +2. **Verify connectivity** — Run `warehouse_test` on both connections. + +3. **Inspect schemas** — Use `schema_inspect` on both tables. Compare column names, types, and nullability. Flag any schema differences before proceeding (e.g., VARCHAR(100) vs VARCHAR(256), INT vs BIGINT). + +4. **Run Level 1** — Row counts. If mismatched, report and ask if user wants to drill deeper. + +5. **Run Level 2** — Column profiles. Compare side by side. If all match, report high-confidence equivalence. If mismatches found, highlight which columns differ and by how much. + +6. **Run Level 3** (if needed) — Segment checksums. Use 32 buckets by default. Report which segments match and which differ. + +7. **Run Level 4** (if needed) — Fetch rows from mismatched segments. Show the actual diff rows (additions/deletions/changes). + +8. 
**Report** — Always produce a structured summary: + +``` +Data Validation Report +====================== +Source: snowflake://analytics.public.orders +Target: bigquery://project.dataset.orders +Filter: created_at >= '2024-01-01' +Status: PASS | FAIL | PARTIAL + +Level 1 — Row Count: PASS (1,234,567 rows both sides) +Level 2 — Profile: PASS (all 12 columns match) +Level 3 — Checksum: PASS (32/32 segments match) +Level 4 — Row Diff: SKIPPED (not needed) + +Confidence: HIGH +``` + +## Dialect-Specific Notes + +**Hash functions by dialect:** +| Dialect | Row Hash | Aggregation | +|-------------|-----------------------|-----------------| +| Snowflake | `HASH(cols)` | `BITXOR_AGG` | +| Postgres | `MD5(CONCAT(cols))` | `BIT_XOR` | +| BigQuery | `FARM_FINGERPRINT` | `BIT_XOR` | +| DuckDB | `md5_number_lower64` | `BIT_XOR` | +| Databricks | `xxhash64(cols)` | `BIT_XOR` | +| MySQL | `MD5(CONCAT(cols))` | `BIT_XOR` | +| ClickHouse | `cityHash64(cols)` | `groupBitXor` | + +**Cross-database checksum comparison**: When source and target use different dialects, checksums won't match even for identical data (different hash functions). In this case, skip Level 3 and go directly from Level 2 to Level 4 if needed, OR download sorted rows from both sides and compare locally. + +## Usage + +- `/data-validate` — Start interactive validation (will ask for source/target) +- `/data-validate orders` — Validate the `orders` table across connected warehouses +- `/data-validate snowflake.orders bigquery.orders` — Explicit source and target +- `/data-validate --level 2` — Stop at profile level (skip checksums) +- `/data-validate --columns id,amount,created_at` — Only validate specific columns + +Use the tools: `sql_execute`, `warehouse_list`, `warehouse_test`, `schema_inspect`, `read`, `glob`. 
diff --git a/packages/altimate-engine/src/altimate_engine/server.py b/packages/altimate-engine/src/altimate_engine/server.py index b29bd7438a..9431b1fd0d 100644 --- a/packages/altimate-engine/src/altimate_engine/server.py +++ b/packages/altimate-engine/src/altimate_engine/server.py @@ -953,6 +953,24 @@ def dispatch(request: JsonRpcRequest) -> JsonRpcResponse: target_dialect=p.target_dialect, ) result = LocalTestResult(**raw) + elif method == "data_diff.run": + from altimate_engine.sql.data_diff import run_data_diff + + raw = run_data_diff( + source_table=params.get("source_table", ""), + target_table=params.get("target_table", ""), + source_warehouse=params.get("source_warehouse", ""), + target_warehouse=params.get("target_warehouse"), + key_columns=params.get("key_columns", []), + extra_columns=params.get("extra_columns"), + algorithm=params.get("algorithm", "auto"), + where_clause=params.get("where_clause"), + source_database=params.get("source_database"), + source_schema=params.get("source_schema"), + target_database=params.get("target_database"), + target_schema=params.get("target_schema"), + ) + return JsonRpcResponse(result=raw, id=request.id) elif method == "ping": return JsonRpcResponse(result={"status": "ok"}, id=request.id) else: diff --git a/packages/altimate-engine/src/altimate_engine/sql/data_diff.py b/packages/altimate-engine/src/altimate_engine/sql/data_diff.py new file mode 100644 index 0000000000..4930ca0f00 --- /dev/null +++ b/packages/altimate-engine/src/altimate_engine/sql/data_diff.py @@ -0,0 +1,201 @@ +"""Deterministic data diff engine using altimate-core's reladiff state machine. + +Orchestrates the cooperative Rust state machine: creates a session, loops +start() → execute SQL → step() until Done or Error. All SQL execution goes +through the existing ConnectionRegistry. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from altimate_engine.connections import ConnectionRegistry +from altimate_engine.models import SqlExecuteParams +from altimate_engine.sql.executor import execute_sql + +logger = logging.getLogger(__name__) + +try: + import altimate_core + + RELADIFF_AVAILABLE = True +except ImportError: + RELADIFF_AVAILABLE = False + +# Map TableSide enum values to warehouse names +_SIDE_MAP = {"Table1": "source", "Table2": "target"} + + +def _resolve_dialect(warehouse_name: str) -> str: + """Infer SQL dialect from connection type.""" + try: + conn = ConnectionRegistry.get(warehouse_name) + conn_type = getattr(conn, "type", "").lower() + dialect_map = { + "snowflake": "snowflake", + "duckdb": "duckdb", + "postgres": "postgres", + "postgresql": "postgres", + "bigquery": "bigquery", + "mysql": "mysql", + "clickhouse": "clickhouse", + "databricks": "databricks", + "redshift": "redshift", + } + return dialect_map.get(conn_type, "generic") + except Exception: + return "generic" + + +def _execute_task(task: dict, warehouse: str) -> dict: + """Execute a single SQL task against the given warehouse.""" + result = execute_sql( + SqlExecuteParams(sql=task["sql"], warehouse=warehouse, limit=100_000) + ) + + # Convert SqlExecuteResult rows to the format expected by ReladiffSession.step() + rows: list[list[str | None]] = [] + for row in result.rows: + rows.append([str(v) if v is not None else None for v in row]) + + return {"id": task["id"], "rows": rows} + + +def run_data_diff( + *, + source_table: str, + target_table: str, + source_warehouse: str, + target_warehouse: str | None = None, + key_columns: list[str], + extra_columns: list[str] | None = None, + algorithm: str = "auto", + where_clause: str | None = None, + source_database: str | None = None, + source_schema: str | None = None, + target_database: str | None = None, + target_schema: str | None = None, +) -> dict[str, Any]: + """Run a 
deterministic data diff using the Rust reladiff engine. + + Returns the complete validation result as a dict. + """ + if not RELADIFF_AVAILABLE: + return { + "success": False, + "error": "altimate-core not installed. ReladiffSession unavailable.", + } + + target_warehouse = target_warehouse or source_warehouse + + # Resolve dialects from connection types + dialect1 = _resolve_dialect(source_warehouse) + dialect2 = _resolve_dialect(target_warehouse) + + # Build session spec + table1: dict[str, Any] = {"table": source_table} + if source_database: + table1["database"] = source_database + if source_schema: + table1["schema"] = source_schema + + table2: dict[str, Any] = {"table": target_table} + if target_database: + table2["database"] = target_database + if target_schema: + table2["schema"] = target_schema + + spec = { + "table1": table1, + "table2": table2, + "dialect1": dialect1, + "dialect2": dialect2, + "config": { + "algorithm": algorithm, + "key_columns": key_columns, + "extra_columns": extra_columns or [], + }, + } + + if where_clause: + spec["config"]["where_clause"] = where_clause + + logger.info("Starting reladiff session: %s", json.dumps(spec, indent=2)) + + # Create session and run the state machine loop + try: + session = altimate_core.ReladiffSession(json.dumps(spec)) + except Exception as e: + return {"success": False, "error": f"Failed to create session: {e}"} + + # Map table sides to warehouses + warehouse_map = {"Table1": source_warehouse, "Table2": target_warehouse} + + action = session.start() + step_count = 0 + max_steps = 100 # Safety limit + + while step_count < max_steps: + step_count += 1 + action_type = action.get("type") + + if action_type == "Done": + outcome = action.get("outcome", {}) + return { + "success": True, + "status": "completed", + "steps": step_count, + "outcome": outcome, + } + + if action_type == "Error": + return { + "success": False, + "error": action.get("message", "Unknown engine error"), + "steps": step_count, + } + + if 
action_type != "ExecuteSql": + return { + "success": False, + "error": f"Unexpected action type: {action_type}", + "steps": step_count, + } + + # Execute all SQL tasks + tasks = action.get("tasks", []) + responses = [] + + for task in tasks: + side = task.get("table_side", "Table1") + wh = warehouse_map.get(side, source_warehouse) + + logger.info( + "Step %d: Executing [%s] on %s: %s", + step_count, + side, + wh, + task["sql"][:120], + ) + + try: + resp = _execute_task(task, wh) + responses.append(resp) + except Exception as e: + return { + "success": False, + "error": f"SQL execution failed on {wh}: {e}", + "steps": step_count, + "failed_sql": task["sql"], + } + + # Feed responses back to the engine + action = session.step(json.dumps(responses)) + + return { + "success": False, + "error": f"State machine did not converge after {max_steps} steps", + "steps": step_count, + } diff --git a/packages/altimate-engine/src/altimate_engine/sql/guard.py b/packages/altimate-engine/src/altimate_engine/sql/guard.py index de757a142a..0ed5fb7d07 100644 --- a/packages/altimate-engine/src/altimate_engine/sql/guard.py +++ b/packages/altimate-engine/src/altimate_engine/sql/guard.py @@ -449,7 +449,7 @@ def guard_column_lineage( default_database: str = "", default_schema: str = "", ) -> dict: - """Schema-aware column lineage (requires altimate_core.init).""" + """Schema-aware column lineage. Works without API keys; logs usage when initialized.""" if not ALTIMATE_CORE_AVAILABLE: return _not_installed_result() try: @@ -471,7 +471,7 @@ def guard_track_lineage( schema_path: str = "", schema_context: dict[str, Any] | None = None, ) -> dict: - """Track lineage across multiple queries (requires altimate_core.init).""" + """Track lineage across multiple queries. 
Works without API keys; logs usage when initialized.""" if not ALTIMATE_CORE_AVAILABLE: return _not_installed_result() try: diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index 6eb16f80fa..0ac962f0bd 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -19,6 +19,7 @@ import PROMPT_ANALYST from "../altimate/prompts/analyst.txt" import PROMPT_VALIDATOR from "../altimate/prompts/validator.txt" import PROMPT_MIGRATOR from "../altimate/prompts/migrator.txt" import PROMPT_EXECUTIVE from "../altimate/prompts/executive.txt" +import PROMPT_DATA_DIFF from "../altimate/prompts/data-diff.txt" // altimate_change end import { PermissionNext } from "@/permission/next" import { mergeDeep, pipe, sortBy, values } from "remeda" @@ -221,6 +222,38 @@ export namespace Agent { mode: "primary", native: true, }, + "data-diff": { + name: "data-diff", + description: "Cross-database data validation. Compare tables across warehouses using progressive checks: row counts, column profiles, segment checksums, and row-level diffs.", + prompt: PROMPT_DATA_DIFF, + options: {}, + permission: PermissionNext.merge( + defaults, + PermissionNext.fromConfig({ + sql_execute: "allow", sql_validate: "allow", sql_analyze: "allow", + sql_translate: "allow", sql_optimize: "allow", lineage_check: "allow", + warehouse_list: "allow", warehouse_test: "allow", warehouse_discover: "allow", + schema_inspect: "allow", schema_index: "allow", schema_search: "allow", + schema_cache_status: "allow", sql_explain: "allow", sql_format: "allow", + sql_fix: "allow", sql_autocomplete: "allow", sql_diff: "allow", + finops_query_history: "allow", finops_analyze_credits: "allow", + finops_expensive_queries: "allow", finops_warehouse_advice: "allow", + finops_unused_resources: "allow", finops_role_grants: "allow", + finops_role_hierarchy: "allow", finops_user_roles: "allow", + schema_detect_pii: "allow", schema_tags: "allow", schema_tags_list: 
"allow", + altimate_core_validate: "allow", altimate_core_lint: "allow", + altimate_core_safety: "allow", altimate_core_transpile: "allow", + altimate_core_check: "allow", + data_diff: "allow", + read: "allow", write: "allow", edit: "allow", + grep: "allow", glob: "allow", bash: "allow", + question: "allow", + }), + user, + ), + mode: "primary", + native: true, + }, // altimate_change end plan: { name: "plan", diff --git a/packages/opencode/src/altimate/bridge/protocol.ts b/packages/opencode/src/altimate/bridge/protocol.ts index ccaa99fc7b..39673491b2 100644 --- a/packages/opencode/src/altimate/bridge/protocol.ts +++ b/packages/opencode/src/altimate/bridge/protocol.ts @@ -876,6 +876,31 @@ export interface AltimateCoreIsSafeParams { sql: string } +// --- data diff (reladiff) --- +export interface DataDiffRunParams { + source_table: string + target_table: string + source_warehouse: string + target_warehouse?: string + key_columns: string[] + extra_columns?: string[] + algorithm?: "auto" | "hashdiff" | "joindiff" | "profile" | "recon" | "cascade" + where_clause?: string + source_database?: string + source_schema?: string + target_database?: string + target_schema?: string +} + +export interface DataDiffRunResult { + success: boolean + status?: string + error?: string + steps?: number + outcome?: Record + failed_sql?: string +} + // --- dbt Lineage --- export interface DbtLineageParams { @@ -1032,6 +1057,7 @@ export const BridgeMethods = { "altimate_core.introspection_sql": {} as { params: AltimateCoreIntrospectionSqlParams; result: AltimateCoreResult }, "altimate_core.parse_dbt": {} as { params: AltimateCoreParseDbtParams; result: AltimateCoreResult }, "altimate_core.is_safe": {} as { params: AltimateCoreIsSafeParams; result: AltimateCoreResult }, + "data_diff.run": {} as { params: DataDiffRunParams; result: DataDiffRunResult }, ping: {} as { params: Record; result: { status: string } }, } as const diff --git a/packages/opencode/src/altimate/prompts/data-diff.txt 
b/packages/opencode/src/altimate/prompts/data-diff.txt new file mode 100644 index 0000000000..9e5804e12c --- /dev/null +++ b/packages/opencode/src/altimate/prompts/data-diff.txt @@ -0,0 +1,98 @@ +You are altimate-code in data-diff mode — a cross-database data validation agent. + +Your purpose is to compare data between two tables (same database or different warehouses) and determine whether they contain the same data. + +## PRIMARY TOOL: `data_diff` + +**Always use the `data_diff` tool as your first choice.** It wraps a deterministic Rust engine (reladiff) that handles checksums, bisection, and row-level comparison automatically — far more reliable than hand-written SQL. + +### How to use `data_diff` + +1. Call `warehouse_list` to discover configured warehouses. +2. Call `schema_inspect` to examine both tables and identify key columns. +3. Call `data_diff` with: + - `source_table`: source table name (without database/schema prefix) + - `target_table`: target table name + - `source_warehouse`: warehouse connection name from warehouse_list + - `target_warehouse`: (optional) target warehouse, defaults to source + - `key_columns`: primary key column(s) that uniquely identify each row + - `extra_columns`: (optional) additional columns to compare beyond keys + - `algorithm`: (optional) "auto" (default), "hashdiff", "joindiff", "profile", "recon", or "cascade" + - `where_clause`: (optional) WHERE filter applied to both tables + - `source_database`, `source_schema`, `target_database`, `target_schema`: (optional) fully qualify tables + +### Algorithm selection + +- **auto** (default): JoinDiff if same database, HashDiff if cross-database +- **hashdiff**: Bisection with checksums — works cross-database, scales to billions of rows +- **joindiff**: FULL OUTER JOIN — fastest for same-database, exact results in one pass +- **profile**: Column-level statistics only (count, nulls, distinct, min/max) — no row-level diff +- **cascade**: Progressive — count → profile → content diff, 
stops as soon as mismatch found +- **recon**: Reconciliation with custom tolerance rules + +### Example + +``` +warehouse_list() +schema_inspect(table: "DATA_DIFF_TEST.customers_source", warehouse: "js") +data_diff( + source_table: "customers_source", + target_table: "customers_target", + source_warehouse: "js", + key_columns: ["id"], + extra_columns: ["name", "email", "amount"], + source_database: "JS", + source_schema: "DATA_DIFF_TEST" +) +``` + +The tool returns a structured report with row counts, diff statistics, and any mismatched rows. + +## FALLBACK: Manual SQL Validation + +Only use manual SQL via `sql_execute` if `data_diff` is unavailable (e.g., altimate-core not installed) or if you need a quick ad-hoc check outside the tool's scope. + +### Manual tools available +- `warehouse_list` — discover configured warehouses +- `warehouse_test` — verify a connection works +- `schema_inspect` — inspect table schemas +- `sql_execute` — run SQL queries (with `query`, `warehouse`, `limit` parameters) + +### Manual progressive approach (when data_diff unavailable) + +**Level 1: Row Count** — `SELECT COUNT(*) FROM {table}` on both sides. +**Level 2: Column Profile** — COUNT, COUNT(DISTINCT), MIN, MAX, AVG, SUM per column. +**Level 3: Segment Checksums** — Split key space into 32 buckets, compare hash checksums (same-dialect only). +**Level 4: Row-Level Diff** — Fetch actual mismatched rows for targeted comparison. + +NEVER use bash, pip install, or raw Python scripts for database queries. + +## Output Format + +Every validation MUST end with a structured summary: +``` +Data Validation Report +====================== +Source: {warehouse}.{table} +Target: {warehouse}.{table} +Algorithm: {algorithm used} +Status: PASS | FAIL + +Results: + Rows table1: X + Rows table2: Y + Exclusive to table1: A + Exclusive to table2: B + Updated: C + Unchanged: D + +Confidence: HIGH | MEDIUM | LOW +``` + +## Key Principles + +1. 
**Use `data_diff` first** — it's deterministic, handles dialect differences, and scales. +2. **Cheapest check first** — if using manual mode, start with COUNT(*). +3. **Cross-database awareness** — `data_diff` auto-selects the right algorithm. Manual checksums won't work across dialects. +4. **Show your work** — display the tool call parameters and results. +5. **Respect cost** — use WHERE filters when available. Don't full-scan TB tables without consent. diff --git a/packages/opencode/src/altimate/prompts/migrator.txt b/packages/opencode/src/altimate/prompts/migrator.txt index 6f92c21fa6..e9753617cf 100644 --- a/packages/opencode/src/altimate/prompts/migrator.txt +++ b/packages/opencode/src/altimate/prompts/migrator.txt @@ -23,6 +23,7 @@ When migrating: ## Available Skills You have access to these skills that users can invoke with /: +- /data-validate — Progressive data validation: row counts → profiles → checksums → row diff - /sql-translate — Cross-dialect SQL translation with warnings - /lineage-diff — Compare column lineage between SQL versions - /query-optimize — Query optimization with anti-pattern detection diff --git a/packages/opencode/src/altimate/prompts/validator.txt b/packages/opencode/src/altimate/prompts/validator.txt index 636e6e39cb..45db8e1636 100644 --- a/packages/opencode/src/altimate/prompts/validator.txt +++ b/packages/opencode/src/altimate/prompts/validator.txt @@ -95,6 +95,7 @@ Report the checklist with pass/fail/skip status for each item. 
- read, grep, glob — File reading ## Skills Available (read-only — these produce analysis, not file changes) +- /data-validate — Progressive data validation: row counts → profiles → checksums → row diff - /lineage-diff — Compare column lineage between SQL versions - /cost-report — Snowflake cost analysis with optimization suggestions - /query-optimize — Query optimization with anti-pattern detection diff --git a/packages/opencode/src/altimate/tools/data-diff-run.ts b/packages/opencode/src/altimate/tools/data-diff-run.ts new file mode 100644 index 0000000000..734b7718ac --- /dev/null +++ b/packages/opencode/src/altimate/tools/data-diff-run.ts @@ -0,0 +1,129 @@ +import z from "zod" +import { Tool } from "../../tool/tool" +import { Bridge } from "../bridge/client" + +export const DataDiffRunTool = Tool.define("data_diff", { + description: + "Run deterministic data validation between two database tables using the reladiff engine. " + + "Compares tables row-by-row using checksums and bisection search (cross-database) or " + + "FULL OUTER JOIN (same database). Returns a structured diff report.", + parameters: z.object({ + source_table: z.string().describe("Source table name (without database/schema prefix)"), + target_table: z.string().describe("Target table name (without database/schema prefix)"), + source_warehouse: z.string().describe("Source warehouse connection name (from warehouse_list)"), + target_warehouse: z + .string() + .optional() + .describe("Target warehouse connection name. Defaults to source_warehouse if same database."), + key_columns: z.array(z.string()).describe("Primary key column(s) that uniquely identify each row"), + extra_columns: z + .array(z.string()) + .optional() + .describe("Additional columns to compare beyond the key columns"), + algorithm: z + .enum(["auto", "hashdiff", "joindiff", "profile", "recon", "cascade"]) + .optional() + .default("auto") + .describe( + "Comparison algorithm. auto=JoinDiff if same DB, HashDiff if cross-DB. 
" + + "profile=column statistics only. cascade=count→profile→content.", + ), + where_clause: z.string().optional().describe("Optional WHERE filter applied to both tables"), + source_database: z.string().optional().describe("Source database/catalog name"), + source_schema: z.string().optional().describe("Source schema name"), + target_database: z.string().optional().describe("Target database/catalog name"), + target_schema: z.string().optional().describe("Target schema name"), + }), + async execute(args, ctx) { + try { + const result = await Bridge.call("data_diff.run", { + source_table: args.source_table, + target_table: args.target_table, + source_warehouse: args.source_warehouse, + target_warehouse: args.target_warehouse, + key_columns: args.key_columns, + extra_columns: args.extra_columns, + algorithm: args.algorithm, + where_clause: args.where_clause, + source_database: args.source_database, + source_schema: args.source_schema, + target_database: args.target_database, + target_schema: args.target_schema, + }) + + if (!result.success) { + return { + title: `Data Diff: FAILED`, + metadata: { status: "error", steps: result.steps ?? 0 }, + output: `Error: ${result.error}\n\nSteps completed: ${result.steps ?? 0}`, + } + } + + const outcome = result.outcome ?? {} + const output = formatOutcome(outcome, args, result.steps ?? 0) + + return { + title: `Data Diff: ${args.source_table} ↔ ${args.target_table}`, + metadata: { status: "completed", steps: result.steps }, + output, + } + } catch (e) { + const msg = e instanceof Error ? 
e.message : String(e)
+      return {
+        title: "Data Diff: ERROR",
+        metadata: { status: "error", steps: 0 },
+        output: `Failed to run data diff: ${msg}\n\nEnsure altimate-core is installed in the engine.`,
+      }
+    }
+  },
+})
+
+function formatOutcome(outcome: Record<string, unknown>, args: Record<string, unknown>, steps: number): string {
+  const mode = outcome.mode as string | undefined
+  const lines: string[] = []
+
+  lines.push("```")
+  lines.push("Data Validation Report")
+  lines.push("======================")
+  lines.push(`Source: ${args.source_warehouse}.${args.source_table}`)
+  lines.push(`Target: ${(args.target_warehouse || args.source_warehouse)}.${args.target_table}`)
+  lines.push(`Algorithm: ${args.algorithm || "auto"}`)
+  lines.push(`Steps: ${steps}`)
+  lines.push("")
+
+  if (mode === "diff") {
+    const stats = (outcome.stats ?? {}) as Record<string, unknown>
+    const diffRows = (outcome.diff_rows ?? []) as unknown[]
+    const pass = diffRows.length === 0
+    lines.push(`Status: ${pass ? "PASS ✓" : "FAIL ✗"}`)
+    lines.push(`Rows table1: ${stats.rows_table1 ?? "?"}`)
+    lines.push(`Rows table2: ${stats.rows_table2 ?? "?"}`)
+    if (!pass) {
+      lines.push(`Exclusive to table1: ${stats.exclusive_table1 ?? 0}`)
+      lines.push(`Exclusive to table2: ${stats.exclusive_table2 ?? 0}`)
+      lines.push(`Updated: ${stats.updated ?? 0}`)
+      lines.push(`Diff %: ${((stats.diff_percent as number) * 100).toFixed(2)}%`)
+    } else {
+      lines.push(`Unchanged: ${stats.unchanged ?? stats.rows_table1}`)
+    }
+  } else if (mode === "profile") {
+    const verdict = outcome.overall_verdict as string
+    lines.push(`Status: ${verdict === "match" ? "PASS ✓" : "FAIL ✗"}`)
+    lines.push(`Overall verdict: ${verdict}`)
+    const cols = (outcome.columns ?? []) as Record<string, unknown>[]
+    for (const col of cols) {
+      lines.push(`  ${col.column}: ${col.verdict}`)
+    }
+  } else if (mode === "cascade") {
+    const countResult = (outcome.count_result ?? 
{}) as Record<string, unknown>
+    lines.push(`Stage: ${outcome.stopped_at}`)
+    lines.push(`Count table1: ${countResult.count_table1}`)
+    lines.push(`Count table2: ${countResult.count_table2}`)
+    lines.push(`Count match: ${countResult.match_ ? "YES" : "NO"}`)
+  } else {
+    lines.push(JSON.stringify(outcome, null, 2))
+  }
+
+  lines.push("```")
+  return lines.join("\n")
+}
diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts
index 3f93520678..d45af33e76 100644
--- a/packages/opencode/src/tool/registry.ts
+++ b/packages/opencode/src/tool/registry.ts
@@ -98,6 +98,7 @@ import { AltimateCoreFingerprintTool } from "../altimate/tools/altimate-core-fin
 import { AltimateCoreIntrospectionSqlTool } from "../altimate/tools/altimate-core-introspection-sql"
 import { AltimateCoreParseDbtTool } from "../altimate/tools/altimate-core-parse-dbt"
 import { AltimateCoreIsSafeTool } from "../altimate/tools/altimate-core-is-safe"
+import { DataDiffRunTool } from "../altimate/tools/data-diff-run"
 import { ProjectScanTool } from "../altimate/tools/project-scan"
 // altimate_change end
 
@@ -260,6 +261,7 @@ export namespace ToolRegistry {
       AltimateCoreIntrospectionSqlTool,
       AltimateCoreParseDbtTool,
       AltimateCoreIsSafeTool,
+      DataDiffRunTool,
       ProjectScanTool,
       // altimate_change end
       ...custom,