Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions airflow-core/newsfragments/63185.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Fix 2.x to 3.0+ upgrade failure when a custom Dag bundle is configured

The ``0082_3_1_0_make_bundle_name_not_nullable`` migration assigned every legacy row
``bundle_name='dags-folder'`` and left ``relative_fileloc`` NULL, so triggering a DagRun raised
``Requested bundle 'dags-folder' is not configured.`` on any deployment that uses a bundle other
than the default ``dags-folder``.

A one-shot repair now runs at ``DagFileProcessorManager`` startup, after ``sync_bundles_to_db``:

- Each legacy-candidate row (NULL ``relative_fileloc`` AND no ``DagVersion``) is routed to the
  most-specific configured bundle whose absolute path contains the Dag's ``fileloc``; both
  ``bundle_name`` and ``relative_fileloc`` are written atomically. Rows whose ``fileloc`` is not
  under any configured bundle are left untouched, because writing ``bundle_name`` without a
  verified ``relative_fileloc`` would produce a row that task workers cannot execute. Such rows
  self-heal via the existing staleness lifecycle once the operator adds a matching bundle:
  ``sync_bundles_to_db`` deactivates the orphan bundle, ``deactivate_stale_dags`` flips the row
  stale, and the next successful parse resets everything through
  ``update_dag_parsing_results_in_db``. No manual ``airflow dags reserialize`` is required,
  though it can be used to force the parse path to rewrite ``bundle_name`` and
  ``relative_fileloc`` immediately.
- A global ``DagVersion`` fast-skip short-circuits the repair on any deployment that has already
  completed a 3.x parse cycle. ``DagVersion`` rows are written only by the parse path, which
  overwrites both ``bundle_name`` and ``relative_fileloc`` on every parse, so once any row has a
  ``DagVersion`` the parse loop is doing the same work the repair would do (and the staleness
  lifecycle handles rows whose files no longer match any configured bundle). The probe is a
  primary-key index lookup on ``dag_version``, which is far cheaper than the sequential scan of
  ``dag`` that the repair query itself requires.
- High-availability deployments running multiple Dag processors are safe: each chunked UPDATE
  re-asserts the legacy-candidate predicate as a compare-and-swap guard, so whichever Dag
  processor commits first becomes authoritative and any later UPDATE (from a second Dag
  processor, or from a parser that landed a fresh write between SELECT and UPDATE) matches zero
  rows and is a no-op. Each chunk runs in its own internally-owned transaction to bound the
  row-lock window.
- Multi-team deployments are safe: a bundle path belongs to at most one team, so routing by the
  most-specific containing path cannot cross a team boundary.
227 changes: 224 additions & 3 deletions airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import logging
import os
import warnings
from typing import TYPE_CHECKING
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, cast

from itsdangerous import URLSafeSerializer
from pydantic import BaseModel, ValidationError
from sqlalchemy import delete, select
from sqlalchemy import delete, exists, select, update

from airflow._shared.module_loading import import_string
from airflow.configuration import conf
Expand All @@ -34,17 +36,48 @@
from airflow.models.team import Team
from airflow.providers_manager import ProvidersManager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.session import NEW_SESSION, create_session, provide_session

if TYPE_CHECKING:
from collections.abc import Iterable

from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

_example_dag_bundle_name = "example_dags"

# Chunk size for the one-time startup repair of unconfigured bundles: the
# maximum number of legacy Dag rows selected per batch by
# ``reassign_dags_with_unconfigured_bundles``. Each batch's UPDATEs are
# committed in their own transaction, so this also bounds the row-lock window.
_REASSIGN_BATCH_SIZE = 1000


