Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/wren/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pip install wren-engine[mysql] # MySQL
pip install wren-engine[bigquery] # BigQuery
pip install wren-engine[snowflake] # Snowflake
pip install wren-engine[clickhouse] # ClickHouse
pip install wren-engine[ytsaurus] # YTsaurus (via CHYT)
pip install wren-engine[trino] # Trino
pip install wren-engine[mssql] # SQL Server
pip install wren-engine[databricks] # Databricks
Expand Down
15 changes: 15 additions & 0 deletions core/wren/docs/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,18 @@ Both formats are accepted. The CLI auto-flattens the envelope format.
"format": "parquet"
}
```

## YTsaurus (via CHYT)

```json
{
"datasource": "ytsaurus",
"proxy": "yt-proxy.example.com",
"clique": "*ch_public",
"token": "y0_AgAA..."
}
```

`token` is optional — if omitted, the connector reads `YT_TOKEN` from the
environment. See [`connectors/ytsaurus.md`](connectors/ytsaurus.md) for the
full field reference and CHYT-specific behavior.
95 changes: 95 additions & 0 deletions core/wren/docs/connectors/ytsaurus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# YTsaurus connector

Connects Wren Engine to a [YTsaurus](https://ytsaurus.tech/en) cluster through
its **CHYT** clique (ClickHouse-over-YT). CHYT exposes a ClickHouse HTTP
protocol on the YT HTTP proxy, so this connector reuses Wren's existing
ClickHouse / Ibis path with YT-flavored auth (`Authorization: OAuth <YT_TOKEN>`
and a clique alias as the ClickHouse `database`).

The sqlglot dialect is `clickhouse`, so all CHYT-compatible ClickHouse SQL —
including `toUnixTimestamp`, `startsWith`, `now() - INTERVAL N DAY`,
`COUNT(DISTINCT ...)` — works as-is.

## Install

```bash
pip install "wren-engine[ytsaurus]"
```

The `ytsaurus` extra pulls `ibis-framework[clickhouse]`.

## Connection info

```python
from wren.model import YTsaurusConnectionInfo
from wren.model.data_source import DataSource
from wren.connector.factory import get_connector

info = DataSource.ytsaurus.get_connection_info({
"proxy": "yt-proxy.example.com", # YT HTTP proxy host
"clique": "*ch_public", # CHYT clique alias incl. leading "*"
# "token": "y0_AgAA...", # optional — falls back to YT_TOKEN env
# "secure": True, # default
# "port": 443, # default 443 / 80 by secure flag
# "settings": {"max_threads": "8"},
# "kwargs": {"connect_timeout": "30"},
})

