Skip to content

Commit 481c9a7

Browse files
Merge remote-tracking branch 'fresioAS/add_fabric_warehouse' into feat/add-fabric-engine
2 parents 7441d63 + 9c37b9e commit 481c9a7

File tree

11 files changed

+289
-1
lines changed

11 files changed

+289
-1
lines changed

docs/guides/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,7 @@ These pages describe the connection configuration options for each execution eng
767767
* [BigQuery](../integrations/engines/bigquery.md)
768768
* [Databricks](../integrations/engines/databricks.md)
769769
* [DuckDB](../integrations/engines/duckdb.md)
770+
* [Fabric](../integrations/engines/fabric.md)
770771
* [MotherDuck](../integrations/engines/motherduck.md)
771772
* [MySQL](../integrations/engines/mysql.md)
772773
* [MSSQL](../integrations/engines/mssql.md)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Fabric
2+
3+
## Local/Built-in Scheduler
4+
**Engine Adapter Type**: `fabric`
5+
6+
NOTE: Fabric Warehouse is not recommended for use as the SQLMesh [state connection](../../reference/configuration.md#connections).
7+
8+
### Installation
9+
#### Microsoft Entra ID / Azure Active Directory Authentication:
10+
```bash
11+
pip install "sqlmesh[mssql-odbc]"
12+
```
13+
14+
### Connection options
15+
16+
| Option | Description | Type | Required |
17+
| ----------------- | ------------------------------------------------------------ | :----------: | :------: |
18+
| `type` | Engine type name - must be `fabric` | string | Y |
19+
| `host` | The hostname of the Fabric Warehouse server | string | Y |
20+
| `user` | The client id to use for authentication with the Fabric Warehouse server | string | N |
21+
| `password` | The client secret to use for authentication with the Fabric Warehouse server | string | N |
22+
| `port` | The port number of the Fabric Warehouse server | int | N |
23+
| `database` | The target database | string | N |
24+
| `charset` | The character set used for the connection | string | N |
25+
| `timeout` | The query timeout in seconds. Default: no timeout | int | N |
26+
| `login_timeout` | The timeout for connection and login in seconds. Default: 60 | int | N |
27+
| `appname` | The application name to use for the connection | string | N |
28+
| `conn_properties` | The list of connection properties | list[string] | N |
29+
| `autocommit`      | Whether autocommit mode is enabled. Default: true             | bool         | N        |
30+
| `driver` | The driver to use for the connection. Default: pyodbc | string | N |
31+
| `driver_name` | The driver name to use for the connection. E.g., *ODBC Driver 18 for SQL Server* | string | N |
32+
| `odbc_properties` | The dict of ODBC connection properties. E.g., authentication: ActiveDirectoryServicePrincipal. See more [here](https://learn.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver16). | dict | N |

docs/integrations/overview.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ SQLMesh supports the following execution engines for running SQLMesh projects (e
1717
* [ClickHouse](./engines/clickhouse.md) (clickhouse)
1818
* [Databricks](./engines/databricks.md) (databricks)
1919
* [DuckDB](./engines/duckdb.md) (duckdb)
20+
* [Fabric](./engines/fabric.md) (fabric)
2021
* [MotherDuck](./engines/motherduck.md) (motherduck)
2122
* [MSSQL](./engines/mssql.md) (mssql)
2223
* [MySQL](./engines/mysql.md) (mysql)

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ nav:
8383
- integrations/engines/clickhouse.md
8484
- integrations/engines/databricks.md
8585
- integrations/engines/duckdb.md
86+
- integrations/engines/fabric.md
8687
- integrations/engines/motherduck.md
8788
- integrations/engines/mssql.md
8889
- integrations/engines/mysql.md

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ markers = [
252252
"clickhouse_cloud: test for Clickhouse (cloud mode)",
253253
"databricks: test for Databricks",
254254
"duckdb: test for DuckDB",
255+
"fabric: test for Fabric",
255256
"motherduck: test for MotherDuck",
256257
"mssql: test for MSSQL",
257258
"mysql: test for MySQL",

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
ConnectionConfig as ConnectionConfig,
1111
DatabricksConnectionConfig as DatabricksConnectionConfig,
1212
DuckDBConnectionConfig as DuckDBConnectionConfig,
13+
FabricConnectionConfig as FabricConnectionConfig,
1314
GCPPostgresConnectionConfig as GCPPostgresConnectionConfig,
1415
MotherDuckConnectionConfig as MotherDuckConnectionConfig,
1516
MSSQLConnectionConfig as MSSQLConnectionConfig,

sqlmesh/core/config/connection.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@
4343

4444
logger = logging.getLogger(__name__)
4545

46-
RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "mssql", "azuresql"}
46+
RECOMMENDED_STATE_SYNC_ENGINES = {
47+
"postgres",
48+
"gcp_postgres",
49+
"mysql",
50+
"mssql",
51+
"azuresql",
52+
}
4753
FORBIDDEN_STATE_SYNC_ENGINES = {
4854
# Do not support row-level operations
4955
"spark",
@@ -1671,6 +1677,34 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
16711677
return {"catalog_support": CatalogSupport.SINGLE_CATALOG_ONLY}
16721678

16731679

1680+
class FabricConnectionConfig(MSSQLConnectionConfig):
    """
    Fabric Connection Configuration.
    Inherits most settings from MSSQLConnectionConfig and sets the type to 'fabric'.
    It is recommended to use the 'pyodbc' driver for Fabric.
    """

    # Discriminator for the connection-config union: `type: fabric` in user config
    # selects this class (pydantic alias maps the `type` key onto `type_`).
    type_: t.Literal["fabric"] = Field(alias="type", default="fabric")  # type: ignore
    # SQLGlot dialect used when rendering SQL for this engine.
    DIALECT: t.ClassVar[t.Literal["fabric"]] = "fabric"  # type: ignore
    DISPLAY_NAME: t.ClassVar[t.Literal["Fabric"]] = "Fabric"  # type: ignore
    DISPLAY_ORDER: t.ClassVar[t.Literal[17]] = 17  # type: ignore
    # Narrowed from the MSSQL config: only the pyodbc driver is permitted for Fabric.
    driver: t.Literal["pyodbc"] = "pyodbc"
    # NOTE(review): overrides the inherited MSSQL default so Fabric connections
    # run in autocommit mode by default — keep user docs in sync with this.
    autocommit: t.Optional[bool] = True

    @property
    def _engine_adapter(self) -> t.Type[EngineAdapter]:
        # Imported lazily so the fabric adapter module is only loaded when
        # a Fabric connection is actually configured.
        from sqlmesh.core.engine_adapter.fabric import FabricAdapter

        return FabricAdapter

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        # Forward the target database and declare that the catalog must be set
        # explicitly on the connection (CatalogSupport.REQUIRES_SET_CATALOG),
        # unlike the MSSQL config's SINGLE_CATALOG_ONLY behavior.
        return {
            "database": self.database,
            "catalog_support": CatalogSupport.REQUIRES_SET_CATALOG,
        }
1706+
1707+
16741708
class SparkConnectionConfig(ConnectionConfig):
16751709
"""
16761710
Vanilla Spark Connection Configuration. Use `DatabricksConnectionConfig` for Databricks.

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
2020
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
2121
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
22+
from sqlmesh.core.engine_adapter.fabric import FabricAdapter
2223

2324
DIALECT_TO_ENGINE_ADAPTER = {
2425
"hive": SparkEngineAdapter,
@@ -35,6 +36,7 @@
3536
"trino": TrinoEngineAdapter,
3637
"athena": AthenaEngineAdapter,
3738
"risingwave": RisingwaveEngineAdapter,
39+
"fabric": FabricAdapter,
3840
}
3941

4042
DIALECT_ALIASES = {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from __future__ import annotations
2+
3+
import typing as t
4+
from sqlglot import exp
5+
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
6+
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
7+
from sqlmesh.core.engine_adapter.base import EngineAdapter
8+
9+
if t.TYPE_CHECKING:
10+
from sqlmesh.core._typing import TableName
11+
12+
13+
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
14+
15+
16+
class FabricAdapter(LogicalMergeMixin, MSSQLEngineAdapter):
    """
    Adapter for Microsoft Fabric.

    Inherits the bulk of its behavior from ``MSSQLEngineAdapter``;
    ``LogicalMergeMixin`` is mixed in so merges are performed logically
    rather than via the engine's native MERGE support.
    """

    DIALECT = "fabric"
    # Capabilities disabled relative to MSSQL — presumably because Fabric
    # Warehouse does not support them; TODO confirm against Fabric T-SQL docs.
    SUPPORTS_INDEXES = False
    SUPPORTS_TRANSACTIONS = False
    # Overwrites are expressed as DELETE followed by INSERT instead of MERGE.
    INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT

    def _insert_overwrite_by_condition(
        self,
        table_name: TableName,
        source_queries: t.List[SourceQuery],
        columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
        where: t.Optional[exp.Condition] = None,
        insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
        **kwargs: t.Any,
    ) -> None:
        """
        Implements the insert overwrite strategy for Fabric using DELETE and INSERT.

        This method is overridden to avoid the MERGE statement from the parent
        MSSQLEngineAdapter, which is not fully supported in Fabric.
        """
        # Deliberately call the base EngineAdapter implementation directly,
        # bypassing the MSSQL override in the MRO, and force DELETE_INSERT
        # regardless of any `insert_overwrite_strategy_override` the caller passed.
        return EngineAdapter._insert_overwrite_by_condition(
            self,
            table_name=table_name,
            source_queries=source_queries,
            columns_to_types=columns_to_types,
            where=where,
            insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
            **kwargs,
        )
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# type: ignore
2+
3+
import typing as t
4+
5+
import pytest
6+
from sqlglot import exp, parse_one
7+
8+
from sqlmesh.core.engine_adapter import FabricAdapter
9+
from tests.core.engine_adapter import to_sql_calls
10+
11+
pytestmark = [pytest.mark.engine, pytest.mark.fabric]
12+
13+
14+
@pytest.fixture
def adapter(make_mocked_engine_adapter: t.Callable) -> FabricAdapter:
    # Build a FabricAdapter backed by a mocked cursor/connection so tests can
    # inspect the SQL it issues without a live Fabric Warehouse.
    return make_mocked_engine_adapter(FabricAdapter)
17+
18+
19+
def test_columns(adapter: FabricAdapter):
    """Column metadata rows should be folded into fully-parameterized SQLGlot types."""
    # Simulated INFORMATION_SCHEMA.COLUMNS rows, in the order the query selects:
    # (COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE)
    adapter.cursor.fetchall.return_value = [
        ("decimal_ps", "decimal", None, 5, 4),
        ("decimal", "decimal", None, 18, 0),
        ("float", "float", None, 53, None),
        ("char_n", "char", 10, None, None),
        ("varchar_n", "varchar", 10, None, None),
        ("nvarchar_max", "nvarchar", -1, None, None),
    ]

    # Precision/scale and length are applied; a length of -1 maps to nvarchar(max).
    assert adapter.columns("db.table") == {
        "decimal_ps": exp.DataType.build("decimal(5, 4)", dialect=adapter.dialect),
        "decimal": exp.DataType.build("decimal(18, 0)", dialect=adapter.dialect),
        "float": exp.DataType.build("float(53)", dialect=adapter.dialect),
        "char_n": exp.DataType.build("char(10)", dialect=adapter.dialect),
        "varchar_n": exp.DataType.build("varchar(10)", dialect=adapter.dialect),
        "nvarchar_max": exp.DataType.build("nvarchar(max)", dialect=adapter.dialect),
    }

    # Verify that the adapter queries the uppercase INFORMATION_SCHEMA
    adapter.cursor.execute.assert_called_once_with(
        """SELECT [COLUMN_NAME], [DATA_TYPE], [CHARACTER_MAXIMUM_LENGTH], [NUMERIC_PRECISION], [NUMERIC_SCALE] FROM [INFORMATION_SCHEMA].[COLUMNS] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    )
42+
43+
44+
def test_table_exists(adapter: FabricAdapter):
    """table_exists should probe the uppercase INFORMATION_SCHEMA views."""
    cursor = adapter.cursor

    # A row coming back from the probe query means the table exists.
    cursor.fetchone.return_value = (1,)
    assert adapter.table_exists("db.table")

    # The probe must target the uppercase INFORMATION_SCHEMA catalog views.
    expected_sql = """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'table' AND [TABLE_SCHEMA] = 'db';"""
    cursor.execute.assert_called_once_with(expected_sql)

    # No row back means the table does not exist.
    cursor.fetchone.return_value = None
    assert not adapter.table_exists("db.table")
54+
55+
56+
def test_insert_overwrite_by_time_partition(adapter: FabricAdapter):
    """Time-partition overwrite on Fabric must emit DELETE + INSERT, never MERGE."""
    adapter.insert_overwrite_by_time_partition(
        "test_table",
        parse_one("SELECT a, b FROM tbl"),
        start="2022-01-01",
        end="2022-01-02",
        time_column="b",
        # Render time boundaries as 'YYYY-MM-DD' string literals.
        time_formatter=lambda x, _: exp.Literal.string(x.strftime("%Y-%m-%d")),
        columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")},
    )

    # Fabric adapter should use DELETE/INSERT strategy, not MERGE.
    assert to_sql_calls(adapter) == [
        """DELETE FROM [test_table] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
        """INSERT INTO [test_table] ([a], [b]) SELECT [a], [b] FROM (SELECT [a] AS [a], [b] AS [b] FROM [tbl]) AS [_subquery] WHERE [b] BETWEEN '2022-01-01' AND '2022-01-02';""",
    ]
72+
73+
74+
def test_replace_query(adapter: FabricAdapter):
    """replace_query on an existing table should truncate and re-insert."""
    # Report the target table as already existing.
    adapter.cursor.fetchone.return_value = (1,)

    adapter.replace_query("test_table", parse_one("SELECT a FROM tbl"), {"a": "int"})

    # This behavior is inherited from MSSQLEngineAdapter and should be TRUNCATE + INSERT
    expected_calls = [
        """SELECT 1 FROM [INFORMATION_SCHEMA].[TABLES] WHERE [TABLE_NAME] = 'test_table';""",
        "TRUNCATE TABLE [test_table];",
        "INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
    ]
    assert to_sql_calls(adapter) == expected_calls

0 commit comments

Comments
 (0)