Skip to content

Commit 0537bc3

Browse files
authored
fix: execution_time was not getting passed in from plan (#2573)
1 parent 375cae1 commit 0537bc3

File tree

12 files changed

+50
-12
lines changed

12 files changed

+50
-12
lines changed

sqlmesh/core/plan/evaluator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def _backfill(
151151
plan.environment_naming_info,
152152
plan.start,
153153
plan.end,
154+
execution_time=plan.execution_time,
154155
restatements=plan.restatements,
155156
selected_snapshots=selected_snapshots,
156157
deployability_index=deployability_index,
@@ -278,7 +279,6 @@ def _restate(self, plan: Plan) -> None:
278279
(plan.context_diff.snapshots[s_id], interval)
279280
for s_id, interval in plan.restatements.items()
280281
],
281-
plan.execution_time,
282282
remove_shared_versions=not plan.is_dev,
283283
)
284284

@@ -373,6 +373,7 @@ def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
373373
for change_source, snapshots in plan.indirectly_modified.items()
374374
},
375375
removed_snapshots=list(plan.context_diff.removed_snapshots),
376+
execution_time=plan.execution_time,
376377
)
377378
plan_dag_spec = create_plan_dag_spec(plan_application_request, self.state_sync)
378379
PlanDagState.from_state_sync(self.state_sync).add_dag_spec(plan_dag_spec)
@@ -449,6 +450,7 @@ def _apply_plan(self, plan: Plan, plan_request_id: str) -> None:
449450
for change_source, snapshots in plan.indirectly_modified.items()
450451
},
451452
removed_snapshots=list(plan.context_diff.removed_snapshots),
453+
execution_time=plan.execution_time,
452454
)
453455

454456

sqlmesh/core/snapshot/definition.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -656,14 +656,11 @@ def add_interval(self, start: TimeLike, end: TimeLike, is_dev: bool = False) ->
656656
else:
657657
self.intervals = merged_intervals
658658

