From 8195577ecbcefd1a1371ebd6d8bcfce182a6c90a Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Sun, 31 May 2026 08:58:02 -0700 Subject: [PATCH 1/3] feat: add DuckDB as a supported query engine (closes #40) Wire DuckDB into the engine factory (SQLAlchemy dialect via duckdb-engine, already a declared dependency). Add the frontend connection plugin and register it in the plugin list. Add an end-to-end integration test covering list_tables, execute_sql, and the full agent pipeline against a temp DuckDB file with Olist-like sample tables. Co-Authored-By: Claude Sonnet 4.6 --- .../src/analytics_agent/engines/factory.py | 2 + .../components/Settings/connections/index.ts | 2 + .../Settings/connections/plugins/duckdb.tsx | 25 ++ tests/integration/test_duckdb_e2e.py | 372 ++++++++++++++++++ 4 files changed, 401 insertions(+) create mode 100644 frontend/src/components/Settings/connections/plugins/duckdb.tsx create mode 100644 tests/integration/test_duckdb_e2e.py diff --git a/backend/src/analytics_agent/engines/factory.py b/backend/src/analytics_agent/engines/factory.py index 3269fdd..97df4c7 100644 --- a/backend/src/analytics_agent/engines/factory.py +++ b/backend/src/analytics_agent/engines/factory.py @@ -187,6 +187,7 @@ def get_secret_env_vars(engine_type: str) -> dict[str, str]: "mysql": SQLAlchemyQueryEngine, "sqlite": SQLAlchemyQueryEngine, "postgresql": SQLAlchemyQueryEngine, + "duckdb": SQLAlchemyQueryEngine, "sqlalchemy": SQLAlchemyQueryEngine, }.get(engine_type) return getattr(cls, "secret_env_vars", {}) if cls else {} @@ -208,6 +209,7 @@ def _make_connector(connection_cfg: dict) -> MCPQueryEngine: "mysql": SQLAlchemyQueryEngine, "sqlite": SQLAlchemyQueryEngine, "postgresql": SQLAlchemyQueryEngine, + "duckdb": SQLAlchemyQueryEngine, "sqlalchemy": SQLAlchemyQueryEngine, "mcp": MCPQueryEngine, "mcp-stdio": MCPQueryEngine, diff --git a/frontend/src/components/Settings/connections/index.ts b/frontend/src/components/Settings/connections/index.ts index 9996ad5..5e20d5d 
100644 --- a/frontend/src/components/Settings/connections/index.ts +++ b/frontend/src/components/Settings/connections/index.ts @@ -8,6 +8,7 @@ import { hivePlugin } from "./plugins/hive"; import { mysqlPlugin } from "./plugins/mysql"; import { postgresqlPlugin } from "./plugins/postgresql"; import { sqlitePlugin } from "./plugins/sqlite"; +import { duckdbPlugin } from "./plugins/duckdb"; import { datahubPlugin } from "./plugins/datahub"; import { datahubMcpPlugin } from "./plugins/datahub-mcp"; import { customMcpEnginePlugin, customMcpContextPlugin } from "./plugins/custom-mcp"; @@ -24,6 +25,7 @@ export const CONNECTION_PLUGINS: ConnectionPlugin[] = [ mysqlPlugin, postgresqlPlugin, sqlitePlugin, + duckdbPlugin, customMcpEnginePlugin, // Context platforms diff --git a/frontend/src/components/Settings/connections/plugins/duckdb.tsx b/frontend/src/components/Settings/connections/plugins/duckdb.tsx new file mode 100644 index 0000000..94042c4 --- /dev/null +++ b/frontend/src/components/Settings/connections/plugins/duckdb.tsx @@ -0,0 +1,25 @@ +import { SimpleFormShell } from "../SimpleFormShell"; +import type { ConnectionPlugin, NewConnectionPayload } from "../types"; + +const FIELDS = [ + { key: "database", label: "Database file path", type: "mono" as const, + placeholder: "/absolute/path/to/database.duckdb", required: true }, +]; + +export const duckdbPlugin: ConnectionPlugin = { + id: "duckdb", + serviceId: "duckdb", + label: "DuckDB", + category: "engine", + transport: "native", + description: "Connect to a local DuckDB database file", + Form: ({ onDone, onCancel }) => ( + + onDone({ ...payload, config: { dialect: "duckdb", ...payload.config } }) + } + /> + ), +}; diff --git a/tests/integration/test_duckdb_e2e.py b/tests/integration/test_duckdb_e2e.py new file mode 100644 index 0000000..74e101c --- /dev/null +++ b/tests/integration/test_duckdb_e2e.py @@ -0,0 +1,372 @@ +""" +Integration test: DuckDB query engine + DataHub metadata, end-to-end. 
+ +Setup: + - Creates a temporary DuckDB file with three Olist-like tables + (olist_orders, olist_order_items, olist_products — 160 rows total). + - Pushes table descriptions to the configured DataHub instance under + platform=duckdb, env=DEV so the agent can discover them via search. + +What it proves: + - SQLAlchemyQueryEngine with dialect=duckdb boots and can execute SQL. + - DataHub context tools find the freshly pushed metadata. + - The full agent pipeline (context lookup → SQL → text answer) works. + +Prerequisites: + DataHub credentials: ~/.datahubenv or DATAHUB_GMS_URL + DATAHUB_GMS_TOKEN + LLM key: ANTHROPIC_API_KEY or OPENAI_API_KEY + +Run: + uv run pytest tests/integration/test_duckdb_e2e.py -v -s +""" + +from __future__ import annotations + +import json +import os +import pathlib +import urllib.request +import uuid + +import pytest + +# ── Skip guards ────────────────────────────────────────────────────────────── + +_has_datahub = bool( + (os.environ.get("DATAHUB_GMS_URL") and os.environ.get("DATAHUB_GMS_TOKEN")) + or pathlib.Path("~/.datahubenv").expanduser().exists() +) +_has_llm = bool(os.environ.get("ANTHROPIC_API_KEY") or os.environ.get("OPENAI_API_KEY")) + +_requires_datahub_and_llm = pytest.mark.skipif( + not (_has_datahub and _has_llm), + reason=( + "Needs DataHub credentials (datahub init or DATAHUB_GMS_URL+TOKEN) " + "and an LLM key (ANTHROPIC_API_KEY or OPENAI_API_KEY)" + ), +) + + +# ── DataHub helpers ─────────────────────────────────────────────────────────── + + +def _datahub_creds() -> tuple[str, str]: + """Return (gms_url, token) from env vars or ~/.datahubenv.""" + gms_url = os.environ.get("DATAHUB_GMS_URL", "") + token = os.environ.get("DATAHUB_GMS_TOKEN", "") + if not gms_url: + import yaml + + env_file = pathlib.Path("~/.datahubenv").expanduser() + if env_file.exists(): + cfg = yaml.safe_load(env_file.read_text()) or {} + gms = cfg.get("gms") or {} + gms_url = gms.get("server", "") + token = gms.get("token", "") + return gms_url, 
token + + +def _emit_table_description( + gms_url: str, token: str, urn: str, table: str, description: str +) -> None: + """Push a minimal dataset description MCE to DataHub.""" + from datahub.emitter.rest_emitter import DatahubRestEmitter + from datahub.metadata.schema_classes import ( + DatasetPropertiesClass, + DatasetSnapshotClass, + MetadataChangeEventClass, + ) + + emitter = DatahubRestEmitter(gms_server=gms_url, token=token or None) + emitter.emit_mce( + MetadataChangeEventClass( + proposedSnapshot=DatasetSnapshotClass( + urn=urn, + aspects=[DatasetPropertiesClass(description=description, name=table)], + ) + ) + ) + emitter.flush() + + +def _delete_entity(gms_url: str, token: str, urn: str) -> None: + """Hard-delete a DataHub entity by URN (best-effort — non-fatal).""" + try: + req = urllib.request.Request( + f"{gms_url}/entities?action=delete", + data=json.dumps({"urn": urn}).encode(), + headers={ + "Content-Type": "application/json", + **({"Authorization": f"Bearer {token}"} if token else {}), + }, + method="POST", + ) + urllib.request.urlopen(req, timeout=10) + except Exception as e: + print(f"[!] DataHub cleanup failed for {urn}: {e}") + + +# ── DataHub table metadata ──────────────────────────────────────────────────── + +_PLATFORM = "duckdb" +_ENV = "DEV" + +# Tables we create + their descriptions for DataHub. +_TABLES: dict[str, str] = { + "olist_orders": ( + "Order lifecycle records. Columns: order_id (PK), customer_id, " + "order_status ('delivered' or 'canceled'), order_purchase_timestamp." + ), + "olist_order_items": ( + "Line items inside each order. Columns: order_id (FK), product_id (FK), " + "price (item price in BRL), freight_value (shipping cost in BRL). " + "Revenue = SUM(price + freight_value) for delivered orders." + ), + "olist_products": ( + "Product catalog. Columns: product_id (PK), product_category_name " + "(e.g. 'electronics', 'furniture', 'clothing', 'books', 'toys')." 
+ ), +} + + +def _dataset_urn(table: str) -> str: + return f"urn:li:dataset:(urn:li:dataPlatform:{_PLATFORM},{table},{_ENV})" + + +# ── Fixtures ────────────────────────────────────────────────────────────────── + + +@pytest.fixture(scope="module") +def duckdb_path(tmp_path_factory): + """Build a temp DuckDB file with three Olist-like tables.""" + import duckdb + + db_file = tmp_path_factory.mktemp("duckdb") / "test.duckdb" + con = duckdb.connect(str(db_file)) + + # olist_orders — 50 rows, 5 canceled (i % 10 == 0) + con.execute(""" + CREATE TABLE olist_orders ( + order_id VARCHAR PRIMARY KEY, + customer_id VARCHAR, + order_status VARCHAR, + order_purchase_timestamp TIMESTAMP + ) + """) + con.execute(""" + INSERT INTO olist_orders + SELECT + 'order_' || i::VARCHAR, + 'customer_' || (i % 20)::VARCHAR, + CASE WHEN i % 10 = 0 THEN 'canceled' ELSE 'delivered' END, + TIMESTAMP '2017-01-01' + INTERVAL (i) DAY + FROM range(1, 51) t(i) + """) + + # olist_order_items — 2 items per order (100 rows) + # product_id cycles through 0-9 so every catalog product is referenced (2 products per category) + con.execute(""" + CREATE TABLE olist_order_items ( + order_id VARCHAR, + product_id VARCHAR, + price DOUBLE, + freight_value DOUBLE + ) + """) + con.execute(""" + INSERT INTO olist_order_items + SELECT + 'order_' || (i % 50 + 1)::VARCHAR, + 'product_' || (i % 10)::VARCHAR, + (i % 5 + 1) * 10.0, + (i % 3 + 1) * 2.0 + FROM range(0, 100) t(i) + """) + + # olist_products — 10 products across 5 categories (2 products each) + con.execute(""" + CREATE TABLE olist_products ( + product_id VARCHAR PRIMARY KEY, + product_category_name VARCHAR + ) + """) + con.executemany( + "INSERT INTO olist_products VALUES (?, ?)", + [ + ("product_0", "electronics"), + ("product_1", "furniture"), + ("product_2", "clothing"), + ("product_3", "books"), + ("product_4", "toys"), + ("product_5", "electronics"), + ("product_6", "furniture"), + ("product_7", "clothing"), + ("product_8", "books"), + ("product_9", "toys"), + ], + ) + + 
con.close() + return str(db_file) + + +@pytest.fixture(scope="module") +def datahub_metadata(): + """Push table descriptions to DataHub; delete them on teardown.""" + gms_url, token = _datahub_creds() + urns = [] + for table, description in _TABLES.items(): + urn = _dataset_urn(table) + _emit_table_description(gms_url, token, urn, table, description) + urns.append(urn) + print(f"[✓] DataHub metadata pushed: {urn}") + + yield urns + + # Teardown + for urn in urns: + _delete_entity(gms_url, token, urn) + print(f"[✓] DataHub entity deleted: {urn}") + + +@pytest.fixture(scope="module") +def duckdb_engine(duckdb_path): + """SQLAlchemyQueryEngine backed by the temp DuckDB file.""" + import asyncio + + from analytics_agent.engines.sqlalchemy.engine import SQLAlchemyQueryEngine + + engine = SQLAlchemyQueryEngine({"dialect": "duckdb", "database": duckdb_path}) + yield engine + asyncio.run(engine.aclose()) + + +@pytest.fixture(scope="module") +def agent_graph(duckdb_engine, datahub_metadata): + """Full agent graph: DuckDB engine tools + DataHub context tools.""" + from analytics_agent.agent.graph import build_graph + from analytics_agent.context.datahub import build_datahub_tools + + context_tools = build_datahub_tools() + engine_tools = duckdb_engine.get_tools() + + assert engine_tools, "DuckDB engine returned no tools" + assert context_tools, "No DataHub context tools loaded — check credentials" + + return build_graph( + engine_name="test_duckdb", + context_tools=context_tools, + engine_tools=engine_tools, + disabled_tools={"create_chart"}, + ) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +async def _run(graph, question: str) -> list[dict]: + """Run one agent turn and collect all events.""" + from analytics_agent.agent.streaming import stream_graph_events + + events: list[dict] = [] + conv_id = f"e2e-duckdb-{uuid.uuid4().hex[:8]}" + async for event in stream_graph_events(graph, question, conv_id, "test_duckdb"): + 
events.append(event) + label = event["payload"].get("text") or event["payload"].get("tool_name") or "" + print(f" [{event['event']}] {str(label)[:80]}") + return events + + +# ── Tests ───────────────────────────────────────────────────────────────────── + + +@_requires_datahub_and_llm +@pytest.mark.asyncio +async def test_top_categories_by_revenue(agent_graph): + """Agent must run SQL and return top product categories by revenue.""" + events = await _run( + agent_graph, + "What are the top 3 product categories by total revenue (price + freight_value)?", + ) + + event_types = {e["event"] for e in events} + print("\nEvent types:", event_types) + + assert "COMPLETE" in event_types, f"No COMPLETE event — got: {event_types}" + assert "TEXT" in event_types, f"No TEXT event — got: {event_types}" + + # Agent must have issued at least one successful SQL query + sql_events = [e for e in events if e["event"] == "SQL"] + assert sql_events, ( + "No SQL event emitted — agent did not call execute_sql successfully. 
" + f"All event types: {event_types}" + ) + + # The SQL result should have rows + rows = sql_events[-1]["payload"].get("rows", []) + assert rows, "SQL result has no rows" + assert len(rows) <= 3, f"Expected ≤3 rows (top 3), got {len(rows)}" + + # The answer should mention at least one of the known categories + complete_text = next(e["payload"].get("text", "") for e in events if e["event"] == "COMPLETE") + known_categories = {"electronics", "furniture", "clothing", "books", "toys"} + assert any(cat in complete_text.lower() for cat in known_categories), ( + f"Response doesn't mention any known category.\nResponse: {complete_text[:400]}" + ) + + +@_requires_datahub_and_llm +@pytest.mark.asyncio +async def test_delivered_vs_canceled_order_count(agent_graph): + """Agent must count delivered vs canceled orders accurately.""" + events = await _run( + agent_graph, + "How many orders are delivered versus canceled?", + ) + + event_types = {e["event"] for e in events} + assert "COMPLETE" in event_types + assert "SQL" in event_types, "Agent should query olist_orders for status counts" + + complete_text = next(e["payload"].get("text", "") for e in events if e["event"] == "COMPLETE") + # Dataset has 45 delivered (i % 10 != 0) and 5 canceled (i % 10 == 0) + # Accept any reasonable mention of both statuses + text_lower = complete_text.lower() + assert "delivered" in text_lower and "canceled" in text_lower, ( + f"Response should mention both statuses.\nResponse: {complete_text[:400]}" + ) + + +@pytest.mark.asyncio +async def test_engine_list_tables(duckdb_engine): + """DuckDB engine's list_tables tool should return all three tables.""" + import orjson + + tools = {t.name: t for t in duckdb_engine.get_tools()} + assert "list_tables" in tools + + result = tools["list_tables"].invoke({"schema": ""}) + tables = orjson.loads(result) + table_names = {t["name"] for t in tables} + assert {"olist_orders", "olist_order_items", "olist_products"} == table_names, ( + f"Unexpected tables: 
{table_names}" + ) + + +@pytest.mark.asyncio +async def test_engine_execute_sql(duckdb_engine): + """DuckDB engine's execute_sql tool should return correct row counts.""" + import orjson + + tools = {t.name: t for t in duckdb_engine.get_tools()} + result = tools["execute_sql"].invoke( + { + "sql": "SELECT order_status, COUNT(*) AS cnt FROM olist_orders GROUP BY order_status ORDER BY cnt DESC" + } + ) + parsed = orjson.loads(result) + assert "error" not in parsed, f"SQL error: {parsed.get('error')}" + + rows = {row["order_status"]: row["cnt"] for row in parsed["rows"]} + assert rows.get("delivered") == 45, f"Expected 45 delivered, got {rows}" + assert rows.get("canceled") == 5, f"Expected 5 canceled, got {rows}" From 35b385bb8767a4376ece2e42e6432e9f808a5953 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Sun, 31 May 2026 10:29:20 -0700 Subject: [PATCH 2/3] fix: duckdb connection shows correct status and tool toggles in Settings UI - Add "duckdb" to the elif branch that renders connection fields so a configured DuckDB connection shows "connected" rather than "unconfigured" - File-based engines (sqlite, duckdb) only need database path to be considered configured; server engines still require host - Add "duckdb" to _KNOWN_TOOLS so the tool toggle panel renders correctly Co-Authored-By: Claude Sonnet 4.6 --- backend/src/analytics_agent/api/settings.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/backend/src/analytics_agent/api/settings.py b/backend/src/analytics_agent/api/settings.py index 64aced0..6151cea 100644 --- a/backend/src/analytics_agent/api/settings.py +++ b/backend/src/analytics_agent/api/settings.py @@ -226,6 +226,12 @@ class UpdateDisplayRequest(BaseModel): {"name": "preview_table", "label": "Preview data"}, {"name": "execute_sql", "label": "Execute SQL"}, ], + "duckdb": [ + {"name": "list_tables", "label": "List tables"}, + {"name": "get_schema", "label": "Table schema"}, + {"name": "preview_table", "label": 
"Preview data"}, + {"name": "execute_sql", "label": "Execute SQL"}, + ], "chart": [ {"name": "create_chart", "label": "Create visualizations"}, ], @@ -690,13 +696,19 @@ async def list_connections(session: AsyncSession = Depends(get_session)): placeholder='{"type":"service_account",...}', ), ] - elif intg.type in ("mysql", "sqlalchemy", "postgresql", "sqlite"): + elif intg.type in ("mysql", "sqlalchemy", "postgresql", "sqlite", "duckdb"): host = conn_cfg.get("host", "") database = conn_cfg.get("database", conn_cfg.get("db", "")) port = str(conn_cfg.get("port", "")) user = conn_cfg.get("user", conn_cfg.get("username", "")) has_url = bool(conn_cfg.get("url")) - status_str = "connected" if (has_url or (host and database)) else "unconfigured" + # File-based engines (sqlite, duckdb) only need database; server engines need host too. + file_based = intg.type in ("sqlite", "duckdb") + status_str = ( + "connected" + if (has_url or (file_based and bool(database)) or (host and database)) + else "unconfigured" + ) if has_url: fields = [ ConnectionField( From 3fcfa9aee0d12d6f0d88bbfce598b59a0b377d86 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Sun, 31 May 2026 10:43:58 -0700 Subject: [PATCH 3/3] test: add engine connector contract tests + extract _compute_engine_status helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every engine type must be wired through several touchpoints — factory dispatch, secret_env_vars, _KNOWN_TOOLS, the status renderer, and the frontend plugin index. Forgetting any one of them silently degrades the UX (empty tool toggles, connections stuck as 'unconfigured', missing from the picker) without failing CI. The contract test enumerates every known engine type and asserts each touchpoint handles it. Adding a new connector means adding one entry to MINIMAL_CONFIGS — the rest of the wiring is then enforced. 
Extract _compute_engine_status as a small pure helper so the status check is unit-testable without DB setup. Use it everywhere list_connections computes a status (snowflake, bigquery, sqlalchemy-family, fallback). The new tests caught three pre-existing gaps: hive, postgresql, and sqlite were all missing from _KNOWN_TOOLS — their tool-toggles panel was empty in the Settings UI. Fixed all three. Co-Authored-By: Claude Sonnet 4.6 --- backend/src/analytics_agent/api/settings.py | 65 ++++++++--- tests/unit/test_engine_contract.py | 115 ++++++++++++++++++++ 2 files changed, 162 insertions(+), 18 deletions(-) create mode 100644 tests/unit/test_engine_contract.py diff --git a/backend/src/analytics_agent/api/settings.py b/backend/src/analytics_agent/api/settings.py index 6151cea..81a99b9 100644 --- a/backend/src/analytics_agent/api/settings.py +++ b/backend/src/analytics_agent/api/settings.py @@ -214,13 +214,25 @@ class UpdateDisplayRequest(BaseModel): {"name": "preview_table", "label": "Preview data"}, {"name": "execute_sql", "label": "Execute SQL"}, ], + "hive": [ + {"name": "list_tables", "label": "List tables"}, + {"name": "get_schema", "label": "Table schema"}, + {"name": "preview_table", "label": "Preview data"}, + {"name": "execute_sql", "label": "Execute SQL"}, + ], "mysql": [ {"name": "list_tables", "label": "List tables"}, {"name": "get_schema", "label": "Table schema"}, {"name": "preview_table", "label": "Preview data"}, {"name": "execute_sql", "label": "Execute SQL"}, ], - "sqlalchemy": [ + "postgresql": [ + {"name": "list_tables", "label": "List tables"}, + {"name": "get_schema", "label": "Table schema"}, + {"name": "preview_table", "label": "Preview data"}, + {"name": "execute_sql", "label": "Execute SQL"}, + ], + "sqlite": [ {"name": "list_tables", "label": "List tables"}, {"name": "get_schema", "label": "Table schema"}, {"name": "preview_table", "label": "Preview data"}, @@ -232,6 +244,12 @@ class UpdateDisplayRequest(BaseModel): {"name": "preview_table", 
"label": "Preview data"}, {"name": "execute_sql", "label": "Execute SQL"}, ], + "sqlalchemy": [ + {"name": "list_tables", "label": "List tables"}, + {"name": "get_schema", "label": "Table schema"}, + {"name": "preview_table", "label": "Preview data"}, + {"name": "execute_sql", "label": "Execute SQL"}, + ], "chart": [ {"name": "create_chart", "label": "Create visualizations"}, ], @@ -255,6 +273,30 @@ def _build_tool_toggles( return result +def _compute_engine_status(engine_type: str, conn_cfg: dict, sso_connected: bool = False) -> str: + """Return 'connected' or 'unconfigured' for an engine connection.""" + from analytics_agent.engines.factory import _CONNECTOR_MAP + + spec = _CONNECTOR_MAP.get(engine_type) + if spec is not None: + return ( + "connected" + if spec.is_configured(conn_cfg, sso_connected=sso_connected) + else "unconfigured" + ) + + if engine_type in ("mysql", "sqlalchemy", "postgresql", "sqlite", "duckdb"): + host = conn_cfg.get("host", "") + database = conn_cfg.get("database", conn_cfg.get("db", "")) + has_url = bool(conn_cfg.get("url")) + # File-based engines need only `database`; server engines need host too. + file_based = engine_type in ("sqlite", "duckdb") + if has_url or (file_based and bool(database)) or (host and database): + return "connected" + + return "unconfigured" + + # --- Connection helpers --- @@ -602,15 +644,9 @@ async def list_connections(session: AsyncSession = Depends(get_session)): is_sso_connected = cred is not None and cred.auth_type == "sso_externalbrowser" if intg.type == "snowflake": - from analytics_agent.engines.factory import _CONNECTOR_MAP as _CM - account = conn_cfg.get("account", "") user = conn_cfg.get("user", "") - status_str = ( - "connected" - if _CM["snowflake"].is_configured(conn_cfg, sso_connected=is_sso_connected) - else "unconfigured" - ) + status_str = _compute_engine_status(intg.type, conn_cfg, sso_connected=is_sso_connected) # Detect active auth method so the frontend can pre-select the right tab. 
if is_sso_connected: active_auth_method = "sso" @@ -673,7 +709,7 @@ async def list_connections(session: AsyncSession = Depends(get_session)): conn_cfg.get(k) or os.environ.get(_CM["bigquery"].env_map.get(k, ""), "") for k in _CM["bigquery"].credential_keys ) - status_str = "connected" if _CM["bigquery"].is_configured(conn_cfg) else "unconfigured" + status_str = _compute_engine_status(intg.type, conn_cfg) fields = [ ConnectionField( key="project", @@ -702,13 +738,7 @@ async def list_connections(session: AsyncSession = Depends(get_session)): port = str(conn_cfg.get("port", "")) user = conn_cfg.get("user", conn_cfg.get("username", "")) has_url = bool(conn_cfg.get("url")) - # File-based engines (sqlite, duckdb) only need database; server engines need host too. - file_based = intg.type in ("sqlite", "duckdb") - status_str = ( - "connected" - if (has_url or (file_based and bool(database)) or (host and database)) - else "unconfigured" - ) + status_str = _compute_engine_status(intg.type, conn_cfg) if has_url: fields = [ ConnectionField( @@ -748,8 +778,8 @@ async def list_connections(session: AsyncSession = Depends(get_session)): from analytics_agent.engines.factory import _CONNECTOR_MAP as _CM spec = _CM.get(intg.type) + status_str = _compute_engine_status(intg.type, conn_cfg) if spec is not None and spec.display_fields: - status_str = "connected" if spec.is_configured(conn_cfg) else "unconfigured" fields = [] for df in spec.display_fields: raw = conn_cfg.get(df.key, "") or os.environ.get( @@ -767,7 +797,6 @@ async def list_connections(session: AsyncSession = Depends(get_session)): ) ) else: - status_str = "unconfigured" fields = [] oauth_status = ( diff --git a/tests/unit/test_engine_contract.py b/tests/unit/test_engine_contract.py new file mode 100644 index 0000000..a1f243f --- /dev/null +++ b/tests/unit/test_engine_contract.py @@ -0,0 +1,115 @@ +""" +Contract tests for engine connectors. 
+ +Every supported engine type must be wired through several touchpoints — the +engine factory, the secret-env-vars registry, the per-type tools registry, the +status renderer, and the frontend plugin index. Forgetting any one of them +produces silent UX failures (e.g. a connection that always shows as +"unconfigured", or an empty tool-toggles panel) that don't fail in CI. + +This test enumerates every known engine type and asserts each touchpoint +handles it. Add a new entry to MINIMAL_CONFIGS to introduce a connector — +each test will then enforce that the rest of the wiring is in place. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +# Minimal config that should yield a "connected" status for each engine type. +# When adding a new connector, add it here. +MINIMAL_CONFIGS: dict[str, dict[str, str]] = { + "snowflake": {"account": "x", "user": "y", "password": "z"}, + "hive": {"host": "x", "user": "y", "password": "z"}, + "bigquery": {"project": "x", "credentials_json": '{"x":"y"}'}, + "mysql": {"host": "x", "database": "y", "user": "z", "password": "p"}, + "postgresql": {"host": "x", "database": "y", "user": "z", "password": "p"}, + "sqlite": {"dialect": "sqlite", "database": "/tmp/x.db"}, + "duckdb": {"dialect": "duckdb", "database": "/tmp/x.duckdb"}, +} + +ENGINE_TYPES = sorted(MINIMAL_CONFIGS) + +# Every query engine exposes the same four tools — anything missing means the +# tool-toggles panel in Settings will be incomplete. +_REQUIRED_SQL_TOOLS = {"execute_sql", "list_tables", "get_schema", "preview_table"} + +# Plugins in the frontend index follow the convention `${type}Plugin`. 
+_FRONTEND_INDEX = ( + Path(__file__).resolve().parents[2] + / "frontend" + / "src" + / "components" + / "Settings" + / "connections" + / "index.ts" +) + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_factory_returns_callable(engine_type): + """_engine_cls must return a factory for every known type.""" + from analytics_agent.engines.factory import _engine_cls + + fn = _engine_cls(engine_type) + assert fn is not None, ( + f"_engine_cls({engine_type!r}) returned None — add it to the dispatch dict in factory.py" + ) + assert callable(fn) + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_secret_env_vars_returns_dict(engine_type): + """get_secret_env_vars must return a dict (possibly empty) for every type.""" + from analytics_agent.engines.factory import get_secret_env_vars + + result = get_secret_env_vars(engine_type) + assert isinstance(result, dict), ( + f"get_secret_env_vars({engine_type!r}) returned {type(result).__name__}, expected dict" + ) + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_known_tools_has_standard_sql_tools(engine_type): + """_KNOWN_TOOLS must list the four standard SQL tools — otherwise the toggle UI is empty.""" + from analytics_agent.api.settings import _KNOWN_TOOLS + + assert engine_type in _KNOWN_TOOLS, ( + f"_KNOWN_TOOLS missing entry for {engine_type!r} — tool toggles panel will be empty" + ) + tool_names = {t["name"] for t in _KNOWN_TOOLS[engine_type]} + missing = _REQUIRED_SQL_TOOLS - tool_names + assert not missing, f"_KNOWN_TOOLS[{engine_type!r}] missing tools: {missing}" + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_minimal_config_renders_as_connected(engine_type): + """A minimally-configured connection must show 'connected', not 'unconfigured'.""" + from analytics_agent.api.settings import _compute_engine_status + + status = _compute_engine_status(engine_type, MINIMAL_CONFIGS[engine_type]) + assert status == "connected", ( + f"{engine_type} with minimal 
config rendered as {status!r}; " + f"add it to _compute_engine_status (or to the engine's ConnectorSpec)" + ) + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_empty_config_renders_as_unconfigured(engine_type): + """An empty config must show 'unconfigured' — the status check is meaningful.""" + from analytics_agent.api.settings import _compute_engine_status + + assert _compute_engine_status(engine_type, {}) == "unconfigured" + + +@pytest.mark.parametrize("engine_type", ENGINE_TYPES) +def test_frontend_plugin_registered(engine_type): + """frontend index.ts must import a `${type}Plugin` — otherwise the type is missing from the picker.""" + content = _FRONTEND_INDEX.read_text() + expected = f"{engine_type}Plugin" + assert expected in content, ( + f"frontend index.ts missing `{expected}` — add the plugin import and " + f"register it in CONNECTION_PLUGINS" + )