def _best_bundle_for_fileloc(
    fileloc: str, descending_bundle_paths: dict[str, Path]
) -> tuple[str, str] | None:
    """
    Find the bundle that owns ``fileloc`` and express the file relative to it.

    The bundles are tried in the iteration order of ``descending_bundle_paths``
    and the first one whose path is a lexical ancestor of ``fileloc`` wins.
    Callers must therefore pass the mapping sorted by path length, longest
    first, so that the deepest bundle is preferred when bundle paths overlap.

    The containment test is a plain ``Path.relative_to`` -- the same check
    ``BaseDagImporter.get_relative_path`` uses -- so the ``relative_fileloc``
    produced here matches what the next parse computes for the same file.
    Filelocs come from the Dag processor parsing admin-controlled bundle
    files, so they are trusted and get no path-traversal normalization.

    :param fileloc: Absolute location of the Dag file.
    :param descending_bundle_paths: ``{bundle_name: bundle_path}``, ordered by
        path length descending.
    :return: ``(bundle_name, relative_fileloc)`` for the first matching bundle,
        or ``None`` when ``fileloc`` is not under any bundle's path.
    """
    target = Path(fileloc)
    for bundle_name, bundle_root in descending_bundle_paths.items():
        try:
            remainder = target.relative_to(bundle_root)
        except ValueError:
            # ``fileloc`` is not under this bundle; fall through to the next one.
            continue
        else:
            return bundle_name, str(remainder)
    return None


class _ExternalBundleConfig(BaseModel):
"""Schema defining the user-specified configuration for a DAG bundle."""
Expand Down Expand Up @@ -366,6 +399,194 @@ def _extract_and_sign_template(bundle_name: str) -> tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name == name))
self.log.info("Deleted import errors for bundle %s which is no longer configured", name)

# Airflow sets autoflush=False, so subsequent reads in the same session
# need an explicit flush to see ORM-side bundle state changes.
session.flush()
Comment thread
jason810496 marked this conversation as resolved.