connector = get_connector(DataSource.ytsaurus, info)
table = connector.query("SELECT now()", limit=1)
print(table.to_pandas())
```

| Field | Type | Default | Meaning |
|---|---|---|---|
| `proxy` | str (required) | — | YT HTTP proxy host (no scheme). |
| `clique` | str (required) | — | CHYT clique alias including the `*` prefix. |
| `token` | SecretStr | env `YT_TOKEN` | YT OAuth token. |
| `secure` | bool | `True` | HTTPS vs HTTP. |
| `port` | int | 443 / 80 | Override proxy port. |
| `settings` | dict | `None` | ClickHouse session settings (e.g. `max_execution_time`). |
| `kwargs` | dict | `None` | Passed to `clickhouse_connect.get_client()`. Supports `http_headers` (the connector merges `Authorization` in automatically). |

JSON form for use with `--connection-info` / `--connection-file`:

```json
{
"datasource": "ytsaurus",
"proxy": "yt-proxy.example.com",
"clique": "*ch_public",
"token": "y0_AgAA..."
}
```

## Auth

The connector resolves the YT OAuth token in this order:

1. `connection_info.token` if provided
2. `YT_TOKEN` environment variable

The token is sent both as `Authorization: OAuth <token>` (current CHYT auth)
and as the ClickHouse `password` (legacy). Either works on any modern YT
proxy.

If neither source produces a token, the connector raises
`WrenError(INVALID_CONNECTION_INFO)`.

## Statement timeout

Like the ClickHouse connector, the YTsaurus connector honors the
`x-wren-db-statement-timeout` header by setting the CHYT session's
`max_execution_time` (defaults to 180 seconds).

## Limitations

- **CHYT only**: the connector targets the ClickHouse-over-YT engine.
Query-Tracker-only features (raw YQL, SPYT) are not exposed. If you need a
YT-native YQL path, fork the connector and replace
`get_ytsaurus_connection` with a Query Tracker REST client; the rest of
the Wren plumbing (factory, enum, connection info) stays the same.
- **Clique availability**: queries fail if the named CHYT clique is not
running. Cliques are managed in the YT UI under "CHYT cliques".
- **Schema discovery**: `system.tables` works for CHYT-attached tables.
Static YT tables outside the clique's exposed schema must be referenced
by their full YT path (`"//home/.../table"`) inside CHYT queries.
3 changes: 2 additions & 1 deletion core/wren/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mysql = ["mysqlclient>=2.2", "ibis-framework[mysql]"]
bigquery = ["ibis-framework[bigquery]", "google-auth"]
snowflake = ["ibis-framework[snowflake]"]
clickhouse = ["ibis-framework[clickhouse]"]
ytsaurus = ["ibis-framework[clickhouse]"]
trino = ["ibis-framework[trino]", "trino>=0.321"]
mssql = ["ibis-framework[mssql]"]
databricks = ["databricks-sql-connector", "databricks-sdk"]
Expand All @@ -60,7 +61,7 @@ interactive = ["InquirerPy>=0.3.4"]
ui = ["starlette>=0.37", "uvicorn>=0.29", "jinja2>=3.1", "python-multipart>=0.0.9"]
main = ["wren-engine[interactive,ui]"]
all = [
"wren-engine[postgres,mysql,bigquery,snowflake,clickhouse,trino,mssql,databricks,redshift,athena,oracle,spark,main,memory]",
"wren-engine[postgres,mysql,bigquery,snowflake,clickhouse,ytsaurus,trino,mssql,databricks,redshift,athena,oracle,spark,main,memory]",
]
dev = [
"pytest>=8",
Expand Down
1 change: 1 addition & 0 deletions core/wren/src/wren/connector/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
DataSource.oracle: "wren.connector.oracle",
DataSource.snowflake: "wren.connector.ibis",
DataSource.athena: "wren.connector.ibis",
DataSource.ytsaurus: "wren.connector.ytsaurus",
}

# Map data sources to the correct pip extra when they share a connector module
Expand Down
118 changes: 118 additions & 0 deletions core/wren/src/wren/connector/ytsaurus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""YTsaurus (CHYT) connector.

Talks to a YTsaurus cluster through its CHYT (ClickHouse-over-YT) clique. CHYT
exposes a ClickHouse-compatible HTTP protocol on the YT HTTP proxy, so the
underlying machinery is ibis' ClickHouse backend with YT-flavored auth.

Auth: YT OAuth token. Resolution order:
1. ``connection_info.token`` (SecretStr) if provided
2. ``YT_TOKEN`` environment variable

CHYT diverges from a stock ClickHouse server in two ways the IbisConnector
default can't handle:

* **No CREATE VIEW.** ibis introspects query schemas by creating a temporary
view, but CHYT is read-only at the SQL layer and rejects DDL with
``std::out_of_range``. This connector overrides ``query`` and
``dry_run`` to bypass ibis and talk to the underlying ``clickhouse_connect``
HttpClient directly via ``query_arrow``.
* **OAuth-only auth.** The token is sent as ``Authorization: OAuth <token>``
(the ``Bearer`` and ``Basic`` schemes are explicitly rejected by the YT
proxy). The clique alias is passed via the ``chyt.clique_alias`` URL
parameter, both wired in :func:`wren.model.data_source.DataSourceExtension.get_ytsaurus_connection`.
"""

from __future__ import annotations

import pyarrow as pa

from wren.connector.base import IbisConnector
from wren.model.data_source import DataSource
from wren.model.error import DIALECT_SQL, ErrorCode, ErrorPhase, WrenError

try:
import clickhouse_connect

_ClickHouseDbError = clickhouse_connect.driver.exceptions.DatabaseError
except ImportError:

class _ClickHouseDbError(Exception):
"""Fallback stand-in when ``clickhouse_connect`` is not installed."""


class YTsaurusConnector(IbisConnector):
"""Connector for YTsaurus clusters via their CHYT (ClickHouse-over-YT) clique."""

def __init__(self, connection_info):
"""Build the connector with a :class:`YTsaurusConnectionInfo`-shaped payload."""
super().__init__(DataSource.ytsaurus, connection_info)

@property
def _ch_client(self):
"""Underlying clickhouse_connect HttpClient (set up by data_source.py)."""
return self.connection.con

def query(self, sql: str, limit: int | None = None) -> pa.Table:
"""Execute ``sql`` against CHYT and return the result as a ``pyarrow.Table``.

``limit``, if given, is appended as a ``LIMIT`` on a wrapping ``SELECT``.
Non-timeout backend errors are remapped to ``WrenError(INVALID_SQL)``.
"""
wrapped = sql
if limit is not None:
# ``limit`` is interpolated into the SQL string, so refuse anything
# that isn't a non-negative integer to make the f-string safe even
# if a caller bypasses the type hint.
if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
raise ValueError(f"limit must be a non-negative int, got {limit!r}")
wrapped = f"SELECT * FROM (\n{sql}\n) LIMIT {limit}"
Comment thread
coderabbitai[bot] marked this conversation as resolved.
try:
# CHYT speaks the ClickHouse Native protocol but rejects
# ``query_arrow`` (UNKNOWN_FORMAT for Arrow). Fall back to native
# rows + columns and assemble a pyarrow.Table here.
result = self._ch_client.query(wrapped)
columns = list(result.column_names)
data = list(result.result_columns)
if len(columns) != len(data):
raise WrenError(
ErrorCode.INVALID_SQL,
f"CHYT returned mismatched column metadata: "
f"{len(columns)} names vs {len(data)} column arrays",
phase=ErrorPhase.SQL_EXECUTION,
metadata={DIALECT_SQL: sql},
)
return pa.table({name: col for name, col in zip(columns, data)})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except _ClickHouseDbError as e:
if "TIMEOUT_EXCEEDED" not in str(e):
raise WrenError(
ErrorCode.INVALID_SQL,
str(e),
phase=ErrorPhase.SQL_EXECUTION,
metadata={DIALECT_SQL: sql},
) from e
raise
except (WrenError, TimeoutError):
raise

def dry_run(self, sql: str) -> None:
"""Validate ``sql`` against CHYT via ``EXPLAIN AST`` without materializing rows."""
# CHYT supports `EXPLAIN AST` for syntax/planning validation without
# materializing rows. Wrap the user SQL and let CHYT parse it.
try:
self._ch_client.query(f"EXPLAIN AST {sql}")
except _ClickHouseDbError as e:
if "TIMEOUT_EXCEEDED" not in str(e):
raise WrenError(
ErrorCode.INVALID_SQL,
str(e),
phase=ErrorPhase.SQL_DRY_RUN,
metadata={DIALECT_SQL: sql},
) from e
raise
except (WrenError, TimeoutError):
raise


def create_connector(connection_info) -> YTsaurusConnector:
"""Factory hook used by :mod:`wren.connector.factory`."""
return YTsaurusConnector(connection_info)
Loading
Loading