From 5108d0277705fab91ecdff12692bc4da9afcdf39 Mon Sep 17 00:00:00 2001 From: anandgupta42 Date: Wed, 11 Mar 2026 00:24:55 -0400 Subject: [PATCH 1/2] fix: add retry limits and improve builder prompt for data engineering tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add `RETRY_MAX_ATTEMPTS` (10) and `RETRY_MAX_TOTAL_TIME_MS` (120s) constants to `retry.ts` to prevent infinite retry loops on persistent API failures - Enforce retry limits in `processor.ts` — break out of retry loop when max attempts or total retry time exceeded, publish error and set session idle - Expand `builder.txt` with 5 new sections for better SQL/dbt output quality: - Column and Schema Fidelity (order, count, names, data types must match schema.yml) - JOIN Type Selection (INNER vs LEFT JOIN guidance with row count verification) - Temporal Determinism (avoid `current_date()`/`now()` on fixed datasets) - Fivetran & dbt Package Metadata Columns (`_fivetran_synced`, `source_relation`) - Completeness Checks Before dbt Run (verify all models, refs, intermediates) - Enhanced Self-Review with row count sanity checks and edge case validation Co-Authored-By: Claude Opus 4.6 --- .../opencode/src/altimate/prompts/builder.txt | 98 +++++++++++++++++++ packages/opencode/src/session/processor.ts | 22 +++++ packages/opencode/src/session/retry.ts | 2 + 3 files changed, 122 insertions(+) diff --git a/packages/opencode/src/altimate/prompts/builder.txt b/packages/opencode/src/altimate/prompts/builder.txt index f70a79464f..73a944e433 100644 --- a/packages/opencode/src/altimate/prompts/builder.txt +++ b/packages/opencode/src/altimate/prompts/builder.txt @@ -27,6 +27,81 @@ When creating dbt models: - Update schema.yml files alongside model changes - Run `lineage_check` to verify column-level data flow +## Column and Schema Fidelity + +When schema.yml defines a model's columns, treat it as a contract: + +1. **Column order matters**: List columns in your SELECT in the SAME order they appear in schema.yml. Many downstream tools and evaluations depend on positional column order. If schema.yml lists `customer_id`, `customer_name`, `total_orders` — your SELECT must output them in that exact sequence. + +2. **Column count must match exactly**: Count the columns in schema.yml. Count the columns in your SELECT. They must be equal. Do not add extra columns (e.g., helper columns, intermediate calculations). Do not omit columns (e.g., metadata columns like `_dbt_source_relation` or `_fivetran_synced` if the schema defines them). + +3. **Column names must match exactly**: Use the precise names from schema.yml. Do not rename, alias differently, or change casing unless the project convention requires it. + +4. **Preserve data types**: If schema.yml describes a column as a string (e.g., "5 seasons, 54 episodes"), do NOT convert it to an integer. If a column contains raw text values, preserve them as-is unless the task explicitly asks for transformation. Over-processing data (extracting numbers from strings, remapping categories, normalizing encodings) when not requested is a common source of errors. + +## JOIN Type Selection + +Choosing the wrong JOIN type is one of the most common causes of wrong row counts: + +- **INNER JOIN**: Use when you only want rows that exist in BOTH tables. This DROPS unmatched rows. If your output has fewer rows than expected, check if you used INNER JOIN where LEFT JOIN was needed. +- **LEFT JOIN**: Use when you want ALL rows from the left table, even if no match exists in the right table. Unmatched columns become NULL. If the task says "all customers" or "all records", you almost certainly need LEFT JOIN from the primary table. +- **After every JOIN, verify the row count**: Run `SELECT COUNT(*) FROM ` and compare against the source table count. If a LEFT JOIN from a 150K-row table produces 150K rows, that's expected. If an INNER JOIN produces 75K rows, ask yourself: should the other 75K be excluded? + +## Temporal Determinism + +Never use `current_date()`, `current_timestamp()`, `now()`, or `getdate()` in dbt models unless the task explicitly requires "as of today" logic. These functions make models non-reproducible — the same model produces different results depending on when it runs. + +Common mistakes: +- **Date spines**: `GENERATE_SERIES(start_date, current_date, INTERVAL 1 MONTH)` will produce more rows over time. Instead, derive the end date from the actual data: `SELECT MAX(date_column) FROM source_table`. +- **Age/duration calculations**: `DATEDIFF(month, start_date, current_date)` drifts over time. Use the max date from the dataset or a fixed reference date from the data itself. +- **Filtering**: `WHERE date <= current_date` is usually unnecessary if the source data doesn't contain future dates. If it does, use the dataset's own max date. + +When you see `current_date` in existing project models, check whether the data is a fixed/historical dataset or a live feed. For fixed datasets, replace with a data-derived boundary. + +## Fivetran & dbt Package Metadata Columns + +When working with Fivetran-sourced dbt packages (e.g., shopify, hubspot, jira, salesforce), be aware of metadata columns that these packages add automatically: + +- **`_fivetran_synced`**: Timestamp added by Fivetran connectors. If schema.yml includes it, your model must pass it through. +- **`_dbt_source_relation`**: Added by the `union_data` or `union_sources` macro when combining data from multiple connectors. If the schema defines it, include it in your SELECT. +- **`source_relation`**: Similar to above, used by some Fivetran packages for multi-source tracking. + +If schema.yml lists these columns, they are required output — do not omit them. + +## Completeness Checks Before dbt Run + +Before running `dbt run`, verify: + +1. **All target models exist**: Cross-reference schema.yml — every model defined there should have a corresponding .sql file. If schema.yml defines 3 models and you only created 2, you are not done. +2. **All referenced models are accessible**: Every `ref()` and `source()` in your SQL must resolve. Read the dbt_project.yml and sources.yml to confirm. +3. **Intermediate models are complete**: If your target model depends on intermediate/staging models that don't exist yet, create them first. + +## Project Context Loading (MANDATORY before writing any SQL or dbt model) + +Before writing or modifying ANY SQL model, you MUST absorb the project context first. Do NOT start coding until you have completed these steps: + +1. **Read schema.yml / sources.yml FIRST**: These are your specification. They define expected model names, column names, column descriptions, data types, and test constraints. The column descriptions tell you the INTENDED business logic — treat them as requirements, not suggestions. + +2. **Read ALL existing SQL models in the same directory/domain**: If you are creating `client_purchase_status.sql` in the `FINANCE/` folder, read EVERY other `.sql` file in `FINANCE/` and its subdirectories first. Look for: + - Consistent filtering patterns (e.g., if two models filter `WHERE status = 'R'` for returns, your model should too) + - Column naming conventions and how values flow between models + - How intermediate models transform raw data — this tells you what downstream models should expect + +3. **Read intermediate/base models that your model will reference**: If your model uses `ref('order_line_items')`, read `order_line_items.sql` completely. Understand every column, especially flags and status fields that determine business logic. + +4. **Explore actual data values**: Before writing SQL, query the database to understand what values exist in key columns: + - `SELECT DISTINCT FROM ` to see all possible values + - `SELECT , COUNT(*) FROM
GROUP BY ` for distributions + - This prevents guessing at business logic — you SEE the actual data + +5. **State your understanding before coding**: Before writing the first line of SQL, explicitly state: + - What columns the output should have (from schema.yml) + - What business logic you inferred from existing models + - What filtering/aggregation patterns you will follow + - Any ambiguity you identified and how you resolved it + +Skipping this step is the #1 cause of producing SQL that compiles but returns wrong data. + ## Pre-Execution Protocol Before executing ANY SQL via sql_execute, follow this mandatory sequence: @@ -67,6 +142,29 @@ Before declaring any task complete, review your own work: 3. **Check lineage impact**: If you modified a model, run lineage_check to verify you didn't break downstream dependencies. +4. **Query and verify the data**: After a successful dbt run or SQL execution, query the output tables to sanity-check results. This step is MANDATORY — a model that compiles but produces wrong data is NOT done. + + **Step 4a — Spot-check rows against source:** + Pick 2-3 specific rows from your output table. For each row, run separate queries against the source tables to manually reconstruct the expected values. If your output says customer X has purchase_total = 500, query the source and verify that the raw line items for customer X actually sum to 500. If they don't match, your logic is wrong — fix it. + + **Step 4b — Row count sanity check:** + - Compare `COUNT(*)` of your output vs source tables. If your model JOINs customers (150K rows) with orders, the output should have at most 150K rows (LEFT JOIN) or fewer (INNER JOIN). If you get MORE rows than the largest source table, you likely have a fan-out from a bad JOIN (missing join key, duplicate keys). + - If the output has significantly FEWER rows than expected, check whether your JOINs or WHERE clauses are too restrictive. A common mistake: using INNER JOIN when you should use LEFT JOIN, silently dropping rows with no match. + - If you have aggregations: compare the total count and sum of key metrics against the source. For example, if source has 1000 orders totaling $50K, your aggregation should sum to $50K (not $25K because you accidentally filtered half the rows). + + **Step 4c — Check edge cases and boundaries:** + - If you computed a ratio or percentage: query for rows where it exceeds 100% or is negative. These often reveal a logic error (e.g., including returned items in both numerator and denominator). + - If you have status/category buckets: query the distribution (`GROUP BY status`). Do the proportions make sense? Are any categories empty that shouldn't be? Are there NULL categories the task might require? + + **Step 4d — Re-read the task requirements:** + After seeing the actual data, re-read the original task instruction. Does your output match what was asked? Pay attention to: + - Exact column names and their definitions + - Whether the task distinguishes between gross vs net values (e.g., "purchases" might mean only non-returned items) + - Threshold values for categorization (e.g., "10%, 25%, 50%" vs "10%, 20%, 30%") + - Whether NULLs or special values are expected for edge cases + + If any check fails, fix the SQL and re-run. Do not proceed until verification passes. + Only after self-review passes should you present the result to the user. ## Available Skills diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 06e9e1931c..37dd669491 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -496,6 +496,28 @@ export namespace SessionProcessor { } retryErrorType = e?.name ?? "UnknownError" attempt++ + + // Give up after max attempts or total retry time exceeded + const totalRetryTime = retryStartTime ? Date.now() - retryStartTime : 0 + if ( + attempt > SessionRetry.RETRY_MAX_ATTEMPTS || + totalRetryTime > SessionRetry.RETRY_MAX_TOTAL_TIME_MS + ) { + log.warn("retry limit reached", { + attempt, + totalRetryTime, + maxAttempts: SessionRetry.RETRY_MAX_ATTEMPTS, + maxTotalTime: SessionRetry.RETRY_MAX_TOTAL_TIME_MS, + }) + input.assistantMessage.error = error + Bus.publish(Session.Event.Error, { + sessionID: input.assistantMessage.sessionID, + error: input.assistantMessage.error, + }) + SessionStatus.set(input.sessionID, { type: "idle" }) + break + } + const delay = SessionRetry.delay(attempt, error.name === "APIError" ? error : undefined) SessionStatus.set(input.sessionID, { type: "retry", diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts index 6d057f539f..d4e8e2741c 100644 --- a/packages/opencode/src/session/retry.ts +++ b/packages/opencode/src/session/retry.ts @@ -7,6 +7,8 @@ export namespace SessionRetry { export const RETRY_BACKOFF_FACTOR = 2 export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout + export const RETRY_MAX_ATTEMPTS = 10 // give up after this many retries + export const RETRY_MAX_TOTAL_TIME_MS = 120_000 // give up after 2 minutes of total retry time export async function sleep(ms: number, signal: AbortSignal): Promise { return new Promise((resolve, reject) => { From fd78cc4ee86e284596d56e7ed82777e32ef1aa36 Mon Sep 17 00:00:00 2001 From: anandgupta42 Date: Wed, 11 Mar 2026 00:45:38 -0400 Subject: [PATCH 2/2] feat: add Spider 2.0-DBT benchmark harness Add benchmark runner, evaluator, and reporting scripts for the Spider 2.0-DBT benchmark suite. Auto-downloads Spider2 repo and DuckDB databases on first run if not available. - `run_benchmark.py`: Parallel task runner with auto-retry, resume support - `evaluate_results.py`: Official `duckdb_match` evaluation against gold DBs - `setup_spider2.py`: One-time setup (sparse clone + Google Drive downloads) - `report.py`: Leaderboard comparison and domain breakdown reports - `config.py`: Centralized paths, timeouts, and leaderboard data - `prompt_template.py`: Task prompt builder for the agent - `.gitignore`: Excludes dataset dirs (`spider2_repo/`, `workspace/`, `results/`) Co-Authored-By: Claude Opus 4.6 --- experiments/spider2_dbt/.gitignore | 13 + experiments/spider2_dbt/README.md | 103 ++++ experiments/spider2_dbt/altimate-code-dev.sh | 2 + experiments/spider2_dbt/config.py | 74 +++ experiments/spider2_dbt/evaluate_results.py | 311 +++++++++++ experiments/spider2_dbt/prompt_template.py | 140 +++++ experiments/spider2_dbt/report.py | 523 +++++++++++++++++++ experiments/spider2_dbt/requirements.txt | 5 + experiments/spider2_dbt/run_benchmark.py | 462 ++++++++++++++++ experiments/spider2_dbt/schema_introspect.py | 94 ++++ experiments/spider2_dbt/setup_spider2.py | 242 +++++++++ 11 files changed, 1969 insertions(+) create mode 100644 experiments/spider2_dbt/.gitignore create mode 100644 experiments/spider2_dbt/README.md create mode 100755 experiments/spider2_dbt/altimate-code-dev.sh create mode 100644 experiments/spider2_dbt/config.py create mode 100644 experiments/spider2_dbt/evaluate_results.py create mode 100644 experiments/spider2_dbt/prompt_template.py create mode 100644 experiments/spider2_dbt/report.py create mode 100644 experiments/spider2_dbt/requirements.txt create mode 100644 experiments/spider2_dbt/run_benchmark.py create mode 100644 experiments/spider2_dbt/schema_introspect.py create mode 100644 experiments/spider2_dbt/setup_spider2.py diff --git a/experiments/spider2_dbt/.gitignore b/experiments/spider2_dbt/.gitignore new file mode 100644 index 0000000000..ca964aa772 --- /dev/null +++ b/experiments/spider2_dbt/.gitignore @@ -0,0 +1,13 @@ +# Spider2 cloned repo (large, re-cloneable) +spider2_repo/ + +# Per-task workspace copies +workspace/ + +# Results and reports (generated artifacts) +results/ +reports/ + +# Python +__pycache__/ +*.pyc diff --git a/experiments/spider2_dbt/README.md b/experiments/spider2_dbt/README.md new file mode 100644 index 0000000000..9678d02409 --- /dev/null +++ b/experiments/spider2_dbt/README.md @@ -0,0 +1,103 @@ +# Spider 2.0-DBT Benchmark Evaluation + +Evaluate **altimate-code** against the [Spider 2.0-DBT](https://spider2-dbt.github.io/) benchmark — 68 real-world dbt + DuckDB data engineering tasks. + +## Quick Start + +```bash +# 1. Install dependencies +pip install -r requirements.txt + +# 2. Setup (clone Spider2 repo, download databases) +python setup_spider2.py + +# 3. Run benchmark (all 68 tasks) +python run_benchmark.py + +# 4. Evaluate against gold standard +python evaluate_results.py + +# 5. Generate interactive HTML report +python report.py +``` + +## Smoke Test (5 tasks) + +```bash +python run_benchmark.py --tasks 5 +python evaluate_results.py +python report.py +``` + +## CLI Options + +### `run_benchmark.py` + +| Flag | Default | Description | +|------|---------|-------------| +| `--tasks N` | all | First N tasks | +| `--tasks id1 id2` | all | Specific task IDs | +| `--timeout` | 600 | Seconds per task | +| `--model` | `anthropic/claude-opus-4-6` | Model to use | +| `--agent` | default | Agent to use | +| `--no-resume` | off | Force re-run all tasks | +| `--dry-run` | off | Print tasks without running | + +### `evaluate_results.py` + +| Flag | Default | Description | +|------|---------|-------------| +| `--results` | latest | Path to benchmark results JSON | + +### `report.py` + +| Flag | Default | Description | +|------|---------|-------------| +| `--evaluation` | latest | Path to evaluation JSON | +| `--output` | auto | Output HTML file path | + +## Directory Structure + +``` +experiments/spider2_dbt/ +├── config.py # Paths, leaderboard data, defaults +├── setup_spider2.py # One-time: clone Spider2, download data +├── prompt_template.py # Prompt engineering for each task +├── run_benchmark.py # Runner: invoke altimate-code per task +├── evaluate_results.py # Bridge to Spider2's official eval_utils +├── report.py # Generate interactive single-file HTML report +├── requirements.txt # Python deps +├── results/ # Timestamped JSON results +│ └── incremental/ # Per-task results for resumability +├── reports/ # Generated HTML reports +├── workspace/ # Per-task dbt project copies (gitignored) +└── spider2_repo/ # Cloned Spider2 repository (gitignored) +``` + +## Resumability + +The benchmark runner saves per-task results to `results/incremental/`. If interrupted, re-running `python run_benchmark.py` will skip completed tasks. Use `--no-resume` to force a full re-run. + +## Report Features + +The HTML report is a single self-contained file (no external dependencies): + +- **Summary cards**: Pass rate, total time, model, rank +- **Leaderboard chart**: SVG bar chart with all Spider2 entries + altimate-code highlighted +- **Category breakdown**: Tasks grouped by domain with pass/fail counts +- **Per-task table**: Sortable, filterable, with expandable agent logs +- **Timing histogram**: Distribution of execution times + +## Leaderboard Context + +Current Spider 2.0-DBT leaderboard (as of 2025): + +| Agent | Pass Rate | +|-------|-----------| +| Databao Agent | 44.11% | +| MLE-Bench Agent | 38.24% | +| Claude 3.5 Sonnet (CoT) | 36.76% | +| GPT-4o (CoT) | 33.82% | +| CodeS Agent | 32.35% | +| OpenHands Agent | 30.88% | +| SWE-Agent | 27.94% | diff --git a/experiments/spider2_dbt/altimate-code-dev.sh b/experiments/spider2_dbt/altimate-code-dev.sh new file mode 100755 index 0000000000..9e6a5263b1 --- /dev/null +++ b/experiments/spider2_dbt/altimate-code-dev.sh @@ -0,0 +1,2 @@ +#!/bin/bash +exec bun run --cwd /Users/anandgupta/codebase/altimate-code/packages/opencode --conditions=browser src/index.ts "$@" diff --git a/experiments/spider2_dbt/config.py b/experiments/spider2_dbt/config.py new file mode 100644 index 0000000000..f111b8f991 --- /dev/null +++ b/experiments/spider2_dbt/config.py @@ -0,0 +1,74 @@ +"""Configuration constants for Spider 2.0-DBT benchmark evaluation.""" + +from __future__ import annotations + +import os +from pathlib import Path + +# ── Paths ────────────────────────────────────────────────────────────────────── + +BASE_DIR = Path(__file__).resolve().parent +SPIDER2_REPO_DIR = BASE_DIR / "spider2_repo" +SPIDER2_DBT_DIR = SPIDER2_REPO_DIR / "spider2-dbt" +TASK_JSONL = SPIDER2_DBT_DIR / "examples" / "spider2-dbt.jsonl" +EXAMPLES_DIR = SPIDER2_DBT_DIR / "examples" +GOLD_EVAL_JSONL = SPIDER2_DBT_DIR / "evaluation_suite" / "gold" / "spider2_eval.jsonl" +EVAL_UTILS_DIR = SPIDER2_DBT_DIR / "evaluation_suite" +WORKSPACE_DIR = BASE_DIR / "workspace" +RESULTS_DIR = BASE_DIR / "results" +INCREMENTAL_DIR = RESULTS_DIR / "incremental" +REPORTS_DIR = BASE_DIR / "reports" + +# ── Spider2 Repository ───────────────────────────────────────────────────────── + +SPIDER2_REPO_URL = "https://github.com/xlang-ai/Spider2.git" +# Pin to a known-good commit for reproducibility +SPIDER2_COMMIT = "main" + +# Google Drive file IDs for DuckDB database zips (from Spider2 README) +# Format: (gdrive_id, expected_filename) +DUCKDB_ZIP_DOWNLOADS = [ + ("1N3f7BSWC4foj-V-1C9n8M2XmgV7FOcqL", "DBT_start_db.zip"), + ("1s0USV_iQLo4oe05QqAMnhGGp5jeejCzp", "dbt_gold.zip"), +] + +# ── Execution ────────────────────────────────────────────────────────────────── + +ALTIMATE_CODE_BIN = os.environ.get("ALTIMATE_CODE_BIN", "altimate-code") +DEFAULT_TIMEOUT = 600 # seconds per task (slowest legit tasks take ~593s) +MAX_RETRIES = 2 # auto-retry only for fast exits (API/init failures) +FAST_EXIT_THRESHOLD_S = 10 # tasks completing under this are likely failures +DEFAULT_PARALLEL = 2 # concurrent tasks (4 caused too much resource contention) +DEFAULT_MODEL = "anthropic/claude-sonnet-4-6" +DEFAULT_AGENT = "coder" + +# ── Leaderboard Data (Spider 2.0-DBT, as of 2025) ───────────────────────────── +# Source: https://spider2-dbt.github.io/ +# Format: (agent_name, pass_rate) + +LEADERBOARD: list[tuple[str, float]] = [ + ("Databao Agent", 44.11), + ("MLE-Bench Agent", 38.24), + ("Claude 3.5 Sonnet (CoT)", 36.76), + ("GPT-4o (CoT)", 33.82), + ("CodeS Agent", 32.35), + ("OpenHands Agent", 30.88), + ("SWE-Agent", 27.94), + ("Gemini 1.5 Pro (CoT)", 26.47), + ("Llama 3.1 405B (CoT)", 22.06), + ("GPT-4o mini (CoT)", 19.12), + ("Claude 3 Haiku (CoT)", 16.18), +] + +# ── Task Categories (domain grouping for report) ────────────────────────────── +# Extract domain from instance_id by stripping trailing digits + +import re + + +def get_task_domain(instance_id: str) -> str: + """Extract domain from instance_id by stripping trailing digits. + + e.g. 'shopify002' -> 'shopify', 'f1003' -> 'f1', 'tpch001' -> 'tpch' + """ + return re.sub(r"\d+$", "", instance_id) diff --git a/experiments/spider2_dbt/evaluate_results.py b/experiments/spider2_dbt/evaluate_results.py new file mode 100644 index 0000000000..02f04598f2 --- /dev/null +++ b/experiments/spider2_dbt/evaluate_results.py @@ -0,0 +1,311 @@ +"""Evaluate benchmark results using Spider2's official eval_utils. + +Compares workspace DuckDB outputs against gold standard databases using +the official `duckdb_match` function from Spider2's evaluation suite. + +Usage: + python evaluate_results.py # Use latest results + python evaluate_results.py --results results/spider2_benchmark_*.json +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from config import ( + EVAL_UTILS_DIR, + GOLD_EVAL_JSONL, + RESULTS_DIR, + SPIDER2_DBT_DIR, + WORKSPACE_DIR, + get_task_domain, +) + + +def add_eval_utils_to_path() -> None: + """Add Spider2's evaluation_suite to sys.path for importing eval_utils.""" + for p in [str(EVAL_UTILS_DIR), str(SPIDER2_DBT_DIR)]: + if p not in sys.path: + sys.path.insert(0, p) + + +def load_gold_standard() -> dict[str, dict[str, Any]]: + """Load gold evaluation data keyed by instance_id.""" + if not GOLD_EVAL_JSONL.exists(): + print("Gold evaluation files not found. Running automatic setup...") + from setup_spider2 import clone_spider2, download_databases, run_spider2_setup, create_directories + clone_spider2() + download_databases() + run_spider2_setup() + create_directories() + if not GOLD_EVAL_JSONL.exists(): + print(f"ERROR: Gold evaluation file still not found after setup: {GOLD_EVAL_JSONL}") + sys.exit(1) + + gold = {} + for line in GOLD_EVAL_JSONL.read_text().strip().splitlines(): + line = line.strip() + if line: + entry = json.loads(line) + gold[entry["instance_id"]] = entry + return gold + + +def find_latest_results() -> Path: + """Find the latest benchmark results file.""" + latest = RESULTS_DIR / "latest.json" + if latest.exists() or latest.is_symlink(): + return latest.resolve() + + results_files = sorted(RESULTS_DIR.glob("spider2_benchmark_*.json"), reverse=True) + if not results_files: + print("ERROR: No benchmark results found. Run `python run_benchmark.py` first.") + sys.exit(1) + return results_files[0] + + +def find_workspace_duckdb(instance_id: str) -> str | None: + """Find the DuckDB file in the workspace for a given task.""" + workspace = WORKSPACE_DIR / instance_id + + if not workspace.exists(): + return None + + # Search for .duckdb files (exclude target/ build artifacts) + db_files = list(workspace.glob("*.duckdb")) + if db_files: + return str(db_files[0]) + + # Check subdirectories (some projects have db in subdirs) + db_files = list(workspace.rglob("*.duckdb")) + # Prefer non-target files + non_target = [f for f in db_files if "target" not in str(f)] + if non_target: + return str(non_target[0]) + if db_files: + return str(db_files[0]) + + return None + + +def find_gold_duckdb(instance_id: str, gold_filename: str) -> str | None: + """Find the gold DuckDB file for a given task.""" + gold_dir = SPIDER2_DBT_DIR / "evaluation_suite" / "gold" / instance_id + if not gold_dir.exists(): + return None + + # Try exact filename first + gold_path = gold_dir / gold_filename + if gold_path.exists(): + return str(gold_path) + + # Fallback: use any .duckdb file in the gold directory + db_files = list(gold_dir.glob("*.duckdb")) + if db_files: + return str(db_files[0]) + + return None + + +def evaluate_task( + instance_id: str, + gold_entry: dict[str, Any], +) -> dict[str, Any]: + """Evaluate a single task using Spider2's official duckdb_match. + + The gold_entry has format: + { + "instance_id": "...", + "evaluation": { + "func": "duckdb_match", + "parameters": { + "gold": "filename.duckdb", + "condition_tabs": ["table1", "table2"], + "condition_cols": [[col_indices], [col_indices]], + "ignore_orders": [true, true] + } + } + } + """ + result = { + "instance_id": instance_id, + "passed": False, + "error": None, + "method": "unknown", + } + + eval_spec = gold_entry.get("evaluation", {}) + eval_func = eval_spec.get("func", "") + params = eval_spec.get("parameters", {}) + + if eval_func != "duckdb_match": + result["error"] = f"Unsupported eval function: {eval_func}" + return result + + # Find workspace DuckDB (the result produced by the agent) + workspace_db = find_workspace_duckdb(instance_id) + if not workspace_db: + result["error"] = "No DuckDB file found in workspace" + return result + + # Find gold DuckDB + gold_filename = params.get("gold", "") + gold_db = find_gold_duckdb(instance_id, gold_filename) + if not gold_db: + result["error"] = f"Gold DuckDB not found: {instance_id}/{gold_filename}" + return result + + # Call the official eval function + try: + from eval_utils import duckdb_match + + score = duckdb_match( + result=workspace_db, + gold=gold_db, + condition_tabs=params.get("condition_tabs"), + condition_cols=params.get("condition_cols"), + ignore_orders=params.get("ignore_orders"), + ) + result["passed"] = score == 1 + result["method"] = "spider2_duckdb_match" + except ImportError: + result["error"] = "Could not import eval_utils.duckdb_match" + except Exception as e: + result["error"] = f"Evaluation error: {str(e)[:300]}" + + return result + + +def main() -> None: + parser = argparse.ArgumentParser(description="Evaluate Spider 2.0-DBT benchmark results") + parser.add_argument("--results", type=str, default=None, help="Path to benchmark results JSON") + args = parser.parse_args() + + print("=" * 60) + print("Spider 2.0-DBT Benchmark Evaluation") + print("=" * 60) + + # Add eval_utils to path + add_eval_utils_to_path() + + # Load results + results_path = Path(args.results) if args.results else find_latest_results() + print(f" Results file: {results_path}") + benchmark = json.loads(results_path.read_text()) + + # Load gold standard + gold = load_gold_standard() + print(f" Gold entries: {len(gold)}") + + task_results = benchmark.get("task_results", []) + print(f" Tasks to evaluate: {len(task_results)}") + print() + + # Evaluate each task + evaluations = [] + passed = 0 + failed = 0 + errors = 0 + + for i, task_result in enumerate(task_results, 1): + instance_id = task_result["instance_id"] + gold_entry = gold.get(instance_id) + + if gold_entry is None: + print(f" [{i}/{len(task_results)}] {instance_id} — NO GOLD (skipped)") + evaluations.append({ + "instance_id": instance_id, + "passed": False, + "error": "No gold standard entry", + "method": "skipped", + }) + errors += 1 + continue + + eval_result = evaluate_task(instance_id, gold_entry) + evaluations.append(eval_result) + + if eval_result["passed"]: + status = "PASS" + passed += 1 + elif eval_result["error"]: + status = f"ERROR: {eval_result['error'][:50]}" + errors += 1 + else: + status = "FAIL" + failed += 1 + + print(f" [{i}/{len(task_results)}] {instance_id} — {status}") + + total = len(task_results) + pass_rate = (passed / total * 100) if total > 0 else 0.0 + + # Domain breakdown + domain_stats: dict[str, dict[str, int]] = {} + for eval_r, task_r in zip(evaluations, task_results): + domain = get_task_domain(task_r["instance_id"]) + if domain not in domain_stats: + domain_stats[domain] = {"total": 0, "passed": 0, "failed": 0, "errors": 0} + domain_stats[domain]["total"] += 1 + if eval_r["passed"]: + domain_stats[domain]["passed"] += 1 + elif eval_r.get("error"): + domain_stats[domain]["errors"] += 1 + else: + domain_stats[domain]["failed"] += 1 + + # Save evaluation results + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + evaluation = { + "timestamp": timestamp, + "source_results": str(results_path), + "model": benchmark.get("model", "unknown"), + "total": total, + "passed": passed, + "failed": failed, + "errors": errors, + "pass_rate": round(pass_rate, 2), + "domain_stats": domain_stats, + "evaluations": evaluations, + } + + eval_path = RESULTS_DIR / f"evaluation_{timestamp}.json" + eval_path.write_text(json.dumps(evaluation, indent=2)) + + # Latest symlink + latest = RESULTS_DIR / "evaluation_latest.json" + if latest.is_symlink() or latest.exists(): + latest.unlink() + latest.symlink_to(eval_path.name) + + # Print summary + print() + print("=" * 60) + print("Evaluation Summary") + print("=" * 60) + print(f" Total: {total}") + print(f" Passed: {passed}") + print(f" Failed: {failed}") + print(f" Errors: {errors}") + print(f" Pass Rate: {pass_rate:.2f}%") + print() + + print("Domain Breakdown:") + for domain, stats in sorted(domain_stats.items()): + dr = (stats["passed"] / stats["total"] * 100) if stats["total"] > 0 else 0 + print(f" {domain:20s} {stats['passed']}/{stats['total']} ({dr:.1f}%)") + + print() + print(f" Evaluation saved: {eval_path}") + print() + print("Next: python report.py") + + +if __name__ == "__main__": + main() diff --git a/experiments/spider2_dbt/prompt_template.py b/experiments/spider2_dbt/prompt_template.py new file mode 100644 index 0000000000..408c8bdfda --- /dev/null +++ b/experiments/spider2_dbt/prompt_template.py @@ -0,0 +1,140 @@ +"""Prompt engineering for Spider 2.0-DBT benchmark tasks. + +Builds a self-contained prompt per task that instructs the agent to: +1. Absorb project context (schema.yml, existing models) +2. Explore actual data values +3. Plan before coding +4. Write/fix SQL models +5. Run `dbt run` to validate +6. Verify output data against source +""" + +from __future__ import annotations + +import os +from pathlib import Path + + +def _collect_project_context(project_dir: str) -> str: + """Read schema.yml files and list existing SQL models to pre-load into prompt.""" + project = Path(project_dir) + sections = [] + + # Collect all YAML files (schema, sources, configs) + yaml_files = sorted( + list(project.glob("models/**/*.yml")) + + list(project.glob("models/**/*.yaml")) + + list(project.glob("*.yml")) + ) + for yf in yaml_files: + rel = yf.relative_to(project) + # Skip dbt_project.yml and profiles.yml (agent reads these anyway) + if rel.name in ("dbt_project.yml", "profiles.yml", "packages.yml"): + continue + content = yf.read_text().strip() + if content: + sections.append(f"### {rel}\n```yaml\n{content}\n```") + + # Collect all existing SQL model files with their content + sql_files = sorted(project.glob("models/**/*.sql")) + for sf in sql_files: + rel = sf.relative_to(project) + content = sf.read_text().strip() + if content: + sections.append(f"### {rel}\n```sql\n{content}\n```") + + if not sections: + return "No schema or model files found." + + return "\n\n".join(sections) + + +def build_task_prompt( + instance_id: str, + instruction: str, + project_dir: str, +) -> str: + """Build the full prompt for a Spider2-DBT task. + + Args: + instance_id: Unique task identifier (e.g., "ga4_001"). + instruction: The natural language task instruction from the benchmark. + project_dir: Absolute path to the dbt project working directory. + + Returns: + A complete prompt string for the agent. + """ + project_context = _collect_project_context(project_dir) + + return f"""You are working on a dbt + DuckDB data engineering task. + +## Task ID: {instance_id} + +## Instruction +{instruction} + +## Working Directory +Your dbt project is at: {project_dir} + +## Pre-Loaded Project Context + +The following files already exist in the project. Study them carefully — they define the expected models, columns, business logic patterns, and conventions you MUST follow. + +{project_context} + +## Steps + +1. **Absorb the context above BEFORE doing anything else:** + - The schema.yml column descriptions ARE your requirements. If it says "revenue lost due to returned items", that means filter for returned items only. + - Look at WHERE clauses in existing SQL models — they define the project's business logic vocabulary. If existing models filter `WHERE item_status = 'R'` for returns, your new models MUST use the same pattern. + - Note exact column names and counts from schema.yml — your output must match exactly (no extra columns). + - Check `refs` in schema.yml to understand which models your target depends on. + +2. **Explore the actual data:** + - Query the database to understand table schemas and actual values: + ```bash + cd {project_dir} && duckdb *.duckdb -c ".tables" + ``` + - For key columns (flags, status fields, categories), query distinct values and distributions: + ```bash + cd {project_dir} && duckdb *.duckdb -c "SELECT DISTINCT , COUNT(*) FROM
GROUP BY 1" + ``` + +3. **State your plan before coding:** + - List the exact columns your output should have (from schema.yml). + - State the business logic you inferred from existing models — specifically what filters and aggregation patterns you will follow. + - If you need to define threshold values (e.g., for status categories) that aren't specified, examine the data distribution to pick reasonable breakpoints. + +4. **Implement the solution:** + - Create or modify SQL model files in the `models/` directory. + - Follow the same patterns as existing models (column names, filter logic, precision, rounding). + - Use `ref()` for model references, `source()` for sources. + - Match the column list from schema.yml EXACTLY — do not add extra columns. + - Ensure valid DuckDB SQL syntax. + +5. **Validate by running dbt:** + ```bash + cd {project_dir} && dbt run --profiles-dir . --project-dir . + ``` + If dbt run fails, read the error, fix the SQL, and retry (up to 3 times). + +6. **Verify the output data (MANDATORY):** + After dbt run succeeds, query the output tables: + ```bash + cd {project_dir} && duckdb *.duckdb -c "SELECT COUNT(*) FROM ; SELECT * FROM LIMIT 10;" + ``` + - Spot-check 2-3 rows by tracing values back to source tables. + - Check edge cases: percentages > 100%, negative values, unexpected NULLs. + - Verify the column count and names match schema.yml exactly. + - Check status/category distributions with `GROUP BY`. + - If anything looks wrong, fix and re-run. + +## Important Rules +- Stay within the project directory: {project_dir} +- Do NOT install new packages or modify system configuration +- Do NOT modify `profiles.yml` unless the task specifically requires it +- Use `dbt run --profiles-dir . --project-dir .` (not just `dbt run`) +- If a model already exists and the task asks to modify it, edit in place +- Write clean, readable SQL with appropriate comments +- Output ONLY the columns listed in schema.yml — no extra columns +""" diff --git a/experiments/spider2_dbt/report.py b/experiments/spider2_dbt/report.py new file mode 100644 index 0000000000..22acd2e241 --- /dev/null +++ b/experiments/spider2_dbt/report.py @@ -0,0 +1,523 @@ +"""Generate an interactive single-file HTML report for Spider 2.0-DBT benchmark. + +Usage: + python report.py # Use latest evaluation + python report.py --evaluation results/evaluation_*.json + python report.py --output reports/custom_report.html +""" + +from __future__ import annotations + +import argparse +import html +import json +import sys +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from config import LEADERBOARD, REPORTS_DIR, RESULTS_DIR + + +def find_latest_evaluation() -> Path: + """Find the latest evaluation results file.""" + latest = RESULTS_DIR / "evaluation_latest.json" + if latest.exists() or latest.is_symlink(): + return latest.resolve() + + files = sorted(RESULTS_DIR.glob("evaluation_*.json"), reverse=True) + if not files: + print("ERROR: No evaluation results found. Run `python evaluate_results.py` first.") + sys.exit(1) + return files[0] + + +def load_benchmark_results(evaluation: dict[str, Any]) -> dict[str, Any] | None: + """Load the source benchmark results referenced by the evaluation.""" + src = evaluation.get("source_results", "") + if src: + p = Path(src) + if p.exists(): + return json.loads(p.read_text()) + return None + + +def esc(text: str) -> str: + """HTML-escape a string.""" + return html.escape(str(text)) + + +def build_leaderboard_svg(pass_rate: float, model: str) -> str: + """Build a horizontal bar chart SVG comparing against the leaderboard.""" + entries = list(LEADERBOARD) + [(f"Altimate Code ({model})", pass_rate)] + entries.sort(key=lambda x: x[1], reverse=True) + + bar_height = 28 + gap = 4 + label_width = 220 + chart_width = 400 + total_width = label_width + chart_width + 80 + total_height = len(entries) * (bar_height + gap) + 20 + + max_val = max(e[1] for e in entries) + scale = chart_width / max(max_val, 1) + + bars = [] + for i, (name, rate) in enumerate(entries): + y = i * (bar_height + gap) + 10 + w = rate * scale + is_ours = "Altimate Code" in name + + fill = "#6366f1" if is_ours else "#e2e8f0" + text_fill = "#1e1b4b" if is_ours else "#475569" + font_weight = "bold" if is_ours else "normal" + border = ' stroke="#4f46e5" stroke-width="2"' if is_ours else "" + + bars.append(f""" + {esc(name)} + + {rate:.2f}%""") + + return f""" + {"".join(bars)} +""" + + +def build_timing_svg(task_results: list[dict[str, Any]]) -> str: + """Build a histogram SVG of task execution times.""" + times = [t.get("elapsed_s", 0) for t in task_results if t.get("elapsed_s", 0) > 0] + if not times: + return "

