Skip to content

Commit 4a23b49

Browse files
authored
Fix: Evaluate a snapshot after promotion if it has paused forward-only parents (#1522)
1 parent cbe8c46 commit 4a23b49

4 files changed

Lines changed: 81 additions & 8 deletions

File tree

sqlmesh/core/plan/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99
BuiltInPlanEvaluator,
1010
MWAAPlanEvaluator,
1111
PlanEvaluator,
12+
can_evaluate_before_promote,
1213
)

sqlmesh/core/plan/evaluator.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,16 @@ def __init__(
7272
self.__all_snapshots: t.Dict[str, t.Dict[SnapshotId, Snapshot]] = {}
7373

7474
def evaluate(self, plan: Plan) -> None:
75-
snapshots = plan.snapshots
75+
snapshots = {s.snapshot_id: s for s in plan.snapshots}
76+
all_names = {s.name for s in plan.snapshots}
7677
if plan.is_dev:
77-
before_promote_snapshots = {s.name for s in snapshots}
78+
before_promote_snapshots = all_names
7879
after_promote_snapshots = set()
7980
else:
80-
before_promote_snapshots = {s.name for s in snapshots if not s.is_paused_forward_only}
81-
after_promote_snapshots = {s.name for s in snapshots if s.is_paused_forward_only}
81+
before_promote_snapshots = {
82+
s.name for s in snapshots.values() if can_evaluate_before_promote(s, snapshots)
83+
}
84+
after_promote_snapshots = all_names - before_promote_snapshots
8285

8386
self._push(plan)
8487
self._restate(plan)
@@ -368,3 +371,11 @@ def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
368371

369372
if stderr:
370373
raise SQLMeshError(f"Failed to submit a plan application request:\n{stderr}")
374+
375+
376+
def can_evaluate_before_promote(
377+
snapshot: Snapshot, snapshots: t.Dict[SnapshotId, Snapshot]
378+
) -> bool:
379+
return not snapshot.is_paused_forward_only and not any(
380+
snapshots[p_id].is_paused_forward_only for p_id in snapshot.parents
381+
)

sqlmesh/schedulers/airflow/plan.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from sqlmesh.core import scheduler
66
from sqlmesh.core.environment import Environment
7+
from sqlmesh.core.plan import can_evaluate_before_promote
78
from sqlmesh.core.snapshot import SnapshotTableInfo
89
from sqlmesh.core.state_sync import StateSync
910
from sqlmesh.schedulers.airflow import common
@@ -59,11 +60,11 @@ def create_plan_dag_spec(
5960

6061
backfill_intervals_per_snapshot = [
6162
common.BackfillIntervalsPerSnapshot(
62-
snapshot_id=snapshot.snapshot_id,
63+
snapshot_id=s.snapshot_id,
6364
intervals=intervals,
64-
before_promote=request.is_dev or not snapshot.is_paused_forward_only,
65+
before_promote=request.is_dev or can_evaluate_before_promote(s, all_snapshots),
6566
)
66-
for snapshot, intervals in backfill_batches.items()
67+
for s, intervals in backfill_batches.items()
6768
]
6869

6970
return common.PlanDagSpec(

tests/core/test_plan_evaluator.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
from sqlglot import parse_one
44

55
from sqlmesh.core.context import Context
6-
from sqlmesh.core.model import FullKind, SqlModel, ViewKind
6+
from sqlmesh.core.model import FullKind, IncrementalByTimeRangeKind, SqlModel, ViewKind
77
from sqlmesh.core.plan import (
88
AirflowPlanEvaluator,
99
BuiltInPlanEvaluator,
1010
MWAAPlanEvaluator,
1111
Plan,
12+
can_evaluate_before_promote,
1213
)
1314
from sqlmesh.core.snapshot import SnapshotChangeCategory
15+
from sqlmesh.utils.date import now_timestamp
1416
from sqlmesh.utils.errors import SQLMeshError
1517

1618

@@ -162,3 +164,61 @@ def test_mwaa_evaluator_error_from_cli(sushi_plan: Plan, mocker: MockerFixture):
162164
evaluator.evaluate(sushi_plan)
163165

164166
mwaa_client_mock.set_variable.assert_called_once_with(mocker.ANY, plan_dag_spec_json)
167+
168+
169+
def test_can_evaluate_before_promote(sushi_context: Context):
170+
parent_model_a = SqlModel(
171+
name="sushi.new_test_model_a",
172+
kind=IncrementalByTimeRangeKind(time_column="ds"),
173+
cron="@daily",
174+
start="2020-01-01",
175+
query=parse_one("SELECT 1::INT AS one, '2023-01-01' as ds"),
176+
)
177+
parent_model_b = SqlModel(
178+
name="sushi.new_test_model_b",
179+
kind=IncrementalByTimeRangeKind(time_column="ds"),
180+
cron="@daily",
181+
start="2020-01-01",
182+
query=parse_one("SELECT 2::INT AS two, '2023-01-01' as ds"),
183+
)
184+
child_model = SqlModel(
185+
name="sushi.new_test_model_child",
186+
kind=FullKind(),
187+
start="2020-01-01",
188+
query=parse_one("SELECT one, two FROM sushi.new_test_model_a, sushi.new_test_model_b"),
189+
)
190+
191+
sushi_context.upsert_model(parent_model_a)
192+
sushi_context.upsert_model(parent_model_b)
193+
sushi_context.upsert_model(child_model)
194+
195+
snapshots = sushi_context.snapshots
196+
197+
parent_snapshot_a = snapshots[parent_model_a.name]
198+
parent_snapshot_b = snapshots[parent_model_b.name]
199+
child_snapshot = snapshots[child_model.name]
200+
201+
all_snapshots = {
202+
s.snapshot_id: s for s in [parent_snapshot_a, parent_snapshot_b, child_snapshot]
203+
}
204+
205+
parent_snapshot_a.change_category = SnapshotChangeCategory.BREAKING
206+
parent_snapshot_b.change_category = SnapshotChangeCategory.BREAKING
207+
child_snapshot.change_category = SnapshotChangeCategory.BREAKING
208+
assert can_evaluate_before_promote(child_snapshot, all_snapshots)
209+
210+
parent_snapshot_a.change_category = SnapshotChangeCategory.FORWARD_ONLY
211+
parent_snapshot_b.change_category = SnapshotChangeCategory.FORWARD_ONLY
212+
assert not can_evaluate_before_promote(child_snapshot, all_snapshots)
213+
214+
parent_snapshot_a.unpaused_ts = now_timestamp()
215+
assert not can_evaluate_before_promote(child_snapshot, all_snapshots)
216+
217+
parent_snapshot_b.unpaused_ts = now_timestamp()
218+
assert can_evaluate_before_promote(child_snapshot, all_snapshots)
219+
220+
child_snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
221+
assert not can_evaluate_before_promote(child_snapshot, all_snapshots)
222+
223+
child_snapshot.unpaused_ts = now_timestamp()
224+
assert can_evaluate_before_promote(child_snapshot, all_snapshots)

0 commit comments

Comments
 (0)