Skip to content

Commit 44d7697

Browse files
authored
Fix: In Airflow delete orphaned snapshot DAGs when new versions supersede the old ones (#1918)
1 parent 225e779 commit 44d7697

1 file changed

Lines changed: 17 additions & 7 deletions

File tree

sqlmesh/schedulers/airflow/integration.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ def dags(self) -> t.List[DAG]:
135135
cadence_dags = (
136136
dag_generator.generate_cadence_dags(prod_env.snapshots) if prod_env else []
137137
)
138+
_delete_orphaned_snapshot_dags({d.dag_id for d in cadence_dags})
138139
else:
139140
cadence_dags = []
140141

@@ -214,13 +215,12 @@ def _janitor_task(
214215
session=session,
215216
)
216217

217-
all_snapshot_dag_ids = set(util.get_snapshot_dag_ids())
218-
active_snapshot_dag_ids = {
219-
common.dag_id_for_snapshot_info(s) for s in state_sync.get_snapshots(None).values()
220-
}
221-
expired_snapshot_dag_ids = all_snapshot_dag_ids - active_snapshot_dag_ids
222-
logger.info("Deleting expired Snapshot DAGs: %s", expired_snapshot_dag_ids)
223-
util.delete_dags(expired_snapshot_dag_ids, session=session)
218+
prod_env = state_sync.get_environment(c.PROD)
219+
if prod_env:
220+
active_snapshot_dag_ids = {
221+
common.dag_id_for_snapshot_info(s) for s in prod_env.snapshots
222+
}
223+
_delete_orphaned_snapshot_dags(active_snapshot_dag_ids, session=session)
224224

225225
plan_application_dag_ids = util.get_finished_plan_application_dag_ids(
226226
ttl=plan_application_dag_ttl, session=session
@@ -237,6 +237,16 @@ def _janitor_task(
237237
state_sync.compact_intervals()
238238

239239

240+
@provide_session
241+
def _delete_orphaned_snapshot_dags(
242+
active_snapshot_dag_ids: t.Set[str], session: Session = util.PROVIDED_SESSION
243+
) -> None:
244+
all_snapshot_dag_ids = set(util.get_snapshot_dag_ids(session=session))
245+
orphaned_snapshot_dag_ids = all_snapshot_dag_ids - active_snapshot_dag_ids
246+
logger.info("Deleting orphaned Snapshot DAGs: %s", orphaned_snapshot_dag_ids)
247+
util.delete_dags(orphaned_snapshot_dag_ids, session=session)
248+
249+
240250
@provide_session
241251
def _get_plan_dag_specs_from_variables(
242252
session: Session = util.PROVIDED_SESSION,

0 commit comments

Comments
 (0)