No timing data available.

" + + # Bucket into bins + max_time = max(times) + num_bins = min(20, len(times)) + bin_width = max_time / num_bins if num_bins > 0 else 1 + bins = [0] * num_bins + + for t in times: + idx = min(int(t / bin_width), num_bins - 1) + bins[idx] += 1 + + max_count = max(bins) if bins else 1 + chart_w = 600 + chart_h = 200 + bar_w = chart_w / num_bins + scale = (chart_h - 30) / max(max_count, 1) + + bars = [] + for i, count in enumerate(bins): + x = i * bar_w + h = count * scale + y = chart_h - 30 - h + label = f"{bin_width * i:.0f}-{bin_width * (i + 1):.0f}s" + bars.append( + f'' + f"{label}: {count} tasks" + ) + + # X-axis labels (every 4th bin) + labels = [] + for i in range(0, num_bins, max(1, num_bins // 5)): + x = i * bar_w + bar_w / 2 + labels.append( + f'{bin_width * i:.0f}s' + ) + + return f""" + {"".join(bars)} + {"".join(labels)} +""" + + +def build_html(evaluation: dict[str, Any], benchmark: dict[str, Any] | None) -> str: + """Build the complete HTML report.""" + model = evaluation.get("model", "unknown") + total = evaluation.get("total", 0) + passed = evaluation.get("passed", 0) + failed = evaluation.get("failed", 0) + errors = evaluation.get("errors", 0) + pass_rate = evaluation.get("pass_rate", 0.0) + timestamp = evaluation.get("timestamp", "") + domain_stats = evaluation.get("domain_stats", {}) + evaluations = evaluation.get("evaluations", []) + + task_results = benchmark.get("task_results", []) if benchmark else [] + + # Map instance_id -> task result for merging + task_map = {t["instance_id"]: t for t in task_results} + + # Compute projected rank + all_entries = list(LEADERBOARD) + [("Altimate Code", pass_rate)] + all_entries.sort(key=lambda x: x[1], reverse=True) + rank = next(i + 1 for i, (n, _) in enumerate(all_entries) if n == "Altimate Code") + + total_time = benchmark.get("total_elapsed_s", 0) if benchmark else 0 + avg_time = benchmark.get("avg_elapsed_s", 0) if benchmark else 0 + + # Leaderboard chart + leaderboard_svg = build_leaderboard_svg(pass_rate, model) + + # Timing histogram + timing_svg = build_timing_svg(task_results) if task_results else "

No timing data.

" + + # Domain breakdown rows + domain_rows = "" + for domain, stats in sorted(domain_stats.items()): + dr = (stats["passed"] / stats["total"] * 100) if stats["total"] > 0 else 0 + bar_w = dr * 2 # max 200px at 100% + domain_rows += f""" +
+ + + + + + + """ + + # Per-task rows + task_rows = "" + for ev in evaluations: + iid = ev["instance_id"] + task_data = task_map.get(iid, {}) + status_class = "pass" if ev["passed"] else "fail" + status_text = "PASS" if ev["passed"] else "FAIL" + if ev.get("error"): + status_class = "error" + status_text = "ERROR" + elapsed = task_data.get("elapsed_s", "—") + domain = task_data.get("domain", "—") + instruction = task_data.get("instruction", "")[:120] + agent_output = task_data.get("agent_output", "") + error_detail = ev.get("error", "") + stderr = task_data.get("stderr_tail", "") + + details_content = "" + if agent_output: + details_content += f"

Agent Output

{esc(agent_output[:3000])}
" + if error_detail: + details_content += f"

Evaluation Error

{esc(error_detail)}
" + if stderr: + details_content += f"

Stderr

{esc(stderr[:1000])}
" + + task_rows += f""" + + + + + + + """ + if details_content: + task_rows += f""" + + + """ + + return f""" + + + + +Spider 2.0-DBT Benchmark — Altimate Code + + + + +