def reassign_dags_with_unconfigured_bundles(self) -> int:
    """
    Reassign Dags pointing at unconfigured bundles to a configured one.

    Side effect of the ``0082_3_1_0_make_bundle_name_not_nullable``
    migration (#63323): legacy rows get ``bundle_name='dags-folder'``
    and NULL ``relative_fileloc``, which raises ``Requested bundle
    '{name}' is not configured.`` at trigger time when the deployment
    uses a custom bundle.

    Each legacy-candidate row (NULL ``relative_fileloc`` and no
    ``DagVersion``) is routed to the most-specific configured bundle
    whose path contains its ``fileloc``; ``bundle_name`` and
    ``relative_fileloc`` are written by the same UPDATE. Rows whose
    ``fileloc`` is under no configured bundle are left untouched --
    writing ``bundle_name`` without a verified ``relative_fileloc``
    would produce a row task workers cannot execute -- and instead
    self-heal via the normal staleness lifecycle: ``sync_bundles_to_db``
    deactivates the old bundle, the stale-scan marks the row stale, and
    the next successful parse from any configured bundle resets
    everything via ``update_dag_parsing_results_in_db``. No manual
    ``airflow dags reserialize`` is required.

    Multi-team-safe because a bundle path belongs to at most one team.
    Each chunk runs in its own internally-owned transaction so the
    row-lock window stays bounded and no caller-provided session is
    committed.

    :return: Number of Dags whose ``bundle_name`` was changed. Rows that
        only had ``relative_fileloc`` backfilled (bundle already correct)
        and rows that were skipped are logged but not counted.
    """
    # Import here to avoid circular import
    # (manager -> dag -> dagrun -> taskinstance -> dag_version -> manager)
    from airflow.models.dag import DagModel
    from airflow.models.dag_version import DagVersion

    # Fast-skip once any 3.x parse cycle has run. DagVersion is written
    # only by the parse path, and that path overwrites both bundle_name
    # and relative_fileloc on every parse (see
    # ``DagModelOperation.update_dags``), so any legacy row whose file
    # is under a configured bundle self-heals at the next parse and any
    # row whose file is under no configured bundle self-heals via the
    # staleness lifecycle -- reassign has no work the parse path will
    # not do itself. The probe is an index hit on dag_version's PK
    # vs. a sequential scan of dag (no index on relative_fileloc).
    with create_session() as session:
        if session.scalar(select(DagVersion.id).limit(1)) is not None:
            return 0

    with create_session() as session:
        if not (active_bundle_paths := self._resolve_active_bundle_paths(session=session)):
            self.log.info(
                "No active Dag bundles with resolvable paths; skipping reassignment of Dags "
                "with unconfigured bundles."
            )
            return 0

    # Legacy-candidate predicate (rows never parsed in 3.x): NULL
    # relative_fileloc (the 0082 migration leaves it NULL) AND NOT EXISTS
    # DagVersion (the parse path writes DagModel.bundle_name before the
    # DagVersion). Equivalent under that invariant; both stated as
    # defense in depth. Defined once and shared by the SELECT and the
    # UPDATE so the compare-and-swap guard can never drift from the
    # selection criteria.
    legacy_candidate = (
        DagModel.relative_fileloc.is_(None),
        ~exists().where(DagVersion.dag_id == DagModel.dag_id),
    )

    # Chunked UPDATEs ordered by dag_id, one transaction per chunk; repaired
    # rows drop out of the predicate because writing relative_fileloc
    # makes the IS NULL clause false. Skipped rows stay in the predicate,
    # so ``last_dag_id`` (keyset pagination) is what moves the scan past them.
    movements: dict[tuple[str | None, str], int] = defaultdict(int)
    total_reassigned = 0
    total_backfilled = 0
    total_skipped = 0
    last_dag_id: str | None = None

    while True:
        with create_session() as session:
            query = select(DagModel.dag_id, DagModel.bundle_name, DagModel.fileloc).where(
                *legacy_candidate
            )
            if last_dag_id is not None:
                query = query.where(DagModel.dag_id > last_dag_id)
            query = query.order_by(DagModel.dag_id).limit(_REASSIGN_BATCH_SIZE)

            if not (chunk := session.execute(query).all()):
                break
            last_dag_id = chunk[-1].dag_id

        # Route every legacy row by fileloc, not just those on
        # unconfigured bundles, so a migration-assigned dags-folder
        # row whose file lives under a different configured bundle
        # gets relocated instead of stranded. Classify as skip
        # (no match), backfill (match == current bundle), or
        # reassign (match != current bundle).
        chunk_updates: list[tuple[str, str | None, str, str]] = []
        for row in chunk:
            match = _best_bundle_for_fileloc(row.fileloc, active_bundle_paths) if row.fileloc else None
            if match is None:
                total_skipped += 1
                continue
            target, relative = match
            chunk_updates.append((row.dag_id, row.bundle_name, target, relative))

        if not chunk_updates:
            continue

        with create_session() as session:
            # create_session commits on context exit, bounding the
            # row-lock window to one chunk.
            for dag_id, prev_bundle, target, relative in chunk_updates:
                # Repeating the legacy predicate on the UPDATE makes it a
                # CAS guard: if a parser (or another Dag processor) wrote
                # the row after our SELECT, this matches zero rows.
                result = cast(
                    "CursorResult",
                    session.execute(
                        update(DagModel)
                        .where(DagModel.dag_id == dag_id, *legacy_candidate)
                        .values(relative_fileloc=relative, bundle_name=target)
                        .execution_options(synchronize_session=False)
                    ),
                )

                # Rowcount is the source of truth for whether the CAS actually fired
                if not result.rowcount:
                    self.log.debug("Skipping repair for Dag '%s': lost race to parser.", dag_id)
                    continue
                if target == prev_bundle:
                    total_backfilled += 1
                else:
                    movements[(prev_bundle, target)] += 1
                    total_reassigned += 1

    # ``str()`` on the previous bundle keeps the sort key comparable when it is None.
    for (prev, target), n in sorted(movements.items(), key=lambda item: (str(item[0][0]), item[0][1])):
        self.log.info(
            "Reassigning %d Dag(s) from unconfigured bundle '%s' to '%s'",
            n,
            prev,
            target,
        )

    if total_backfilled:
        self.log.info("Backfilled relative_fileloc for %d legacy Dag(s).", total_backfilled)

    if total_skipped:
        self.log.warning(
            "Skipped %d legacy Dag(s) whose fileloc is not under any configured bundle; "
            "triggering them will keep raising \"Requested bundle '{name}' is not configured.\" "
            "until a bundle whose path contains the fileloc is added to "
            "dag_bundle_config_list. The next parse will then restore them automatically, "
            "or run `airflow dags reserialize` to force the parse path to rewrite "
            "bundle_name and relative_fileloc immediately.",
            total_skipped,
        )

    return total_reassigned

