Skip to content

Commit 49aa99d

Browse files
authored
Feat!: Expose Snowflake dynamic tables through the DBT adapter (#2942)
1 parent 713449c commit 49aa99d

4 files changed

Lines changed: 70 additions & 2 deletions

File tree

sqlmesh/core/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
SeedKind as SeedKind,
3333
TimeColumn as TimeColumn,
3434
ViewKind as ViewKind,
35+
ManagedKind as ManagedKind,
3536
model_kind_validator as model_kind_validator,
3637
)
3738
from sqlmesh.core.model.meta import ModelMeta as ModelMeta

sqlmesh/dbt/basemodel.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ class Materialization(str, Enum):
4646
EPHEMERAL = "ephemeral"
4747
SNAPSHOT = "snapshot"
4848

49+
# Snowflake, https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
50+
DYNAMIC_TABLE = "dynamic_table"
51+
4952

5053
class SnapshotStrategy(str, Enum):
5154
"""DBT snapshot strategies"""

sqlmesh/dbt/model.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
ModelKind,
2020
SCDType2ByColumnKind,
2121
ViewKind,
22+
ManagedKind,
2223
create_sql_model,
2324
)
2425
from sqlmesh.core.model.kind import SCDType2ByTimeKind, OnDestructiveChange
@@ -104,6 +105,9 @@ class ModelConfig(BaseModelConfig):
104105

105106
# snowflake
106107
snowflake_warehouse: t.Optional[str] = None
108+
# note: for Snowflake dynamic tables, in the DBT adapter we only support properties that DBT supports
109+
# which are defined here: https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
110+
target_lag: t.Optional[str] = None
107111

108112
# Private fields
109113
_sql_embedded_config: t.Optional[SqlStr] = None
@@ -298,6 +302,10 @@ def model_kind(self, context: DbtContext) -> ModelKind:
298302
return SCDType2ByTimeKind(
299303
updated_at_name=self.updated_at, updated_at_as_valid_from=True, **shared_kwargs
300304
)
305+
306+
if materialization == Materialization.DYNAMIC_TABLE:
307+
return ManagedKind()
308+
301309
raise ConfigError(f"{materialization.value} materialization not supported.")
302310

303311
@property
@@ -399,8 +407,20 @@ def to_sqlmesh(self, context: DbtContext) -> Model:
399407
if physical_properties:
400408
model_kwargs["physical_properties"] = physical_properties
401409

402-
if context.target.dialect == "snowflake" and self.snowflake_warehouse is not None:
403-
model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}
410+
if context.target.dialect == "snowflake":
411+
if self.snowflake_warehouse is not None:
412+
model_kwargs["session_properties"] = {"warehouse": self.snowflake_warehouse}
413+
414+
if self.model_materialization == Materialization.DYNAMIC_TABLE:
415+
if not self.snowflake_warehouse:
416+
raise ConfigError("`snowflake_warehouse` must be set for dynamic tables")
417+
if not self.target_lag:
418+
raise ConfigError("`target_lag` must be set for dynamic tables")
419+
420+
model_kwargs["physical_properties"] = {
421+
"warehouse": self.snowflake_warehouse,
422+
"target_lag": self.target_lag,
423+
}
404424

405425
return create_sql_model(
406426
self.canonical_name(context),

tests/dbt/test_transformation.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
IncrementalByTimeRangeKind,
2222
IncrementalByUniqueKeyKind,
2323
IncrementalUnmanagedKind,
24+
ManagedKind,
2425
SqlModel,
2526
ViewKind,
2627
)
@@ -264,6 +265,13 @@ def test_model_kind():
264265
insert_overwrite=True, disable_restatement=True
265266
)
266267

268+
assert (
269+
ModelConfig(materialized=Materialization.DYNAMIC_TABLE, target_lag="1 hour").model_kind(
270+
context
271+
)
272+
== ManagedKind()
273+
)
274+
267275
with pytest.raises(ConfigError):
268276
ModelConfig(
269277
materialized=Materialization.INCREMENTAL,
@@ -1120,3 +1128,39 @@ def test_model_cluster_by():
11201128
materialized=Materialization.TABLE.value,
11211129
)
11221130
assert model.to_sqlmesh(context).clustered_by == ["BAR", "QUX"]
1131+
1132+
1133+
def test_snowflake_dynamic_table():
1134+
context = DbtContext()
1135+
context._target = SnowflakeConfig(
1136+
name="target",
1137+
schema="test",
1138+
database="test",
1139+
account="account",
1140+
user="user",
1141+
password="password",
1142+
)
1143+
1144+
model = ModelConfig(
1145+
name="model",
1146+
alias="model",
1147+
package_name="package",
1148+
target_schema="test",
1149+
sql="SELECT * FROM baz",
1150+
materialized=Materialization.DYNAMIC_TABLE.value,
1151+
target_lag="1 hour",
1152+
snowflake_warehouse="SMALL",
1153+
)
1154+
1155+
as_sqlmesh = model.to_sqlmesh(context)
1156+
assert as_sqlmesh.kind == ManagedKind()
1157+
assert as_sqlmesh.physical_properties == {
1158+
"target_lag": exp.Literal.string("1 hour"),
1159+
"warehouse": exp.Literal.string("SMALL"),
1160+
}
1161+
1162+
# both target_lag and snowflake_warehouse are required properties
1163+
# https://docs.getdbt.com/reference/resource-configs/snowflake-configs#dynamic-tables
1164+
for required_property in ["target_lag", "snowflake_warehouse"]:
1165+
with pytest.raises(ConfigError, match=r".*must be set for dynamic tables"):
1166+
model.copy(update={required_property: None}).to_sqlmesh(context)

0 commit comments

Comments
 (0)