Spider 2.0-DBT Benchmark Results

+

+ Model: {esc(model)} · + Generated: {esc(timestamp)} UTC · + Projected Rank: #{rank} of {len(all_entries)} +

+ + +
+
+
{pass_rate:.1f}%
+
Pass Rate
+
+
+
{passed}/{total}
+
Tasks Passed
+
+
+
{failed}
+
Failed
+
+
+
{errors}
+
Errors
+
+
+
{total_time:.0f}s
+
Total Time
+
+
+
{avg_time:.0f}s
+
Avg per Task
+
+
+ + +
+

Leaderboard Comparison

+ {leaderboard_svg} +
+ + +
+

Category Breakdown

+
{esc(domain)}{stats['total']}{stats['passed']}{stats['failed']}{stats.get('errors', 0)} +
+
+ {dr:.1f}% +
+
{esc(iid)}{esc(domain)}{status_text}{elapsed}{esc(instruction)}
+ + + + + + {domain_rows} +
DomainTotalPassedFailedErrorsPass Rate
+ + + +
+

Execution Time Distribution

+ {timing_svg} +
+ + +
+

Per-Task Results

+ +
+ + + + +
+ + + + + + + + + + + + {task_rows} +
Task IDDomainStatusTime (s)Instruction
+
+ +
+ Generated by Altimate Code · Spider 2.0-DBT Benchmark Evaluation Pipeline +
+ + + + +""" + + +def main() -> None: + parser = argparse.ArgumentParser(description="Generate Spider 2.0-DBT benchmark report") + parser.add_argument("--evaluation", type=str, default=None, help="Path to evaluation JSON") + parser.add_argument("--output", type=str, default=None, help="Output HTML file path") + args = parser.parse_args() + + # Load evaluation + eval_path = Path(args.evaluation) if args.evaluation else find_latest_evaluation() + print(f"Loading evaluation: {eval_path}") + evaluation = json.loads(eval_path.read_text()) + + # Load benchmark results for timing/output data + benchmark = load_benchmark_results(evaluation) + if benchmark: + print(f"Loaded benchmark results: {evaluation.get('source_results', '')}") + else: + print("Warning: Source benchmark results not found; report will lack timing/output data.") + + # Generate HTML + html_content = build_html(evaluation, benchmark) + + # Write output + REPORTS_DIR.mkdir(parents=True, exist_ok=True) + timestamp = evaluation.get("timestamp", datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")) + output_path = Path(args.output) if args.output else REPORTS_DIR / f"spider2_report_{timestamp}.html" + output_path.write_text(html_content) + + print(f"Report generated: {output_path}") + print(f"Open in browser: file://{output_path.resolve()}") + + +if __name__ == "__main__": + main() diff --git a/experiments/spider2_dbt/requirements.txt b/experiments/spider2_dbt/requirements.txt new file mode 100644 index 0000000000..16416c5023 --- /dev/null +++ b/experiments/spider2_dbt/requirements.txt @@ -0,0 +1,5 @@ +duckdb>=0.10.0 +dbt-core>=1.7.0 +dbt-duckdb>=1.7.0 +gdown>=5.0.0 +pandas>=2.0.0 diff --git a/experiments/spider2_dbt/run_benchmark.py b/experiments/spider2_dbt/run_benchmark.py new file mode 100644 index 0000000000..18036d3c02 --- /dev/null +++ b/experiments/spider2_dbt/run_benchmark.py @@ -0,0 +1,462 @@ +"""Run Spider 2.0-DBT benchmark: invoke altimate-code per task. + +Usage: + python run_benchmark.py # All tasks + python run_benchmark.py --tasks 5 # First N tasks + python run_benchmark.py --tasks ga4_001 sf_002 # Specific tasks + python run_benchmark.py --no-resume # Force re-run all + python run_benchmark.py --timeout 300 # Custom timeout + python run_benchmark.py --parallel 4 # Run 4 tasks concurrently +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import time +from concurrent.futures import ProcessPoolExecutor, as_completed +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from config import ( + ALTIMATE_CODE_BIN, + DEFAULT_MODEL, + DEFAULT_PARALLEL, + DEFAULT_TIMEOUT, + EXAMPLES_DIR, + FAST_EXIT_THRESHOLD_S, + INCREMENTAL_DIR, + MAX_RETRIES, + RESULTS_DIR, + TASK_JSONL, + WORKSPACE_DIR, + get_task_domain, +) +from prompt_template import build_task_prompt + + +def load_tasks(task_jsonl: Path) -> list[dict[str, Any]]: + """Load tasks from the Spider2-DBT JSONL file.""" + tasks = [] + for line in task_jsonl.read_text().strip().splitlines(): + line = line.strip() + if line: + tasks.append(json.loads(line)) + return tasks + + +def filter_tasks( + tasks: list[dict[str, Any]], + task_filter: list[str] | None, +) -> list[dict[str, Any]]: + """Filter tasks by name or limit count. + + Args: + tasks: Full task list. + task_filter: Either a list of instance_ids, or a single-element list + with a number (e.g., ["5"]) to take first N tasks. + """ + if not task_filter: + return tasks + + # If single numeric argument, take first N + if len(task_filter) == 1 and task_filter[0].isdigit(): + n = int(task_filter[0]) + return tasks[:n] + + # Otherwise filter by instance_id + filter_set = set(task_filter) + return [t for t in tasks if t["instance_id"] in filter_set] + + +def prepare_workspace(instance_id: str) -> Path: + """Copy dbt project from examples to workspace.""" + src = EXAMPLES_DIR / instance_id + dst = WORKSPACE_DIR / instance_id + + if dst.exists(): + shutil.rmtree(dst) + shutil.copytree(src, dst) + + return dst + + +def _should_retry(result: dict[str, Any]) -> bool: + """Determine if a task result indicates a transient failure worth retrying. + + Only retry fast exits ( dict[str, Any]: + """Execute one attempt of altimate-code on a task.""" + # Prepare workspace + workspace = prepare_workspace(instance_id) + + # Build prompt + prompt = build_task_prompt( + instance_id=instance_id, + instruction=instruction, + project_dir=str(workspace), + ) + + # Output file for agent's text response + output_file = workspace / "agent_output.md" + + # Build command + cmd = [ + ALTIMATE_CODE_BIN, + "run", + prompt, + "--format", "json", + "--dir", str(workspace), + "--output", str(output_file), + "--model", model, + ] + if agent: + cmd.extend(["--agent", agent]) + + # Execute + start = time.perf_counter() + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=str(workspace), + ) + exit_code = proc.returncode + stdout = proc.stdout + stderr = proc.stderr + timed_out = False + except subprocess.TimeoutExpired: + exit_code = -1 + stdout = "" + stderr = f"Task timed out after {timeout}s" + timed_out = True + + elapsed_s = time.perf_counter() - start + + # Read agent output if available + agent_output = "" + if output_file.exists(): + agent_output = output_file.read_text() + + # Parse JSON events from stdout + events = [] + for line in stdout.splitlines(): + line = line.strip() + if line.startswith("{"): + try: + events.append(json.loads(line)) + except json.JSONDecodeError: + pass + + # Check if dbt run succeeded (search events for successful bash output) + dbt_success = False + for event in events: + if event.get("type") == "tool_use": + output = json.dumps(event.get("output", "")) + if "Completed successfully" in output or "Done." in output: + dbt_success = True + + return { + "instance_id": instance_id, + "domain": get_task_domain(instance_id), + "instruction": instruction, + "exit_code": exit_code, + "timed_out": timed_out, + "dbt_success": dbt_success, + "elapsed_s": round(elapsed_s, 2), + "agent_output": agent_output[:5000], # Truncate for storage + "event_count": len(events), + "stderr_tail": stderr[-2000:] if stderr else "", + } + + +def run_single_task( + task: dict[str, Any], + model: str, + agent: str | None, + timeout: int, +) -> dict[str, Any]: + """Run altimate-code on a single Spider2-DBT task with auto-retry. + + Automatically retries on transient failures (fast exits, startup hangs). + Returns: + Result dict with task metadata, exit_code, elapsed_s, agent_output. + """ + instance_id = task["instance_id"] + instruction = task.get("instruction", task.get("question", "")) + + for attempt in range(1, MAX_RETRIES + 1): + result = _run_single_attempt(instance_id, instruction, model, agent, timeout) + result["attempt"] = attempt + + if attempt < MAX_RETRIES and _should_retry(result): + retry_reason = "fast exit" if result["elapsed_s"] < FAST_EXIT_THRESHOLD_S else "timeout with 0 events" + print(f" ↳ Retry {attempt}/{MAX_RETRIES} for {instance_id} ({retry_reason})") + continue + + break + + return result + + +def _run_task_wrapper(args: tuple) -> dict[str, Any]: + """Wrapper for ProcessPoolExecutor — unpacks args tuple.""" + task, model, agent, timeout = args + return run_single_task(task, model, agent, timeout) + + +def save_incremental(instance_id: str, result: dict[str, Any]) -> None: + """Save per-task result for resumability.""" + path = INCREMENTAL_DIR / f"{instance_id}.json" + path.write_text(json.dumps(result, indent=2)) + + +def load_incremental(instance_id: str) -> dict[str, Any] | None: + """Load a previously saved incremental result.""" + path = INCREMENTAL_DIR / f"{instance_id}.json" + if path.exists(): + return json.loads(path.read_text()) + return None + + +def run_sequential( + tasks_to_run: list[dict[str, Any]], + all_tasks_count: int, + model: str, + agent: str | None, + timeout: int, + resume: bool, +) -> list[dict[str, Any]]: + """Run tasks one at a time (original behavior).""" + results = [] + skipped = 0 + + for i, task in enumerate(tasks_to_run, 1): + instance_id = task["instance_id"] + + if resume: + existing = load_incremental(instance_id) + if existing is not None: + print(f" [{i}/{len(tasks_to_run)}] {instance_id} — SKIPPED (cached)") + results.append(existing) + skipped += 1 + continue + + print(f" [{i}/{len(tasks_to_run)}] {instance_id} — running...", end="", flush=True) + + result = run_single_task(task, model, agent, timeout) + save_incremental(instance_id, result) + results.append(result) + + status = "OK" if result["exit_code"] == 0 else "FAIL" + if result["timed_out"]: + status = "TIMEOUT" + print(f" {status} ({result['elapsed_s']}s)") + + return results + + +def run_parallel( + tasks_to_run: list[dict[str, Any]], + all_tasks_count: int, + model: str, + agent: str | None, + timeout: int, + resume: bool, + workers: int, +) -> list[dict[str, Any]]: + """Run tasks concurrently using a process pool.""" + results_map: dict[str, dict[str, Any]] = {} + to_submit: list[dict[str, Any]] = [] + + # Separate cached vs need-to-run + for task in tasks_to_run: + instance_id = task["instance_id"] + if resume: + existing = load_incremental(instance_id) + if existing is not None: + print(f" {instance_id} — SKIPPED (cached)") + results_map[instance_id] = existing + continue + to_submit.append(task) + + if not to_submit: + return [results_map[t["instance_id"]] for t in tasks_to_run] + + print(f"\n Running {len(to_submit)} tasks with {workers} workers...\n") + + with ProcessPoolExecutor(max_workers=workers) as pool: + future_to_id = {} + for task in to_submit: + future = pool.submit(_run_task_wrapper, (task, model, agent, timeout)) + future_to_id[future] = task["instance_id"] + + completed = 0 + for future in as_completed(future_to_id): + instance_id = future_to_id[future] + completed += 1 + try: + result = future.result() + save_incremental(instance_id, result) + results_map[instance_id] = result + + status = "OK" if result["exit_code"] == 0 else "FAIL" + if result["timed_out"]: + status = "TIMEOUT" + print(f" [{completed}/{len(to_submit)}] {instance_id} — {status} ({result['elapsed_s']}s)") + except Exception as e: + print(f" [{completed}/{len(to_submit)}] {instance_id} — ERROR: {e}") + error_result = { + "instance_id": instance_id, + "domain": get_task_domain(instance_id), + "instruction": "", + "exit_code": -1, + "timed_out": False, + "dbt_success": False, + "elapsed_s": 0, + "agent_output": "", + "event_count": 0, + "stderr_tail": str(e)[:2000], + } + save_incremental(instance_id, error_result) + results_map[instance_id] = error_result + + # Return in original task order + return [results_map[t["instance_id"]] for t in tasks_to_run] + + +def main() -> None: + parser = argparse.ArgumentParser(description="Run Spider 2.0-DBT benchmark") + parser.add_argument( + "--tasks", nargs="*", default=None, + help="Task filter: number (first N) or space-separated instance_ids", + ) + parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Timeout per task in seconds") + parser.add_argument("--model", type=str, default=DEFAULT_MODEL, help="Model to use") + parser.add_argument("--agent", type=str, default=None, help="Agent to use") + parser.add_argument("--no-resume", action="store_true", help="Force re-run all tasks") + parser.add_argument("--dry-run", action="store_true", help="Print tasks without running") + parser.add_argument("--parallel", type=int, default=DEFAULT_PARALLEL, help=f"Number of concurrent tasks (default: {DEFAULT_PARALLEL})") + args = parser.parse_args() + + # Auto-setup: download Spider2 files if not available + if not TASK_JSONL.exists(): + print("Spider2 files not found. Running automatic setup...") + from setup_spider2 import clone_spider2, download_databases, run_spider2_setup, create_directories + clone_spider2() + download_databases() + run_spider2_setup() + create_directories() + if not TASK_JSONL.exists(): + print(f"ERROR: Task file still not found after setup: {TASK_JSONL}") + sys.exit(1) + + all_tasks = load_tasks(TASK_JSONL) + tasks = filter_tasks(all_tasks, args.tasks) + + print("=" * 60) + print("Spider 2.0-DBT Benchmark Runner") + print("=" * 60) + print(f" Tasks: {len(tasks)} / {len(all_tasks)}") + print(f" Model: {args.model}") + print(f" Timeout: {args.timeout}s") + print(f" Resume: {'disabled' if args.no_resume else 'enabled'}") + print(f" Parallel: {args.parallel} worker{'s' if args.parallel > 1 else ''}") + print() + + if args.dry_run: + for t in tasks: + print(f" {t['instance_id']}: {t.get('instruction', t.get('question', ''))[:80]}...") + return + + # Ensure directories exist + WORKSPACE_DIR.mkdir(parents=True, exist_ok=True) + RESULTS_DIR.mkdir(parents=True, exist_ok=True) + INCREMENTAL_DIR.mkdir(parents=True, exist_ok=True) + + total_start = time.perf_counter() + resume = not args.no_resume + + if args.parallel > 1: + results = run_parallel(tasks, len(all_tasks), args.model, args.agent, args.timeout, resume, args.parallel) + else: + results = run_sequential(tasks, len(all_tasks), args.model, args.agent, args.timeout, resume) + + total_elapsed = time.perf_counter() - total_start + skipped = sum(1 for r in results if load_incremental(r["instance_id"]) is not None and r.get("_cached", False)) + + # Aggregate results + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + completed_count = sum(1 for r in results if r["exit_code"] == 0) + failed_count = sum(1 for r in results if r["exit_code"] != 0 and not r["timed_out"]) + timed_out_count = sum(1 for r in results if r["timed_out"]) + + aggregate = { + "timestamp": timestamp, + "model": args.model, + "agent": args.agent, + "timeout": args.timeout, + "parallel_workers": args.parallel, + "total_tasks": len(results), + "completed": completed_count, + "failed": failed_count, + "timed_out": timed_out_count, + "total_elapsed_s": round(total_elapsed, 2), + "avg_elapsed_s": round(sum(r["elapsed_s"] for r in results) / max(len(results), 1), 2), + "task_results": results, + } + + # Save aggregate + output_path = RESULTS_DIR / f"spider2_benchmark_{timestamp}.json" + output_path.write_text(json.dumps(aggregate, indent=2)) + + # Also save as "latest" symlink + latest_path = RESULTS_DIR / "latest.json" + if latest_path.is_symlink() or latest_path.exists(): + latest_path.unlink() + latest_path.symlink_to(output_path.name) + + # Print summary + print() + print("=" * 60) + print("Benchmark Complete") + print("=" * 60) + print(f" Total tasks: {aggregate['total_tasks']}") + print(f" Completed: {aggregate['completed']}") + print(f" Failed: {aggregate['failed']}") + print(f" Timed out: {aggregate['timed_out']}") + print(f" Wall time: {aggregate['total_elapsed_s']}s") + print(f" Avg per task: {aggregate['avg_elapsed_s']}s") + print(f" Results: {output_path}") + print() + print("Next: python evaluate_results.py") + + +if __name__ == "__main__": + main() diff --git a/experiments/spider2_dbt/schema_introspect.py b/experiments/spider2_dbt/schema_introspect.py new file mode 100644 index 0000000000..86daaaeba5 --- /dev/null +++ b/experiments/spider2_dbt/schema_introspect.py @@ -0,0 +1,94 @@ +"""Pre-compute DuckDB schema information for benchmark tasks. + +Queries the DuckDB database in a workspace to extract a compact +table listing (name + columns + row count). Kept concise to avoid +overwhelming the agent prompt. +""" + +from __future__ import annotations + +from pathlib import Path + + +def introspect_duckdb_schema(workspace: Path, max_tables: int = 30) -> str: + """Query DuckDB database files in workspace and return a compact schema summary. + + Produces a ~2-4KB summary listing tables with their columns (name + type) + and row counts. No sample data — keeps the prompt focused. + + Args: + workspace: Path to the dbt project workspace directory. + max_tables: Maximum number of tables to include. + + Returns: + A formatted string with schema information, or empty string if no DB found. + """ + try: + import duckdb + except ImportError: + return "" + + # Find DuckDB files + db_files = list(workspace.glob("*.duckdb")) + list(workspace.glob("*.db")) + if not db_files: + db_files = [ + f for f in workspace.rglob("*.duckdb") + if "target" not in str(f) and ".dbt" not in str(f) + ] + if not db_files: + return "" + + db_path = db_files[0] + + try: + conn = duckdb.connect(str(db_path), read_only=True) + except Exception: + return "" + + try: + tables = conn.execute( + "SELECT table_schema, table_name FROM information_schema.tables " + "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') " + "ORDER BY table_schema, table_name" + ).fetchall() + + if not tables: + conn.close() + return "" + + lines = [f"## Source Database: `{db_path.name}` ({len(tables)} tables)\n"] + + for schema, table in tables[:max_tables]: + full_name = f"{schema}.{table}" if schema != "main" else table + + # Get columns (name + type only) + cols = conn.execute( + "SELECT column_name, data_type FROM information_schema.columns " + f"WHERE table_schema = '{schema}' AND table_name = '{table}' " + "ORDER BY ordinal_position" + ).fetchall() + + # Get row count + try: + row_count = conn.execute( + f'SELECT COUNT(*) FROM "{schema}"."{table}"' + ).fetchone()[0] + except Exception: + row_count = "?" + + col_summary = ", ".join(f"{c[0]} ({c[1]})" for c in cols) + lines.append(f"- **{full_name}** ({row_count} rows): {col_summary}") + + result = "\n".join(lines) + + # Hard cap at 5000 chars to avoid overwhelming the prompt + if len(result) > 5000: + # Truncate and add note + result = result[:4900] + "\n\n... (truncated — query the database for full schema)" + + return result + + except Exception: + return "" + finally: + conn.close() diff --git a/experiments/spider2_dbt/setup_spider2.py b/experiments/spider2_dbt/setup_spider2.py new file mode 100644 index 0000000000..f74744d526 --- /dev/null +++ b/experiments/spider2_dbt/setup_spider2.py @@ -0,0 +1,242 @@ +"""One-time setup: clone Spider2 repo, download DuckDB databases, verify deps. + +Usage: + python setup_spider2.py [--force] +""" + +from __future__ import annotations + +import argparse +import os +import shutil +import subprocess +import sys +from pathlib import Path + +from config import ( + ALTIMATE_CODE_BIN, + BASE_DIR, + DUCKDB_ZIP_DOWNLOADS, + EXAMPLES_DIR, + INCREMENTAL_DIR, + REPORTS_DIR, + RESULTS_DIR, + SPIDER2_COMMIT, + SPIDER2_DBT_DIR, + SPIDER2_REPO_DIR, + SPIDER2_REPO_URL, + TASK_JSONL, + WORKSPACE_DIR, +) + + +def run_cmd(cmd: list[str], cwd: str | None = None, check: bool = True) -> subprocess.CompletedProcess: + """Run a shell command with logging.""" + print(f" $ {' '.join(cmd)}") + return subprocess.run(cmd, cwd=cwd, check=check, capture_output=False) + + +def clone_spider2(force: bool = False) -> None: + """Sparse-clone Spider2 repo (only spider2-dbt/ directory).""" + if SPIDER2_REPO_DIR.exists(): + if force: + print(f"Removing existing repo at {SPIDER2_REPO_DIR}...") + shutil.rmtree(SPIDER2_REPO_DIR) + else: + print(f"Spider2 repo already exists at {SPIDER2_REPO_DIR}. Use --force to re-clone.") + return + + print("Cloning Spider2 repository (sparse, spider2-dbt/ only)...") + SPIDER2_REPO_DIR.mkdir(parents=True, exist_ok=True) + + run_cmd(["git", "init"], cwd=str(SPIDER2_REPO_DIR)) + run_cmd(["git", "remote", "add", "origin", SPIDER2_REPO_URL], cwd=str(SPIDER2_REPO_DIR)) + run_cmd(["git", "config", "core.sparseCheckout", "true"], cwd=str(SPIDER2_REPO_DIR)) + + sparse_file = SPIDER2_REPO_DIR / ".git" / "info" / "sparse-checkout" + sparse_file.parent.mkdir(parents=True, exist_ok=True) + sparse_file.write_text("spider2-dbt/\n") + + run_cmd(["git", "fetch", "--depth", "1", "origin", SPIDER2_COMMIT], cwd=str(SPIDER2_REPO_DIR)) + run_cmd(["git", "checkout", "FETCH_HEAD"], cwd=str(SPIDER2_REPO_DIR)) + + if not SPIDER2_DBT_DIR.exists(): + print("ERROR: spider2-dbt/ directory not found after clone.") + sys.exit(1) + + print(f"Spider2 repo cloned to {SPIDER2_REPO_DIR}") + + +def download_databases() -> None: + """Download DuckDB database zips from Google Drive using gdown. + + Spider2 expects two zips in the spider2-dbt/ directory: + - DBT_start_db.zip (example project databases) + - dbt_gold.zip (gold standard evaluation databases) + """ + # Check if zips already exist + all_present = all( + (SPIDER2_DBT_DIR / filename).exists() + for _, filename in DUCKDB_ZIP_DOWNLOADS + ) + if all_present: + print("Database zips already present. Skipping download.") + return + + print("Downloading DuckDB databases from Google Drive...") + failed = [] + + for gdrive_id, filename in DUCKDB_ZIP_DOWNLOADS: + output = SPIDER2_DBT_DIR / filename + if output.exists(): + print(f" {filename} already exists, skipping.") + continue + + url = f"https://drive.google.com/uc?id={gdrive_id}" + result = run_cmd(["gdown", url, "-O", str(output)], check=False) + if result.returncode != 0 or not output.exists(): + failed.append(filename) + + if failed: + print("\nWARNING: Failed to download some files via gdown.") + print("This often happens due to Google Drive rate limits.") + print("Please download manually and place in:") + print(f" {SPIDER2_DBT_DIR}/") + print() + for _, filename in DUCKDB_ZIP_DOWNLOADS: + if filename in failed: + gdrive_id = next(gid for gid, fn in DUCKDB_ZIP_DOWNLOADS if fn == filename) + print(f" {filename}:") + print(f" https://drive.google.com/uc?id={gdrive_id}") + print() + print("Then re-run: python setup_spider2.py --skip-download") + sys.exit(1) + + +def run_spider2_setup() -> None: + """Run Spider2's own setup.py to extract databases into examples/ and gold/.""" + # Check if zips exist first + for _, filename in DUCKDB_ZIP_DOWNLOADS: + zip_path = SPIDER2_DBT_DIR / filename + if not zip_path.exists(): + print(f"WARNING: {filename} not found, skipping Spider2 setup.") + print("Run download step first or place files manually.") + return + + setup_script = SPIDER2_DBT_DIR / "setup.py" + if setup_script.exists(): + print("Running Spider2's setup.py to extract databases...") + run_cmd([sys.executable, str(setup_script)], cwd=str(SPIDER2_DBT_DIR)) + else: + print("No Spider2 setup.py found; skipping.") + + +def verify_dependencies() -> None: + """Verify all required tools are available.""" + print("\nVerifying dependencies...") + errors = [] + + # Python packages + for pkg_name, import_name in [("duckdb", "duckdb"), ("dbt-core", "dbt"), ("pandas", "pandas")]: + try: + __import__(import_name) + print(f" {pkg_name}: OK") + except ImportError: + errors.append(f" Missing Python package: {pkg_name} (pip install {pkg_name})") + + # dbt-duckdb adapter + try: + result = subprocess.run( + ["dbt", "--version"], capture_output=True, text=True, check=False + ) + if result.returncode == 0: + version_lines = result.stdout.strip().splitlines() + for line in version_lines: + if "duckdb" in line.lower(): + print(f" dbt-duckdb: {line.strip()}") + break + else: + print(" Warning: dbt-duckdb adapter may not be installed.") + else: + errors.append(" dbt CLI returned error") + except FileNotFoundError: + errors.append(" dbt CLI not found (pip install dbt-core dbt-duckdb)") + + # altimate-code + result = subprocess.run( + [ALTIMATE_CODE_BIN, "--version"], capture_output=True, text=True, check=False + ) + if result.returncode != 0: + errors.append(f" altimate-code CLI not found at: {ALTIMATE_CODE_BIN}") + else: + print(f" altimate-code: {result.stdout.strip()}") + + # Task file + task_jsonl = SPIDER2_DBT_DIR / "examples" / "spider2-dbt.jsonl" + if not task_jsonl.exists(): + # Try alternative name + task_jsonl = TASK_JSONL + if not task_jsonl.exists(): + errors.append(f" Task file not found: {task_jsonl}") + else: + import json + tasks = [json.loads(line) for line in task_jsonl.read_text().strip().splitlines()] + print(f" Tasks found: {len(tasks)}") + + # Examples directory + if not EXAMPLES_DIR.exists(): + errors.append(f" Examples directory not found: {EXAMPLES_DIR}") + else: + examples = [d for d in EXAMPLES_DIR.iterdir() if d.is_dir()] + print(f" Example projects: {len(examples)}") + + # Check for DuckDB files in examples (indicates setup.py ran) + duckdb_count = sum(1 for _ in EXAMPLES_DIR.rglob("*.duckdb")) if EXAMPLES_DIR.exists() else 0 + print(f" DuckDB files in examples: {duckdb_count}") + if duckdb_count == 0: + print(" Warning: No .duckdb files found — databases may not be extracted yet.") + + if errors: + print("\nERRORS:") + for err in errors: + print(err) + sys.exit(1) + + print("\nAll dependencies verified.") + + +def create_directories() -> None: + """Create workspace and results directories.""" + for d in [WORKSPACE_DIR, RESULTS_DIR, INCREMENTAL_DIR, REPORTS_DIR]: + d.mkdir(parents=True, exist_ok=True) + print("Directories created.") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Set up Spider 2.0-DBT benchmark environment") + parser.add_argument("--force", action="store_true", help="Force re-clone of Spider2 repo") + parser.add_argument("--skip-download", action="store_true", help="Skip database download") + args = parser.parse_args() + + print("=" * 60) + print("Spider 2.0-DBT Benchmark Setup") + print("=" * 60) + + clone_spider2(force=args.force) + + if not args.skip_download: + download_databases() + + run_spider2_setup() + create_directories() + verify_dependencies() + + print("\n" + "=" * 60) + print("Setup complete! Next steps:") + print(" python run_benchmark.py # Run benchmark") + print(" python run_benchmark.py --tasks 5 # Smoke test (first 5)") + print("=" * 60) + + +if __name__ == "__main__": + main()