def _resolve_active_bundle_paths(self, *, session: Session) -> dict[str, Path]:
    """
    Map each configured-and-active bundle name to its path, deepest path first.

    A bundle qualifies only when it is both present in this manager's
    configuration and stored with ``active=True`` in ``dag_bundle``. Bundles
    missing from ``dag_bundle`` are left out so that using one as a
    reassignment target cannot trigger an FK violation.

    :param session: Session used to read the active bundle names.
    :return: ``{name: path}`` ordered by the length of the path string,
        longest first, so the most specific bundle wins in
        ``_best_bundle_for_fileloc``.
    """
    names_active_in_db = set(
        session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(True)))
    )

    # ``bundle.path`` is only read for bundles that pass the active filter.
    candidates: dict[str, Path] = {
        bundle.name: bundle.path
        for bundle in self.get_all_dag_bundles()
        if bundle.name in names_active_in_db
    }

    def _path_length(entry: tuple[str, Path]) -> int:
        return len(str(entry[1]))

    return dict(sorted(candidates.items(), key=_path_length, reverse=True))

@staticmethod
def _extract_template_params(bundle_instance: BaseDagBundle) -> dict:
"""
Expand Down
24 changes: 23 additions & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ def _exit_gracefully(self, signum, frame):

def sync_bundles(self) -> None:
    """
    Sync configured DAG bundles to the metadata database, then repair legacy Dag rows.

    A single ``DagBundlesManager`` is used for both steps. The repair runs
    after the sync because it only routes Dags to bundles that are persisted
    as active in ``dag_bundle``.
    """
    dag_bundle_manager = DagBundlesManager()
    dag_bundle_manager.sync_bundles_to_db()
    dag_bundle_manager.reassign_dags_with_unconfigured_bundles()

def get_all_bundles(self) -> list[BaseDagBundle]:
"""Return configured DAG bundles filtered by ``bundle_names_to_parse`` if provided."""
Expand Down Expand Up @@ -449,6 +451,7 @@ def deactivate_stale_dags(
).where(~DagModel.is_stale)
dags_parsed = session.execute(query)

stuck_legacy_rows = 0
for dag in dags_parsed:
# Dags whose bundle has been removed from config (bundle no longer active) are stale —
# the processor has stopped parsing their files, so the time-based check below would never fire.
Expand All @@ -460,6 +463,16 @@ def deactivate_stale_dags(
)
to_deactivate.add(dag.dag_id)
continue
# A Dag upgraded from Airflow 2.x can still have a NULL relative_fileloc:
# the 0082 migration adds the column as nullable, and the startup repair
# in DagBundlesManager only backfills it when the Dag's fileloc resolves to
# a configured bundle. Rows whose fileloc matches no bundle stay NULL, so
# the time-based stale check below would build Path(None) and crash. Skip
# them here and count them so the total is surfaced after the loop.
# See https://github.com/apache/airflow/issues/63323.
if dag.relative_fileloc is None:
stuck_legacy_rows += 1
continue
# When the Dag's last_parsed_time is more than the stale_dag_threshold older than the
# Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file,
# This is especially relevant for Dag files that generate Dags in a dynamic manner.
Expand Down Expand Up @@ -496,6 +509,15 @@ def deactivate_stale_dags(
else:
raise

if stuck_legacy_rows:
# Surface how many legacy rows the startup repair could not route;
# each one keeps raising "Requested bundle is not configured." until
# a matching bundle is added to dag_bundle_config_list.
self.log.info(
"Skipped stale check for %d legacy Dag(s) with NULL relative_fileloc.",
stuck_legacy_rows,
)

def _run_parsing_loop(self):
# initialize cache to mutualize calls to Variable.get in DAGs
# needs to be done before this process is forked to create the DAG parsing processes.
Expand Down
Loading
Loading