From fbecdef9cf7a83c83e71d784bd377c36c5a344af Mon Sep 17 00:00:00 2001 From: Henry Chen Date: Wed, 3 Jun 2026 01:22:31 +0800 Subject: [PATCH] Refactor latest DagRun load options for consistency --- .../src/airflow/dag_processing/collection.py | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 7cdf490d93877..04c84094b6f8f 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -104,6 +104,18 @@ def _create_orm_dags( yield orm_dag +def _build_latest_run_load_options() -> Any: + return load_only( + DagRun.dag_id, + DagRun.logical_date, + DagRun.run_after, + DagRun.data_interval_start, + DagRun.data_interval_end, + DagRun.partition_key, + DagRun.partition_date, + ) + + def _get_latest_runs_stmt(dag_id: str) -> Select: """Build a select statement to retrieve the last automated run for each dag.""" max_logical_date = ( @@ -125,17 +137,7 @@ def _get_latest_runs_stmt(dag_id: str) -> Select: DagRun.dag_id == dag_id, DagRun.logical_date == max_logical_date, ) - .options( - load_only( - DagRun.dag_id, - DagRun.logical_date, - DagRun.run_after, - DagRun.data_interval_start, - DagRun.data_interval_end, - DagRun.partition_key, - DagRun.partition_date, - ) - ) + .options(_build_latest_run_load_options()) ) @@ -161,21 +163,7 @@ def _get_latest_runs_stmt_partitioned(dag_id: str) -> Select: .limit(1) .scalar_subquery() ) - return ( - select(DagRun) - .where(DagRun.id == latest_run_id) - .options( - load_only( - DagRun.dag_id, - DagRun.logical_date, - DagRun.run_after, - DagRun.data_interval_start, - DagRun.data_interval_end, - DagRun.partition_key, - DagRun.partition_date, - ) - ) - ) + return select(DagRun).where(DagRun.id == latest_run_id).options(_build_latest_run_load_options()) class _RunInfo(NamedTuple):