659-
def remove_interval(
660-
self, interval: Interval, execution_time: t.Optional[TimeLike] = None
661-
) -> None:
659+
def remove_interval(self, interval: Interval) -> None:
662660
"""Remove an interval from the snapshot.
663661
664662
Args:
665663
interval: The interval to remove.
666-
execution_time: The date/time time reference to use for execution time. Defaults to now.
667664
"""
668665
self.intervals = remove_interval(self.intervals, *interval)
669666
self.dev_intervals = remove_interval(self.dev_intervals, *interval)
@@ -1504,7 +1501,7 @@ def missing_intervals(
15041501
snapshot_start_date, snapshot_end_date = (to_datetime(i) for i in interval)
15051502
snapshot = snapshot.copy()
15061503
snapshot.intervals = snapshot.intervals.copy()
1507-
snapshot.remove_interval(interval, execution_time)
1504+
snapshot.remove_interval(interval)
15081505

15091506
missing_interval_end_date = snapshot_end_date
15101507
node_end_date = snapshot.node.end

sqlmesh/core/state_sync/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,6 @@ def invalidate_environment(self, name: str) -> None:
307307
def remove_interval(
308308
self,
309309
snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],
310-
execution_time: t.Optional[TimeLike] = None,
311310
remove_shared_versions: bool = False,
312311
) -> None:
313312
"""Remove an interval from a list of snapshots and sync it to the store.

sqlmesh/core/state_sync/cache.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,11 @@ def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None
136136
def remove_interval(
137137
self,
138138
snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],
139-
execution_time: t.Optional[TimeLike] = None,
140139
remove_shared_versions: bool = False,
141140
) -> None:
142141
for s, _ in snapshot_intervals:
143142
self.snapshot_cache.pop(s.snapshot_id, None)
144-
self.state_sync.remove_interval(snapshot_intervals, execution_time, remove_shared_versions)
143+
self.state_sync.remove_interval(snapshot_intervals, remove_shared_versions)
145144

146145
def unpause_snapshots(
147146
self, snapshots: t.Collection[SnapshotInfoLike], unpaused_dt: TimeLike

sqlmesh/core/state_sync/engine_adapter.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,6 @@ def remove_partial_intervals(
673673
def remove_interval(
674674
self,
675675
snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],
676-
execution_time: t.Optional[TimeLike] = None,
677676
remove_shared_versions: bool = False,
678677
) -> None:
679678
intervals_to_remove: t.Sequence[

sqlmesh/schedulers/airflow/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from sqlmesh.core.user import User
1818
from sqlmesh.schedulers.airflow import common
1919
from sqlmesh.utils import unique
20+
from sqlmesh.utils.date import TimeLike
2021
from sqlmesh.utils.errors import (
2122
ApiClientError,
2223
ApiServerError,
@@ -201,6 +202,7 @@ def apply_plan(
201202
directly_modified_snapshots: t.Optional[t.List[SnapshotId]] = None,
202203
indirectly_modified_snapshots: t.Optional[t.Dict[str, t.List[SnapshotId]]] = None,
203204
removed_snapshots: t.Optional[t.List[SnapshotId]] = None,
205+
execution_time: t.Optional[TimeLike] = None,
204206
) -> None:
205207
request = common.PlanApplicationRequest(
206208
new_snapshots=list(new_snapshots),
@@ -221,6 +223,7 @@ def apply_plan(
221223
directly_modified_snapshots=directly_modified_snapshots or [],
222224
indirectly_modified_snapshots=indirectly_modified_snapshots or {},
223225
removed_snapshots=removed_snapshots or [],
226+
execution_time=execution_time,
224227
)
225228

226229
response = self._session.post(

sqlmesh/schedulers/airflow/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class PlanApplicationRequest(PydanticModel):
5555
directly_modified_snapshots: t.List[SnapshotId]
5656
indirectly_modified_snapshots: t.Dict[str, t.List[SnapshotId]]
5757
removed_snapshots: t.List[SnapshotId]
58+
execution_time: t.Optional[TimeLike] = None
5859

5960
def is_selected_for_backfill(self, model_fqn: str) -> bool:
6061
return self.models_to_backfill is None or model_fqn in self.models_to_backfill
@@ -89,6 +90,7 @@ class PlanDagSpec(PydanticModel):
8990
directly_modified_snapshots: t.Optional[t.List[SnapshotId]] = None
9091
indirectly_modified_snapshots: t.Optional[t.Dict[str, t.List[SnapshotId]]] = None
9192
removed_snapshots: t.Optional[t.List[SnapshotId]] = None
93+
execution_time: t.Optional[TimeLike] = None
9294

9395

9496
class EnvironmentsResponse(PydanticModel):

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
207207
plan_dag_spec.deployability_index,
208208
plan_dag_spec.environment.plan_id,
209209
"before_promote",
210+
plan_dag_spec.execution_time,
210211
)
211212

212213
(
@@ -218,6 +219,7 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
218219
plan_dag_spec.deployability_index,
219220
plan_dag_spec.environment.plan_id,
220221
"after_promote",
222+
plan_dag_spec.execution_time,
221223
)
222224

223225
(
@@ -425,6 +427,7 @@ def _create_backfill_tasks(
425427
deployability_index: DeployabilityIndex,
426428
plan_id: str,
427429
task_id_suffix: str,
430+
execution_time: t.Optional[TimeLike],
428431
) -> t.Tuple[BaseOperator, BaseOperator]:
429432
snapshot_to_tasks = {}
430433
for intervals_per_snapshot in backfill_intervals:
@@ -456,6 +459,7 @@ def _create_backfill_tasks(
456459
end=end,
457460
deployability_index=deployability_index,
458461
plan_id=plan_id,
462+
execution_time=execution_time,
459463
)
460464
external_sensor_task = self._create_hwm_external_sensor(
461465
snapshot, start=start, end=end
@@ -593,6 +597,7 @@ def _create_snapshot_evaluation_operator(
593597
task_id: str,
594598
start: t.Optional[TimeLike] = None,
595599
end: t.Optional[TimeLike] = None,
600+
execution_time: t.Optional[TimeLike] = None,
596601
deployability_index: t.Optional[DeployabilityIndex] = None,
597602
plan_id: t.Optional[str] = None,
598603
) -> BaseOperator:
@@ -607,6 +612,7 @@ def _create_snapshot_evaluation_operator(
607612
end=end,
608613
deployability_index=deployability_index or DeployabilityIndex.all_deployable(),
609614
plan_id=plan_id,
615+
execution_time=execution_time,
610616
),
611617
task_id=task_id,
612618
)

sqlmesh/schedulers/airflow/plan.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def create_plan_dag_spec(
130130
[s for s in all_snapshots.values() if request.is_selected_for_backfill(s.name)],
131131
start=request.environment.start_at,
132132
end=end,
133-
execution_time=now(),
133+
execution_time=request.execution_time or now(),
134134
deployability_index=deployability_index_for_evaluation,
135135
restatements=restatements,
136136
end_bounded=request.end_bounded,
@@ -182,6 +182,7 @@ def create_plan_dag_spec(
182182
directly_modified_snapshots=request.directly_modified_snapshots,
183183
indirectly_modified_snapshots=request.indirectly_modified_snapshots,
184184
removed_snapshots=request.removed_snapshots,
185+
execution_time=request.execution_time,
185186
)
186187

187188

sqlmesh/schedulers/airflow/state_sync.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ def _add_snapshot_intervals(self, snapshot_intervals: SnapshotIntervals) -> None
225225
def remove_interval(
226226
self,
227227
snapshot_intervals: t.Sequence[t.Tuple[SnapshotInfoLike, Interval]],
228-
execution_time: t.Optional[TimeLike] = None,
229228
remove_shared_versions: bool = False,
230229
) -> None:
231230
"""Remove an interval from a list of snapshots and sync it to the store.

0 commit comments

Comments
 (0)