From 0cf347775c25ad3bc2bc838e0d52d06efb8780b7 Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Mon, 9 Mar 2026 18:15:13 -0700 Subject: [PATCH 1/3] feat: add `data-diff` agent mode and `/data-validate` skill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New `data-diff` primary agent mode for cross-database data validation with progressive checks: row counts → column profiles → segment checksums → row-level diffs - New `/data-validate` skill with dialect-specific SQL templates for Snowflake, Postgres, BigQuery, DuckDB, Databricks, ClickHouse, MySQL - Prompt covers 4 validation levels, cross-database checksum awareness, and structured PASS/FAIL reporting - Added `/data-validate` to migrator and validator skill lists so both modes can invoke it Co-Authored-By: Claude Opus 4.6 --- .opencode/skills/data-validate/SKILL.md | 183 ++++++++++++++++++ packages/opencode/src/agent/agent.ts | 32 +++ .../src/altimate/prompts/data-diff.txt | 112 +++++++++++ .../src/altimate/prompts/migrator.txt | 1 + .../src/altimate/prompts/validator.txt | 1 + 5 files changed, 329 insertions(+) create mode 100644 .opencode/skills/data-validate/SKILL.md create mode 100644 packages/opencode/src/altimate/prompts/data-diff.txt diff --git a/.opencode/skills/data-validate/SKILL.md b/.opencode/skills/data-validate/SKILL.md new file mode 100644 index 0000000000..4e1e87c29e --- /dev/null +++ b/.opencode/skills/data-validate/SKILL.md @@ -0,0 +1,183 @@ +--- +name: data-validate +description: Compare data between two tables across any warehouses using progressive validation — row counts, column profiles, segment checksums, and row-level drill-down. +--- + +# Data Validate + +## Requirements +**Agent:** data-diff or migrator (requires sql_execute on both source and target) +**Tools used:** sql_execute, warehouse_list, warehouse_test, schema_inspect, read, glob + +Cross-database data validation using a progressive, multi-level approach. Each level provides increasing confidence with increasing query cost — stop as soon as you have enough evidence. + +## Validation Levels + +### Level 1: Row Count (seconds, near-zero cost) +Compare total row counts between source and target. If counts match exactly, proceed to Level 2. If they differ, report the delta immediately — no deeper checks needed. + +```sql +-- Run on source warehouse +SELECT COUNT(*) AS row_count FROM {source_table} [WHERE ...] + +-- Run on target warehouse +SELECT COUNT(*) AS row_count FROM {target_table} [WHERE ...] +``` + +### Level 2: Column Profile (seconds, low cost) +For each column, compare aggregate statistics. This catches type coercion bugs, NULL handling differences, and truncation issues without scanning every row. + +```sql +SELECT + COUNT(*) AS total_rows, + COUNT({col}) AS non_null_count, + COUNT(DISTINCT {col}) AS distinct_count, + MIN({col}) AS min_val, + MAX({col}) AS max_val, + -- Numeric columns only: + AVG(CAST({col} AS DOUBLE)) AS avg_val, + SUM(CAST({col} AS DOUBLE)) AS sum_val +FROM {table} [WHERE ...] +``` + +Run this for each column (or the key columns + any columns the user cares about). Compare results side by side: + +``` +Column Profile Comparison +========================= +Column | Source | Target | Match +----------------|-----------------|-----------------|------ +total_rows | 1,234,567 | 1,234,567 | OK +user_id.distinct| 500,000 | 500,000 | OK +email.nulls | 0 | 1,204 | MISMATCH +amount.sum | 45,678,901.23 | 45,678,901.23 | OK +amount.avg | 37.01 | 37.01 | OK +created_at.min | 2020-01-01 | 2020-01-01 | OK +created_at.max | 2024-12-31 | 2024-12-31 | OK +``` + +If all profiles match, tables are equivalent with high confidence. Report and stop. + +### Level 3: Segment Checksums (moderate cost) +If profiles match but the user wants stronger guarantees, or if you need to locate WHERE the differences are, split the key space into segments and compare checksums. + +Requires: a sortable key column (integer PK, timestamp, etc.) + +```sql +-- Get key range +SELECT MIN({key_col}) AS min_key, MAX({key_col}) AS max_key FROM {table} + +-- Segment checksum (dialect-specific hash aggregation) +-- Snowflake: +SELECT + FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BITXOR_AGG(HASH({columns})) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- Postgres: +SELECT + FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(('x' || SUBSTR(MD5(CONCAT({columns}::text)), 1, 12))::bit(48)::bigint) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- BigQuery: +SELECT + CAST(FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS INT64) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(FARM_FINGERPRINT(CONCAT({columns}))) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket + +-- DuckDB: +SELECT + FLOOR(({key_col} - {min_key}) * {num_buckets} / ({max_key} - {min_key} + 1)) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(md5_number_lower64(CONCAT({columns}::text))) AS checksum +FROM {table} +WHERE {key_col} >= {min_key} AND {key_col} <= {max_key} +GROUP BY bucket ORDER BY bucket +``` + +Compare bucket-by-bucket. Matching checksums = identical data in that segment. Mismatched buckets narrow down where differences live. + +### Level 4: Row-Level Diff (targeted, on mismatched segments only) +For any mismatched segments from Level 3, download the actual rows and diff them locally. Only fetch rows in the mismatched key range. + +```sql +SELECT {key_col}, {columns} +FROM {table} +WHERE {key_col} >= {segment_min} AND {key_col} < {segment_max} +ORDER BY {key_col} +``` + +Compare row by row. Report additions, deletions, and value changes. + +## Workflow + +1. **Identify source and target** — Ask the user or infer from context: + - Which warehouse connections? (use `warehouse_list` to show available) + - Which tables to compare? + - Any WHERE clause filters? (date range, partition, etc.) + - Which columns matter? (all, or specific subset) + +2. **Verify connectivity** — Run `warehouse_test` on both connections. + +3. **Inspect schemas** — Use `schema_inspect` on both tables. Compare column names, types, and nullability. Flag any schema differences before proceeding (e.g., VARCHAR(100) vs VARCHAR(256), INT vs BIGINT). + +4. **Run Level 1** — Row counts. If mismatched, report and ask if user wants to drill deeper. + +5. **Run Level 2** — Column profiles. Compare side by side. If all match, report high-confidence equivalence. If mismatches found, highlight which columns differ and by how much. + +6. **Run Level 3** (if needed) — Segment checksums. Use 32 buckets by default. Report which segments match and which differ. + +7. **Run Level 4** (if needed) — Fetch rows from mismatched segments. Show the actual diff rows (additions/deletions/changes). + +8. **Report** — Always produce a structured summary: + +``` +Data Validation Report +====================== +Source: snowflake://analytics.public.orders +Target: bigquery://project.dataset.orders +Filter: created_at >= '2024-01-01' +Status: PASS | FAIL | PARTIAL + +Level 1 — Row Count: PASS (1,234,567 rows both sides) +Level 2 — Profile: PASS (all 12 columns match) +Level 3 — Checksum: PASS (32/32 segments match) +Level 4 — Row Diff: SKIPPED (not needed) + +Confidence: HIGH +``` + +## Dialect-Specific Notes + +**Hash functions by dialect:** +| Dialect | Row Hash | Aggregation | +|-------------|-----------------------|-----------------| +| Snowflake | `HASH(cols)` | `BITXOR_AGG` | +| Postgres | `MD5(CONCAT(cols))` | `BIT_XOR` | +| BigQuery | `FARM_FINGERPRINT` | `BIT_XOR` | +| DuckDB | `md5_number_lower64` | `BIT_XOR` | +| Databricks | `xxhash64(cols)` | `BIT_XOR` | +| MySQL | `MD5(CONCAT(cols))` | `BIT_XOR` | +| ClickHouse | `cityHash64(cols)` | `groupBitXor` | + +**Cross-database checksum comparison**: When source and target use different dialects, checksums won't match even for identical data (different hash functions). In this case, skip Level 3 and go directly from Level 2 to Level 4 if needed, OR download sorted rows from both sides and compare locally. + +## Usage + +- `/data-validate` — Start interactive validation (will ask for source/target) +- `/data-validate orders` — Validate the `orders` table across connected warehouses +- `/data-validate snowflake.orders bigquery.orders` — Explicit source and target +- `/data-validate --level 2` — Stop at profile level (skip checksums) +- `/data-validate --columns id,amount,created_at` — Only validate specific columns + +Use the tools: `sql_execute`, `warehouse_list`, `warehouse_test`, `schema_inspect`, `read`, `glob`. diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index 6eb16f80fa..ac9a256da2 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -19,6 +19,7 @@ import PROMPT_ANALYST from "../altimate/prompts/analyst.txt" import PROMPT_VALIDATOR from "../altimate/prompts/validator.txt" import PROMPT_MIGRATOR from "../altimate/prompts/migrator.txt" import PROMPT_EXECUTIVE from "../altimate/prompts/executive.txt" +import PROMPT_DATA_DIFF from "../altimate/prompts/data-diff.txt" // altimate_change end import { PermissionNext } from "@/permission/next" import { mergeDeep, pipe, sortBy, values } from "remeda" @@ -221,6 +222,37 @@ export namespace Agent { mode: "primary", native: true, }, + "data-diff": { + name: "data-diff", + description: "Cross-database data validation. Compare tables across warehouses using progressive checks: row counts, column profiles, segment checksums, and row-level diffs.", + prompt: PROMPT_DATA_DIFF, + options: {}, + permission: PermissionNext.merge( + defaults, + PermissionNext.fromConfig({ + sql_execute: "allow", sql_validate: "allow", sql_analyze: "allow", + sql_translate: "allow", sql_optimize: "allow", lineage_check: "allow", + warehouse_list: "allow", warehouse_test: "allow", warehouse_discover: "allow", + schema_inspect: "allow", schema_index: "allow", schema_search: "allow", + schema_cache_status: "allow", sql_explain: "allow", sql_format: "allow", + sql_fix: "allow", sql_autocomplete: "allow", sql_diff: "allow", + finops_query_history: "allow", finops_analyze_credits: "allow", + finops_expensive_queries: "allow", finops_warehouse_advice: "allow", + finops_unused_resources: "allow", finops_role_grants: "allow", + finops_role_hierarchy: "allow", finops_user_roles: "allow", + schema_detect_pii: "allow", schema_tags: "allow", schema_tags_list: "allow", + altimate_core_validate: "allow", altimate_core_lint: "allow", + altimate_core_safety: "allow", altimate_core_transpile: "allow", + altimate_core_check: "allow", + read: "allow", write: "allow", edit: "allow", + grep: "allow", glob: "allow", bash: "allow", + question: "allow", + }), + user, + ), + mode: "primary", + native: true, + }, // altimate_change end plan: { name: "plan", diff --git a/packages/opencode/src/altimate/prompts/data-diff.txt b/packages/opencode/src/altimate/prompts/data-diff.txt new file mode 100644 index 0000000000..2ac4287b3d --- /dev/null +++ b/packages/opencode/src/altimate/prompts/data-diff.txt @@ -0,0 +1,112 @@ +You are altimate-code in data-diff mode — a cross-database data validation agent. + +Your purpose is to compare data between two tables (same database or different warehouses) and determine whether they contain the same data. You use a progressive validation approach: cheap checks first, expensive checks only when needed. + +## CRITICAL: Always Use Built-in Tools + +NEVER use bash, pip install, or raw Python scripts to query databases. You have dedicated tools: + +### Step 0: Discover connections +Call `warehouse_list` (no parameters) to see all configured warehouses. +Call `warehouse_test` with `name: "js"` to verify a connection works. + +### Step 1: Inspect schemas +Call `schema_inspect` with `table: "DATA_DIFF_TEST.customers_identical_source"` and `warehouse: "js"` to see columns and types. + +### Step 2: Execute SQL +Call `sql_execute` with: +- `query`: the SQL string +- `warehouse`: connection name (e.g. "js" for Snowflake, "test_duckdb" for DuckDB) +- `limit`: max rows (default 100, increase for row-level diffs) + +Example: `sql_execute(query: "SELECT COUNT(*) FROM JS.DATA_DIFF_TEST.customers_identical_source", warehouse: "js")` + +NEVER fall back to bash or Python for SQL execution. If sql_execute fails, report the error — do not try to work around it. + +## Validation Protocol + +Always follow this progressive approach — stop as soon as you have a definitive answer: + +### Level 1: Row Count (run FIRST, always) +Use sql_execute to run `SELECT COUNT(*) AS row_count FROM {table}` on both tables via their respective warehouse connections. If counts differ, report the delta immediately. + +Example: +``` +sql_execute(query: "SELECT COUNT(*) AS cnt FROM JS.DATA_DIFF_TEST.customers_count_source", warehouse: "js") +sql_execute(query: "SELECT COUNT(*) AS cnt FROM JS.DATA_DIFF_TEST.customers_count_target", warehouse: "js") +``` + +### Level 2: Column Profile +For each column, use sql_execute to compare aggregates: +```sql +SELECT + COUNT(*) AS total_rows, + COUNT(col) AS non_null, + COUNT(DISTINCT col) AS distinct_count, + MIN(col) AS min_val, + MAX(col) AS max_val, + AVG(col::DOUBLE) AS avg_val, -- numeric only + SUM(col::DOUBLE) AS sum_val -- numeric only +FROM table +``` + +Run this on both source and target via sql_execute. Present results side by side. + +### Level 3: Segment Checksums (same-dialect only) +Split the key space into 32 buckets and compare hash checksums per bucket. + +Snowflake: +```sql +SELECT + FLOOR((id - {min}) * 32 / ({max} - {min} + 1)) AS bucket, + COUNT(*) AS cnt, + BITXOR_AGG(HASH(col1, col2, col3)) AS checksum +FROM table WHERE id >= {min} AND id <= {max} +GROUP BY bucket ORDER BY bucket +``` + +DuckDB: +```sql +SELECT + FLOOR((id - {min}) * 32 / ({max} - {min} + 1)) AS bucket, + COUNT(*) AS cnt, + BIT_XOR(md5_number_lower64(CONCAT(col1::text, col2::text))) AS checksum +FROM table WHERE id >= {min} AND id <= {max} +GROUP BY bucket ORDER BY bucket +``` + +When source and target use DIFFERENT dialects (e.g. Snowflake vs DuckDB), SKIP this level — hash functions differ so checksums won't match even for identical data. Go directly to Level 4. + +### Level 4: Row-Level Diff (targeted) +For mismatched segments, fetch actual rows via sql_execute with a higher limit: +``` +sql_execute(query: "SELECT * FROM table WHERE id >= {seg_min} AND id < {seg_max} ORDER BY id", warehouse: "js", limit: 1000) +``` + +Compare rows and report additions, deletions, and value changes. + +## Output Format + +Every validation MUST end with a structured summary: +``` +Data Validation Report +====================== +Source: {warehouse}.{table} +Target: {warehouse}.{table} +Status: PASS | FAIL + +Level 1 — Row Count: PASS (1,234 rows both sides) | FAIL (1,234 vs 1,184) +Level 2 — Profile: PASS (all columns match) | FAIL (3 columns differ) +Level 3 — Checksum: PASS (32/32 match) | FAIL (3/32 differ) | SKIPPED +Level 4 — Row Diff: X rows differ | SKIPPED + +Confidence: HIGH | MEDIUM | LOW +``` + +## Key Principles + +1. **Cheapest check first** — COUNT(*) takes milliseconds. Don't skip it. +2. **Cross-database awareness** — When warehouses use different dialects, skip Level 3 checksums. +3. **Show your work** — Display the SQL and results at each level. +4. **Use tools, not bash** — sql_execute for queries, schema_inspect for schemas, warehouse_list for connections. +5. **Respect cost** — Use WHERE filters when available. Don't full-scan TB tables without consent. diff --git a/packages/opencode/src/altimate/prompts/migrator.txt b/packages/opencode/src/altimate/prompts/migrator.txt index 6f92c21fa6..e9753617cf 100644 --- a/packages/opencode/src/altimate/prompts/migrator.txt +++ b/packages/opencode/src/altimate/prompts/migrator.txt @@ -23,6 +23,7 @@ When migrating: ## Available Skills You have access to these skills that users can invoke with /: +- /data-validate — Progressive data validation: row counts → profiles → checksums → row diff - /sql-translate — Cross-dialect SQL translation with warnings - /lineage-diff — Compare column lineage between SQL versions - /query-optimize — Query optimization with anti-pattern detection diff --git a/packages/opencode/src/altimate/prompts/validator.txt b/packages/opencode/src/altimate/prompts/validator.txt index 636e6e39cb..45db8e1636 100644 --- a/packages/opencode/src/altimate/prompts/validator.txt +++ b/packages/opencode/src/altimate/prompts/validator.txt @@ -95,6 +95,7 @@ Report the checklist with pass/fail/skip status for each item. - read, grep, glob — File reading ## Skills Available (read-only — these produce analysis, not file changes) +- /data-validate — Progressive data validation: row counts → profiles → checksums → row diff - /lineage-diff — Compare column lineage between SQL versions - /cost-report — Snowflake cost analysis with optimization suggestions - /query-optimize — Query optimization with anti-pattern detection From c5773ed43702c503547b511bed59f17ca64fa0e8 Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Mon, 9 Mar 2026 20:43:30 -0700 Subject: [PATCH 2/3] feat: wire `data_diff` tool through reladiff engine for deterministic data validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the full pipeline: TypeScript tool → Bridge → Python orchestrator → Rust engine. - `data-diff-run.ts`: TypeScript tool wrapping `Bridge.call("data_diff.run")` - `data_diff.py`: Python orchestrator driving the cooperative state machine loop via `altimate_core.ReladiffSession` (start → execute SQL → step → repeat) - `server.py`: Added `data_diff.run` dispatch to JSON-RPC bridge - `protocol.ts`: `DataDiffRunParams`/`DataDiffRunResult` interfaces + bridge method - `registry.ts`: Registered `DataDiffRunTool` in tool registry - `agent.ts`: Added `data_diff: "allow"` to data-diff agent permissions - `data-diff.txt`: Rewrote prompt to use `data_diff` tool as primary approach, with manual SQL as fallback Co-Authored-By: Claude Opus 4.6 --- .../src/altimate_engine/server.py | 18 ++ .../src/altimate_engine/sql/data_diff.py | 201 ++++++++++++++++++ packages/opencode/src/agent/agent.ts | 1 + .../opencode/src/altimate/bridge/protocol.ts | 26 +++ .../src/altimate/prompts/data-diff.txt | 138 ++++++------ .../src/altimate/tools/data-diff-run.ts | 129 +++++++++++ packages/opencode/src/tool/registry.ts | 2 + 7 files changed, 439 insertions(+), 76 deletions(-) create mode 100644 packages/altimate-engine/src/altimate_engine/sql/data_diff.py create mode 100644 packages/opencode/src/altimate/tools/data-diff-run.ts diff --git a/packages/altimate-engine/src/altimate_engine/server.py b/packages/altimate-engine/src/altimate_engine/server.py index b29bd7438a..9431b1fd0d 100644 --- a/packages/altimate-engine/src/altimate_engine/server.py +++ b/packages/altimate-engine/src/altimate_engine/server.py @@ -953,6 +953,24 @@ def dispatch(request: JsonRpcRequest) -> JsonRpcResponse: target_dialect=p.target_dialect, ) result = LocalTestResult(**raw) + elif method == "data_diff.run": + from altimate_engine.sql.data_diff import run_data_diff + + raw = run_data_diff( + source_table=params.get("source_table", ""), + target_table=params.get("target_table", ""), + source_warehouse=params.get("source_warehouse", ""), + target_warehouse=params.get("target_warehouse"), + key_columns=params.get("key_columns", []), + extra_columns=params.get("extra_columns"), + algorithm=params.get("algorithm", "auto"), + where_clause=params.get("where_clause"), + source_database=params.get("source_database"), + source_schema=params.get("source_schema"), + target_database=params.get("target_database"), + target_schema=params.get("target_schema"), + ) + return JsonRpcResponse(result=raw, id=request.id) elif method == "ping": return JsonRpcResponse(result={"status": "ok"}, id=request.id) else: diff --git a/packages/altimate-engine/src/altimate_engine/sql/data_diff.py b/packages/altimate-engine/src/altimate_engine/sql/data_diff.py new file mode 100644 index 0000000000..4930ca0f00 --- /dev/null +++ b/packages/altimate-engine/src/altimate_engine/sql/data_diff.py @@ -0,0 +1,201 @@ +"""Deterministic data diff engine using altimate-core's reladiff state machine. + +Orchestrates the cooperative Rust state machine: creates a session, loops +start() → execute SQL → step() until Done or Error. All SQL execution goes +through the existing ConnectionRegistry. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any + +from altimate_engine.connections import ConnectionRegistry +from altimate_engine.models import SqlExecuteParams +from altimate_engine.sql.executor import execute_sql + +logger = logging.getLogger(__name__) + +try: + import altimate_core + + RELADIFF_AVAILABLE = True +except ImportError: + RELADIFF_AVAILABLE = False + +# Map TableSide enum values to warehouse names +_SIDE_MAP = {"Table1": "source", "Table2": "target"} + + +def _resolve_dialect(warehouse_name: str) -> str: + """Infer SQL dialect from connection type.""" + try: + conn = ConnectionRegistry.get(warehouse_name) + conn_type = getattr(conn, "type", "").lower() + dialect_map = { + "snowflake": "snowflake", + "duckdb": "duckdb", + "postgres": "postgres", + "postgresql": "postgres", + "bigquery": "bigquery", + "mysql": "mysql", + "clickhouse": "clickhouse", + "databricks": "databricks", + "redshift": "redshift", + } + return dialect_map.get(conn_type, "generic") + except Exception: + return "generic" + + +def _execute_task(task: dict, warehouse: str) -> dict: + """Execute a single SQL task against the given warehouse.""" + result = execute_sql( + SqlExecuteParams(sql=task["sql"], warehouse=warehouse, limit=100_000) + ) + + # Convert SqlExecuteResult rows to the format expected by ReladiffSession.step() + rows: list[list[str | None]] = [] + for row in result.rows: + rows.append([str(v) if v is not None else None for v in row]) + + return {"id": task["id"], "rows": rows} + + +def run_data_diff( + *, + source_table: str, + target_table: str, + source_warehouse: str, + target_warehouse: str | None = None, + key_columns: list[str], + extra_columns: list[str] | None = None, + algorithm: str = "auto", + where_clause: str | None = None, + source_database: str | None = None, + source_schema: str | None = None, + target_database: str | None = None, + target_schema: str | None = None, +) -> dict[str, Any]: + """Run a deterministic data diff using the Rust reladiff engine. + + Returns the complete validation result as a dict. + """ + if not RELADIFF_AVAILABLE: + return { + "success": False, + "error": "altimate-core not installed. ReladiffSession unavailable.", + } + + target_warehouse = target_warehouse or source_warehouse + + # Resolve dialects from connection types + dialect1 = _resolve_dialect(source_warehouse) + dialect2 = _resolve_dialect(target_warehouse) + + # Build session spec + table1: dict[str, Any] = {"table": source_table} + if source_database: + table1["database"] = source_database + if source_schema: + table1["schema"] = source_schema + + table2: dict[str, Any] = {"table": target_table} + if target_database: + table2["database"] = target_database + if target_schema: + table2["schema"] = target_schema + + spec = { + "table1": table1, + "table2": table2, + "dialect1": dialect1, + "dialect2": dialect2, + "config": { + "algorithm": algorithm, + "key_columns": key_columns, + "extra_columns": extra_columns or [], + }, + } + + if where_clause: + spec["config"]["where_clause"] = where_clause + + logger.info("Starting reladiff session: %s", json.dumps(spec, indent=2)) + + # Create session and run the state machine loop + try: + session = altimate_core.ReladiffSession(json.dumps(spec)) + except Exception as e: + return {"success": False, "error": f"Failed to create session: {e}"} + + # Map table sides to warehouses + warehouse_map = {"Table1": source_warehouse, "Table2": target_warehouse} + + action = session.start() + step_count = 0 + max_steps = 100 # Safety limit + + while step_count < max_steps: + step_count += 1 + action_type = action.get("type") + + if action_type == "Done": + outcome = action.get("outcome", {}) + return { + "success": True, + "status": "completed", + "steps": step_count, + "outcome": outcome, + } + + if action_type == "Error": + return { + "success": False, + "error": action.get("message", "Unknown engine error"), + "steps": step_count, + } + + if action_type != "ExecuteSql": + return { + "success": False, + "error": f"Unexpected action type: {action_type}", + "steps": step_count, + } + + # Execute all SQL tasks + tasks = action.get("tasks", []) + responses = [] + + for task in tasks: + side = task.get("table_side", "Table1") + wh = warehouse_map.get(side, source_warehouse) + + logger.info( + "Step %d: Executing [%s] on %s: %s", + step_count, + side, + wh, + task["sql"][:120], + ) + + try: + resp = _execute_task(task, wh) + responses.append(resp) + except Exception as e: + return { + "success": False, + "error": f"SQL execution failed on {wh}: {e}", + "steps": step_count, + "failed_sql": task["sql"], + } + + # Feed responses back to the engine + action = session.step(json.dumps(responses)) + + return { + "success": False, + "error": f"State machine did not converge after {max_steps} steps", + "steps": step_count, + } diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index ac9a256da2..0ac962f0bd 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -244,6 +244,7 @@ export namespace Agent { altimate_core_validate: "allow", altimate_core_lint: "allow", altimate_core_safety: "allow", altimate_core_transpile: "allow", altimate_core_check: "allow", + data_diff: "allow", read: "allow", write: "allow", edit: "allow", grep: "allow", glob: "allow", bash: "allow", question: "allow", diff --git a/packages/opencode/src/altimate/bridge/protocol.ts b/packages/opencode/src/altimate/bridge/protocol.ts index ccaa99fc7b..39673491b2 100644 --- a/packages/opencode/src/altimate/bridge/protocol.ts +++ b/packages/opencode/src/altimate/bridge/protocol.ts @@ -876,6 +876,31 @@ export interface AltimateCoreIsSafeParams { sql: string } +// --- data diff (reladiff) --- +export interface DataDiffRunParams { + source_table: string + target_table: string + source_warehouse: string + target_warehouse?: string + key_columns: string[] + extra_columns?: string[] + algorithm?: "auto" | "hashdiff" | "joindiff" | "profile" | "recon" | "cascade" + where_clause?: string + source_database?: string + source_schema?: string + target_database?: string + target_schema?: string +} + +export interface DataDiffRunResult { + success: boolean + status?: string + error?: string + steps?: number + outcome?: Record + failed_sql?: string +} + // --- dbt Lineage --- export interface DbtLineageParams { @@ -1032,6 +1057,7 @@ export const BridgeMethods = { "altimate_core.introspection_sql": {} as { params: AltimateCoreIntrospectionSqlParams; result: AltimateCoreResult }, "altimate_core.parse_dbt": {} as { params: AltimateCoreParseDbtParams; result: AltimateCoreResult }, "altimate_core.is_safe": {} as { params: AltimateCoreIsSafeParams; result: AltimateCoreResult }, + "data_diff.run": {} as { params: DataDiffRunParams; result: DataDiffRunResult }, ping: {} as { params: Record; result: { status: string } }, } as const diff --git a/packages/opencode/src/altimate/prompts/data-diff.txt b/packages/opencode/src/altimate/prompts/data-diff.txt index 2ac4287b3d..9e5804e12c 100644 --- a/packages/opencode/src/altimate/prompts/data-diff.txt +++ b/packages/opencode/src/altimate/prompts/data-diff.txt @@ -1,89 +1,71 @@ You are altimate-code in data-diff mode — a cross-database data validation agent. -Your purpose is to compare data between two tables (same database or different warehouses) and determine whether they contain the same data. You use a progressive validation approach: cheap checks first, expensive checks only when needed. +Your purpose is to compare data between two tables (same database or different warehouses) and determine whether they contain the same data. -## CRITICAL: Always Use Built-in Tools +## PRIMARY TOOL: `data_diff` -NEVER use bash, pip install, or raw Python scripts to query databases. You have dedicated tools: +**Always use the `data_diff` tool as your first choice.** It wraps a deterministic Rust engine (reladiff) that handles checksums, bisection, and row-level comparison automatically — far more reliable than hand-written SQL. -### Step 0: Discover connections -Call `warehouse_list` (no parameters) to see all configured warehouses. -Call `warehouse_test` with `name: "js"` to verify a connection works. +### How to use `data_diff` -### Step 1: Inspect schemas -Call `schema_inspect` with `table: "DATA_DIFF_TEST.customers_identical_source"` and `warehouse: "js"` to see columns and types. +1. Call `warehouse_list` to discover configured warehouses. +2. Call `schema_inspect` to examine both tables and identify key columns. +3. Call `data_diff` with: + - `source_table`: source table name (without database/schema prefix) + - `target_table`: target table name + - `source_warehouse`: warehouse connection name from warehouse_list + - `target_warehouse`: (optional) target warehouse, defaults to source + - `key_columns`: primary key column(s) that uniquely identify each row + - `extra_columns`: (optional) additional columns to compare beyond keys + - `algorithm`: (optional) "auto" (default), "hashdiff", "joindiff", "profile", "recon", or "cascade" + - `where_clause`: (optional) WHERE filter applied to both tables + - `source_database`, `source_schema`, `target_database`, `target_schema`: (optional) fully qualify tables -### Step 2: Execute SQL -Call `sql_execute` with: -- `query`: the SQL string -- `warehouse`: connection name (e.g. "js" for Snowflake, "test_duckdb" for DuckDB) -- `limit`: max rows (default 100, increase for row-level diffs) +### Algorithm selection -Example: `sql_execute(query: "SELECT COUNT(*) FROM JS.DATA_DIFF_TEST.customers_identical_source", warehouse: "js")` +- **auto** (default): JoinDiff if same database, HashDiff if cross-database +- **hashdiff**: Bisection with checksums — works cross-database, scales to billions of rows +- **joindiff**: FULL OUTER JOIN — fastest for same-database, exact results in one pass +- **profile**: Column-level statistics only (count, nulls, distinct, min/max) — no row-level diff +- **cascade**: Progressive — count → profile → content diff, stops as soon as mismatch found +- **recon**: Reconciliation with custom tolerance rules -NEVER fall back to bash or Python for SQL execution. If sql_execute fails, report the error — do not try to work around it. +### Example -## Validation Protocol - -Always follow this progressive approach — stop as soon as you have a definitive answer: - -### Level 1: Row Count (run FIRST, always) -Use sql_execute to run `SELECT COUNT(*) AS row_count FROM {table}` on both tables via their respective warehouse connections. If counts differ, report the delta immediately. - -Example: ``` -sql_execute(query: "SELECT COUNT(*) AS cnt FROM JS.DATA_DIFF_TEST.customers_count_source", warehouse: "js") -sql_execute(query: "SELECT COUNT(*) AS cnt FROM JS.DATA_DIFF_TEST.customers_count_target", warehouse: "js") +warehouse_list() +schema_inspect(table: "DATA_DIFF_TEST.customers_source", warehouse: "js") +data_diff( + source_table: "customers_source", + target_table: "customers_target", + source_warehouse: "js", + key_columns: ["id"], + extra_columns: ["name", "email", "amount"], + source_database: "JS", + source_schema: "DATA_DIFF_TEST" +) ``` -### Level 2: Column Profile -For each column, use sql_execute to compare aggregates: -```sql -SELECT - COUNT(*) AS total_rows, - COUNT(col) AS non_null, - COUNT(DISTINCT col) AS distinct_count, - MIN(col) AS min_val, - MAX(col) AS max_val, - AVG(col::DOUBLE) AS avg_val, -- numeric only - SUM(col::DOUBLE) AS sum_val -- numeric only -FROM table -``` +The tool returns a structured report with row counts, diff statistics, and any mismatched rows. -Run this on both source and target via sql_execute. Present results side by side. +## FALLBACK: Manual SQL Validation -### Level 3: Segment Checksums (same-dialect only) -Split the key space into 32 buckets and compare hash checksums per bucket. +Only use manual SQL via `sql_execute` if `data_diff` is unavailable (e.g., altimate-core not installed) or if you need a quick ad-hoc check outside the tool's scope. -Snowflake: -```sql -SELECT - FLOOR((id - {min}) * 32 / ({max} - {min} + 1)) AS bucket, - COUNT(*) AS cnt, - BITXOR_AGG(HASH(col1, col2, col3)) AS checksum -FROM table WHERE id >= {min} AND id <= {max} -GROUP BY bucket ORDER BY bucket -``` +### Manual tools available +- `warehouse_list` — discover configured warehouses +- `warehouse_test` — verify a connection works +- `schema_inspect` — inspect table schemas +- `sql_execute` — run SQL queries (with `query`, `warehouse`, `limit` parameters) -DuckDB: -```sql -SELECT - FLOOR((id - {min}) * 32 / ({max} - {min} + 1)) AS bucket, - COUNT(*) AS cnt, - BIT_XOR(md5_number_lower64(CONCAT(col1::text, col2::text))) AS checksum -FROM table WHERE id >= {min} AND id <= {max} -GROUP BY bucket ORDER BY bucket -``` - -When source and target use DIFFERENT dialects (e.g. Snowflake vs DuckDB), SKIP this level — hash functions differ so checksums won't match even for identical data. Go directly to Level 4. +### Manual progressive approach (when data_diff unavailable) -### Level 4: Row-Level Diff (targeted) -For mismatched segments, fetch actual rows via sql_execute with a higher limit: -``` -sql_execute(query: "SELECT * FROM table WHERE id >= {seg_min} AND id < {seg_max} ORDER BY id", warehouse: "js", limit: 1000) -``` +**Level 1: Row Count** — `SELECT COUNT(*) FROM {table}` on both sides. +**Level 2: Column Profile** — COUNT, COUNT(DISTINCT), MIN, MAX, AVG, SUM per column. +**Level 3: Segment Checksums** — Split key space into 32 buckets, compare hash checksums (same-dialect only). +**Level 4: Row-Level Diff** — Fetch actual mismatched rows for targeted comparison. -Compare rows and report additions, deletions, and value changes. +NEVER use bash, pip install, or raw Python scripts for database queries. ## Output Format @@ -93,20 +75,24 @@ Data Validation Report ====================== Source: {warehouse}.{table} Target: {warehouse}.{table} +Algorithm: {algorithm used} Status: PASS | FAIL -Level 1 — Row Count: PASS (1,234 rows both sides) | FAIL (1,234 vs 1,184) -Level 2 — Profile: PASS (all columns match) | FAIL (3 columns differ) -Level 3 — Checksum: PASS (32/32 match) | FAIL (3/32 differ) | SKIPPED -Level 4 — Row Diff: X rows differ | SKIPPED +Results: + Rows table1: X + Rows table2: Y + Exclusive to table1: A + Exclusive to table2: B + Updated: C + Unchanged: D Confidence: HIGH | MEDIUM | LOW ``` ## Key Principles -1. **Cheapest check first** — COUNT(*) takes milliseconds. Don't skip it. -2. **Cross-database awareness** — When warehouses use different dialects, skip Level 3 checksums. -3. **Show your work** — Display the SQL and results at each level. -4. **Use tools, not bash** — sql_execute for queries, schema_inspect for schemas, warehouse_list for connections. -5. **Respect cost** — Use WHERE filters when available. Don't full-scan TB tables without consent. +1. **Use `data_diff` first** — it's deterministic, handles dialect differences, and scales. +2. **Cheapest check first** — if using manual mode, start with COUNT(*). +3. **Cross-database awareness** — `data_diff` auto-selects the right algorithm. Manual checksums won't work across dialects. +4. **Show your work** — display the tool call parameters and results. +5. **Respect cost** — use WHERE filters when available. Don't full-scan TB tables without consent. diff --git a/packages/opencode/src/altimate/tools/data-diff-run.ts b/packages/opencode/src/altimate/tools/data-diff-run.ts new file mode 100644 index 0000000000..734b7718ac --- /dev/null +++ b/packages/opencode/src/altimate/tools/data-diff-run.ts @@ -0,0 +1,129 @@ +import z from "zod" +import { Tool } from "../../tool/tool" +import { Bridge } from "../bridge/client" + +export const DataDiffRunTool = Tool.define("data_diff", { + description: + "Run deterministic data validation between two database tables using the reladiff engine. " + + "Compares tables row-by-row using checksums and bisection search (cross-database) or " + + "FULL OUTER JOIN (same database). Returns a structured diff report.", + parameters: z.object({ + source_table: z.string().describe("Source table name (without database/schema prefix)"), + target_table: z.string().describe("Target table name (without database/schema prefix)"), + source_warehouse: z.string().describe("Source warehouse connection name (from warehouse_list)"), + target_warehouse: z + .string() + .optional() + .describe("Target warehouse connection name. Defaults to source_warehouse if same database."), + key_columns: z.array(z.string()).describe("Primary key column(s) that uniquely identify each row"), + extra_columns: z + .array(z.string()) + .optional() + .describe("Additional columns to compare beyond the key columns"), + algorithm: z + .enum(["auto", "hashdiff", "joindiff", "profile", "recon", "cascade"]) + .optional() + .default("auto") + .describe( + "Comparison algorithm. auto=JoinDiff if same DB, HashDiff if cross-DB. " + + "profile=column statistics only. cascade=count→profile→content.", + ), + where_clause: z.string().optional().describe("Optional WHERE filter applied to both tables"), + source_database: z.string().optional().describe("Source database/catalog name"), + source_schema: z.string().optional().describe("Source schema name"), + target_database: z.string().optional().describe("Target database/catalog name"), + target_schema: z.string().optional().describe("Target schema name"), + }), + async execute(args, ctx) { + try { + const result = await Bridge.call("data_diff.run", { + source_table: args.source_table, + target_table: args.target_table, + source_warehouse: args.source_warehouse, + target_warehouse: args.target_warehouse, + key_columns: args.key_columns, + extra_columns: args.extra_columns, + algorithm: args.algorithm, + where_clause: args.where_clause, + source_database: args.source_database, + source_schema: args.source_schema, + target_database: args.target_database, + target_schema: args.target_schema, + }) + + if (!result.success) { + return { + title: `Data Diff: FAILED`, + metadata: { status: "error", steps: result.steps ?? 0 }, + output: `Error: ${result.error}\n\nSteps completed: ${result.steps ?? 0}`, + } + } + + const outcome = result.outcome ?? {} + const output = formatOutcome(outcome, args, result.steps ?? 0) + + return { + title: `Data Diff: ${args.source_table} ↔ ${args.target_table}`, + metadata: { status: "completed", steps: result.steps }, + output, + } + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + return { + title: "Data Diff: ERROR", + metadata: { status: "error", steps: 0 }, + output: `Failed to run data diff: ${msg}\n\nEnsure altimate-core is installed in the engine.`, + } + } + }, +}) + +function formatOutcome(outcome: Record, args: Record, steps: number): string { + const mode = outcome.mode as string | undefined + const lines: string[] = [] + + lines.push("```") + lines.push("Data Validation Report") + lines.push("======================") + lines.push(`Source: ${args.source_warehouse}.${args.source_table}`) + lines.push(`Target: ${(args.target_warehouse || args.source_warehouse)}.${args.target_table}`) + lines.push(`Algorithm: ${args.algorithm || "auto"}`) + lines.push(`Steps: ${steps}`) + lines.push("") + + if (mode === "diff") { + const stats = (outcome.stats ?? {}) as Record + const diffRows = (outcome.diff_rows ?? []) as unknown[] + const pass = diffRows.length === 0 + lines.push(`Status: ${pass ? "PASS ✓" : "FAIL ✗"}`) + lines.push(`Rows table1: ${stats.rows_table1 ?? "?"}`) + lines.push(`Rows table2: ${stats.rows_table2 ?? "?"}`) + if (!pass) { + lines.push(`Exclusive to table1: ${stats.exclusive_table1 ?? 0}`) + lines.push(`Exclusive to table2: ${stats.exclusive_table2 ?? 0}`) + lines.push(`Updated: ${stats.updated ?? 0}`) + lines.push(`Diff %: ${((stats.diff_percent as number) * 100).toFixed(2)}%`) + } else { + lines.push(`Unchanged: ${stats.unchanged ?? stats.rows_table1}`) + } + } else if (mode === "profile") { + const verdict = outcome.overall_verdict as string + lines.push(`Status: ${verdict === "match" ? "PASS ✓" : "FAIL ✗"}`) + lines.push(`Overall verdict: ${verdict}`) + const cols = (outcome.columns ?? []) as Record[] + for (const col of cols) { + lines.push(` ${col.column}: ${col.verdict}`) + } + } else if (mode === "cascade") { + const countResult = (outcome.count_result ?? {}) as Record + lines.push(`Stage: ${outcome.stopped_at}`) + lines.push(`Count table1: ${countResult.count_table1}`) + lines.push(`Count table2: ${countResult.count_table2}`) + lines.push(`Count match: ${countResult.match_ ? "YES" : "NO"}`) + } else { + lines.push(JSON.stringify(outcome, null, 2)) + } + + lines.push("```") + return lines.join("\n") +} diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 3f93520678..d45af33e76 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -98,6 +98,7 @@ import { AltimateCoreFingerprintTool } from "../altimate/tools/altimate-core-fin import { AltimateCoreIntrospectionSqlTool } from "../altimate/tools/altimate-core-introspection-sql" import { AltimateCoreParseDbtTool } from "../altimate/tools/altimate-core-parse-dbt" import { AltimateCoreIsSafeTool } from "../altimate/tools/altimate-core-is-safe" +import { DataDiffRunTool } from "../altimate/tools/data-diff-run" import { ProjectScanTool } from "../altimate/tools/project-scan" // altimate_change end @@ -260,6 +261,7 @@ export namespace ToolRegistry { AltimateCoreIntrospectionSqlTool, AltimateCoreParseDbtTool, AltimateCoreIsSafeTool, + DataDiffRunTool, ProjectScanTool, // altimate_change end ...custom, From 17138481d588ee1567d8955789cd616852bd2c28 Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Mon, 9 Mar 2026 21:19:20 -0700 Subject: [PATCH 3/3] =?UTF-8?q?docs:=20update=20lineage=20guard=20docstrin?= =?UTF-8?q?gs=20=E2=80=94=20no=20longer=20requires=20API=20keys?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reflects altimate-core change: `column_lineage` and `track_lineage` now work without credentials. SDK logging activates when initialized. Co-Authored-By: Claude Opus 4.6 --- packages/altimate-engine/src/altimate_engine/sql/guard.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/altimate-engine/src/altimate_engine/sql/guard.py b/packages/altimate-engine/src/altimate_engine/sql/guard.py index de757a142a..0ed5fb7d07 100644 --- a/packages/altimate-engine/src/altimate_engine/sql/guard.py +++ b/packages/altimate-engine/src/altimate_engine/sql/guard.py @@ -449,7 +449,7 @@ def guard_column_lineage( default_database: str = "", default_schema: str = "", ) -> dict: - """Schema-aware column lineage (requires altimate_core.init).""" + """Schema-aware column lineage. Works without API keys; logs usage when initialized.""" if not ALTIMATE_CORE_AVAILABLE: return _not_installed_result() try: @@ -471,7 +471,7 @@ def guard_track_lineage( schema_path: str = "", schema_context: dict[str, Any] | None = None, ) -> dict: - """Track lineage across multiple queries (requires altimate_core.init).""" + """Track lineage across multiple queries. Works without API keys; logs usage when initialized.""" if not ALTIMATE_CORE_AVAILABLE: return _not_installed_result() try: