Skip to content

Commit 9790182

Browse files
authored
Fix: Make sure the plan application DAG fails when one of its tasks fails (#2750)
1 parent 283cf74 commit 9790182

1 file changed

Lines changed: 4 additions & 13 deletions

File tree

sqlmesh/schedulers/airflow/dag_generator.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
257257

258258
finalize_task = self._create_finalize_task(plan_dag_spec.environment)
259259
before_finalize_task >> finalize_task
260+
finalize_task >> end_task
260261

261262
on_plan_apply_end_task = PythonOperator(
262263
task_id="on_plan_apply_end",
@@ -266,19 +267,15 @@ def _create_plan_application_dag(self, plan_dag_spec: common.PlanDagSpec) -> DAG
266267
)
267268
finalize_task >> on_plan_apply_end_task
268269

269-
self._add_notification_target_tasks(
270-
plan_dag_spec, start_task, end_task, on_plan_apply_end_task
271-
)
270+
self._add_notification_target_tasks(plan_dag_spec, start_task, finalize_task)
272271
return dag
273272

274273
def _add_notification_target_tasks(
275274
self,
276275
request: common.PlanDagSpec,
277276
start_task: BaseOperator,
278277
end_task: BaseOperator,
279-
previous_end_task: BaseOperator,
280278
) -> None:
281-
has_success_or_failed_notification = False
282279
for notification_target in request.notification_targets:
283280
notification_operator_provider = NOTIFICATION_TARGET_TO_OPERATOR_PROVIDER.get(
284281
type(notification_target)
@@ -297,15 +294,9 @@ def _add_notification_target_tasks(
297294
if plan_start_notification_task:
298295
start_task >> plan_start_notification_task
299296
if plan_success_notification_task:
300-
has_success_or_failed_notification = True
301-
previous_end_task >> plan_success_notification_task
302-
plan_success_notification_task >> end_task
297+
end_task >> plan_success_notification_task
303298
if plan_failed_notification_task:
304-
has_success_or_failed_notification = True
305-
previous_end_task >> plan_failed_notification_task
306-
plan_failed_notification_task >> end_task
307-
if not has_success_or_failed_notification:
308-
previous_end_task >> end_task
299+
end_task >> plan_failed_notification_task
309300

310301
def _create_creation_tasks(
311302
self,

0 commit comments

Comments
 (0)