Skip to content

Commit da2e403

Browse files
authored
Fix: Make sure physical tables are recreated for seed models after a failure occurs during table creation (#2742)
1 parent 3d3d5ab commit da2e403

File tree

2 files changed

+120
-22
lines changed

2 files changed

+120
-22
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ def _get_data_objects(schema: exp.Table) -> t.Set[str]:
278278

279279
snapshots_to_create = []
280280
for snapshot, table_names in snapshots_with_table_names.items():
281-
if table_names - existing_objects:
281+
if table_names - existing_objects or (snapshot.is_seed and not snapshot.intervals):
282282
snapshots_to_create.append(snapshot)
283283
elif on_complete:
284284
on_complete(snapshot)
@@ -1357,16 +1357,24 @@ def create(
13571357
is_snapshot_deployable: bool,
13581358
**render_kwargs: t.Any,
13591359
) -> None:
1360-
super().create(snapshot, name, is_table_deployable, is_snapshot_deployable, **render_kwargs)
1361-
1362-
# For seeds we insert data at the time of table creation.
13631360
if is_table_deployable:
1361+
# For seeds we insert data at the time of table creation.
13641362
model = t.cast(SeedModel, snapshot.model)
1365-
for index, df in enumerate(model.render_seed()):
1366-
if index == 0:
1367-
self._replace_query_for_model(model, name, df)
1368-
else:
1369-
self.adapter.insert_append(name, df, columns_to_types=model.columns_to_types)
1363+
try:
1364+
for index, df in enumerate(model.render_seed()):
1365+
if index == 0:
1366+
self._replace_query_for_model(model, name, df)
1367+
else:
1368+
self.adapter.insert_append(
1369+
name, df, columns_to_types=model.columns_to_types
1370+
)
1371+
except Exception:
1372+
self.adapter.drop_table(name)
1373+
raise
1374+
else:
1375+
super().create(
1376+
snapshot, name, is_table_deployable, is_snapshot_deployable, **render_kwargs
1377+
)
13701378

13711379
def insert(
13721380
self,

tests/core/test_snapshot_evaluator.py

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,19 +1707,17 @@ def test_create_seed(mocker: MockerFixture, adapter_mock, make_snapshot):
17071707
table_description=None,
17081708
)
17091709

1710-
adapter_mock.create_table.assert_has_calls(
1711-
[
1712-
call(
1713-
f"sqlmesh__db.db__seed__{snapshot.version}__temp",
1714-
column_descriptions=None,
1715-
**common_create_kwargs,
1716-
),
1717-
call(
1718-
f"sqlmesh__db.db__seed__{snapshot.version}",
1719-
column_descriptions={},
1720-
**common_create_kwargs,
1721-
),
1722-
]
1710+
adapter_mock.replace_query.assert_called_once_with(
1711+
f"sqlmesh__db.db__seed__{snapshot.version}",
1712+
mocker.ANY,
1713+
column_descriptions={},
1714+
**common_create_kwargs,
1715+
)
1716+
1717+
adapter_mock.create_table.assert_called_once_with(
1718+
f"sqlmesh__db.db__seed__{snapshot.version}__temp",
1719+
column_descriptions=None,
1720+
**common_create_kwargs,
17231721
)
17241722

17251723
replace_query_calls = adapter_mock.replace_query.call_args_list
@@ -1747,6 +1745,98 @@ def test_create_seed(mocker: MockerFixture, adapter_mock, make_snapshot):
17471745
]
17481746

17491747

1748+
def test_create_seed_on_error(mocker: MockerFixture, adapter_mock, make_snapshot):
1749+
adapter_mock.insert_append.side_effect = Exception("test error")
1750+
1751+
expressions = d.parse(
1752+
"""
1753+
MODEL (
1754+
name db.seed,
1755+
kind SEED (
1756+
path '../seeds/waiter_names.csv',
1757+
batch_size 5,
1758+
)
1759+
);
1760+
"""
1761+
)
1762+
1763+
model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
1764+
1765+
snapshot = make_snapshot(model)
1766+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1767+
1768+
evaluator = SnapshotEvaluator(adapter_mock)
1769+
evaluator.create([snapshot], {})
1770+
1771+
adapter_mock.replace_query.assert_called_once_with(
1772+
f"sqlmesh__db.db__seed__{snapshot.version}",
1773+
mocker.ANY,
1774+
column_descriptions={},
1775+
columns_to_types={"id": exp.DataType.build("bigint"), "name": exp.DataType.build("text")},
1776+
storage_format=None,
1777+
partitioned_by=[],
1778+
partition_interval_unit=IntervalUnit.DAY,
1779+
clustered_by=[],
1780+
table_properties={},
1781+
table_description=None,
1782+
)
1783+
1784+
adapter_mock.drop_table.assert_called_once_with(f"sqlmesh__db.db__seed__{snapshot.version}")
1785+
1786+
1787+
def test_create_seed_no_intervals(mocker: MockerFixture, adapter_mock, make_snapshot):
1788+
expressions = d.parse(
1789+
"""
1790+
MODEL (
1791+
name db.seed,
1792+
kind SEED (
1793+
path '../seeds/waiter_names.csv',
1794+
)
1795+
);
1796+
"""
1797+
)
1798+
1799+
model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
1800+
1801+
snapshot = make_snapshot(model)
1802+
snapshot.categorize_as(SnapshotChangeCategory.BREAKING)
1803+
snapshot.intervals = [(to_timestamp("2020-01-01"), to_timestamp("2020-01-02"))]
1804+
1805+
# Simulate that both the physical table and its temp table already exist.
1806+
adapter_mock.get_data_objects.return_value = [
1807+
DataObject(
1808+
name=f"db__seed__{snapshot.version}",
1809+
schema="sqlmesh__db",
1810+
type=DataObjectType.TABLE,
1811+
),
1812+
DataObject(
1813+
name=f"db__seed__{snapshot.version}__temp",
1814+
schema="sqlmesh__db",
1815+
type=DataObjectType.TABLE,
1816+
),
1817+
]
1818+
1819+
evaluator = SnapshotEvaluator(adapter_mock)
1820+
evaluator.create([snapshot], {})
1821+
1822+
snapshot.intervals = []
1823+
evaluator.create([snapshot], {})
1824+
1825+
# replace_query should be called exactly once: only the create() call made when the snapshot has no intervals repopulates the seed table.
1826+
adapter_mock.replace_query.assert_called_once_with(
1827+
f"sqlmesh__db.db__seed__{snapshot.version}",
1828+
mocker.ANY,
1829+
column_descriptions={},
1830+
columns_to_types={"id": exp.DataType.build("bigint"), "name": exp.DataType.build("text")},
1831+
storage_format=None,
1832+
partitioned_by=[],
1833+
partition_interval_unit=IntervalUnit.DAY,
1834+
clustered_by=[],
1835+
table_properties={},
1836+
table_description=None,
1837+
)
1838+
1839+
17501840
def test_standalone_audit(mocker: MockerFixture, adapter_mock, make_snapshot):
17511841
evaluator = SnapshotEvaluator(adapter_mock)
17521842

0 commit comments

Comments
 (0)