
Guard 2 to 3+ migration against custom Dag bundle configs using startup-based initialization #63185

Open
jason810496 wants to merge 5 commits into apache:main from jason810496:fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable

Conversation


@jason810496 jason810496 commented Mar 9, 2026

Member

Why

Some users encounter a flaky error during 2.x → 3.1+ upgrades. If the deployment configures any bundle other than the default dags-folder, the 0082_3_1_0_make_bundle_name_not_nullable migration writes a single hard-coded bundle name that no longer matches the user's runtime config, so triggering a DagRun raises:

def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
    """
    Get a DAG bundle by name.

    :param name: The name of the DAG bundle.
    :param version: The version of the DAG bundle you need (optional). If not provided, ``tracking_ref`` will be used instead.
    :return: The DAG bundle.
    """
    cfg_bundle = self._bundle_config.get(name)
    if not cfg_bundle:
        raise ValueError(f"Requested bundle '{name}' is not configured.")
    return cfg_bundle.bundle_class(name=name, version=version, **cfg_bundle.kwargs)

Reproducible edge cases:

Case 1: Dag bundle name is not dags-folder

AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST='[{"name": "main", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/dags", "refresh_interval": 1}}]'

Case 2: Multiple custom Dag bundles

AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST='[{"name": "main1", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/dags", "refresh_interval": 1}}, {"name": "main2", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/dags", "refresh_interval": 1}}]'

In both cases the migration inserts dags-folder rather than the configured bundle, leaving the Dag rows pointing at a bundle_name that the running deployment cannot resolve.
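
To make the failure mode concrete, here is a minimal sketch that parses the Case 1 config and performs the same kind of name lookup as get_bundle. It is not Airflow code: configured_bundle_names and resolve_bundle are hypothetical helpers, and only the config JSON and the error message are taken from this description.

```python
import json

# Case 1 config, as it would be parsed from the environment variable.
CONFIG_JSON = (
    '[{"name": "main", '
    '"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", '
    '"kwargs": {"path": "/files/dags", "refresh_interval": 1}}]'
)


def configured_bundle_names(config_json: str) -> set[str]:
    """Hypothetical helper: names of the bundles present in the config list."""
    return {entry["name"] for entry in json.loads(config_json)}


def resolve_bundle(name: str, config_json: str) -> str:
    """Hypothetical stand-in for the name check done by get_bundle."""
    if name not in configured_bundle_names(config_json):
        raise ValueError(f"Requested bundle '{name}' is not configured.")
    return name


if __name__ == "__main__":
    print(resolve_bundle("main", CONFIG_JSON))  # the configured name resolves
    try:
        # The name the migration wrote is absent from the runtime config.
        resolve_bundle("dags-folder", CONFIG_JSON)
    except ValueError as exc:
        print(exc)
```

A row carrying the migrated name therefore fails at lookup time even though the Dag file itself sits under a configured bundle path.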

How

  • At DagFileProcessorManager startup, after sync_bundles_to_db flushes the latest bundle state, scan DagModel for legacy-candidate rows (NULL relative_fileloc and no DagVersion for that dag_id) and route them to the most-specific configured bundle whose absolute path contains the Dag's fileloc. relative_fileloc is written at the same time so fileloc-based stale-detection works later.
  • Rows whose fileloc does not lie under any configured bundle's path are left untouched rather than forced onto a default bundle. Writing bundle_name without a verified relative_fileloc would produce an active row that task workloads cannot execute (no dag_rel_path). Skipped rows then follow the normal staleness lifecycle: when the operator removes the bundle from config, sync_bundles_to_db flips it to active=False and the next deactivate_stale_dags scan marks the row is_stale=True. Once the operator restores a matching bundle, the next parser run resets is_stale and writes fresh bundle_name and relative_fileloc, so no manual airflow dags reserialize is required.
  • Safe under core.multi_team: a bundle path belongs to at most one team, so routing by the most-specific containing path cannot cross a team boundary. The same routing applies in single- and multi-team deployments.
  • Bundles that are configured but failed to persist (e.g. construction error during sync) are excluded from reassignment targets to avoid FK violations against dag_bundle.
  • The repair is chunked (_REASSIGN_BATCH_SIZE=1000) and each chunk runs in its own internally-owned transaction (no caller-provided session is committed), bounding the row-lock window. Each UPDATE re-asserts the legacy-candidate predicate as a compare-and-swap guard so a concurrent parser write that lands between SELECT and UPDATE wins the race.
  • Parser-loop stale-Dag scan now skips rows with relative_fileloc IS NULL so the standard fileloc-based deactivation does not crash on legacy rows the repair couldn't recover.
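
The routing rule in the bullets above (most-specific containing bundle wins, unmatched rows are skipped) can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: order_by_specificity and best_bundle_for_fileloc are hypothetical names, specificity is measured here by number of path components, and PurePosixPath is used so the example behaves the same on any platform.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def order_by_specificity(bundle_paths: dict[str, str]) -> list[tuple[str, PurePosixPath]]:
    """Sort {name: path} so that deeper (more specific) bundle roots come first."""
    pairs = [(name, PurePosixPath(path)) for name, path in bundle_paths.items()]
    return sorted(pairs, key=lambda pair: len(pair[1].parts), reverse=True)


def best_bundle_for_fileloc(
    fileloc: str | None, ordered: list[tuple[str, PurePosixPath]]
) -> tuple[str, str] | None:
    """Return (bundle_name, relative_fileloc) for the first containing root, else None."""
    if not fileloc:
        return None  # missing fileloc: leave the row untouched
    path = PurePosixPath(fileloc)
    for name, root in ordered:
        try:
            relative = path.relative_to(root)
        except ValueError:
            continue  # fileloc is not under this bundle root
        return name, str(relative)
    return None  # not under any configured bundle: skip rather than force a default


if __name__ == "__main__":
    ordered = order_by_specificity({"main": "/files/dags", "team_a": "/files/dags/team_a"})
    print(best_bundle_for_fileloc("/files/dags/team_a/etl.py", ordered))
    print(best_bundle_for_fileloc("/opt/elsewhere/x.py", ordered))
```

Because relative_to compares whole path components, a sibling directory such as /files/dags_backup does not match a bundle rooted at /files/dags.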

What

  • airflow-core/src/airflow/dag_processing/bundles/manager.py
    • DagBundlesManager.reassign_dags_with_unconfigured_bundles — chunked, CAS-guarded fileloc-aware reassignment that owns its own transactions.
    • DagBundlesManager._resolve_active_bundle_paths — returns configured-and-active bundle names (in config order) and {name: path} ordered by path length descending for longest-prefix matching.
    • DagBundlesManager.bundle_names — list of configured bundle names in config order.
    • New module-level _best_bundle_for_fileloc — short-circuits on the first prefix match given descending-by-length input; rejects parent-traversal results.
    • Explicit session.flush() after sync_bundles_to_db so subsequent reads see the freshly synced bundle rows under Airflow's autoflush=False sessions.
  • airflow-core/src/airflow/dag_processing/manager.py
    • Calls reassign_dags_with_unconfigured_bundles once at startup from sync_bundles (not on every bundle refresh).
    • Stale-Dag parser loop skips rows with NULL relative_fileloc to avoid Path(None) crashes on legacy rows.
  • Tests
    • airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py — parametrized coverage for custom names, multiple bundles, overlapping paths (deepest wins), fileloc outside any bundle (row skipped, not forced to default), missing fileloc, multi-team routing, relative_fileloc backfill, FK-safety when a configured bundle is missing from dag_bundle, concurrent-DFP startup, batching boundary, and the full sync → repair → stale-scan → re-parse lifecycle.
    • airflow-core/tests/unit/dag_processing/test_manager.py — asserts reassignment runs once at startup and not on bundle refresh, and that legacy rows with NULL relative_fileloc are still picked up by deactivate_deleted_dags.
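
For reviewers who want a feel for the chunked, CAS-guarded update, here is a rough standalone sketch using sqlite3 instead of SQLAlchemy. Everything in it is an assumption made for illustration: the two-table schema is a stripped-down stand-in for dag and dag_version, BATCH_SIZE is shrunk from 1000 to 2, and make_db, repair_legacy_rows, and the route callback are hypothetical names.

```python
import sqlite3

BATCH_SIZE = 2  # stand-in for _REASSIGN_BATCH_SIZE, shrunk for the example

# Legacy-candidate predicate: no relative_fileloc and no DagVersion row yet.
LEGACY = (
    "relative_fileloc IS NULL AND NOT EXISTS "
    "(SELECT 1 FROM dag_version v WHERE v.dag_id = dag.dag_id)"
)


def make_db() -> sqlite3.Connection:
    """Create a tiny in-memory stand-in for the dag and dag_version tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dag (
            dag_id TEXT PRIMARY KEY,
            bundle_name TEXT NOT NULL,
            fileloc TEXT,
            relative_fileloc TEXT
        );
        CREATE TABLE dag_version (id INTEGER PRIMARY KEY, dag_id TEXT NOT NULL);
        """
    )
    return conn


def repair_legacy_rows(conn: sqlite3.Connection, route) -> int:
    """Reassign legacy rows chunk by chunk; route(fileloc) returns (name, rel) or None."""
    updated = 0
    last_id = ""
    while True:
        # Keyset pagination by dag_id so skipped rows are never re-read.
        rows = conn.execute(
            f"SELECT dag_id, fileloc FROM dag WHERE {LEGACY} AND dag_id > ? "
            "ORDER BY dag_id LIMIT ?",
            (last_id, BATCH_SIZE),
        ).fetchall()
        if not rows:
            return updated
        last_id = rows[-1][0]
        with conn:  # one transaction per chunk, committed on exit
            for dag_id, fileloc in rows:
                target = route(fileloc)
                if target is None:
                    continue  # unmatched fileloc: leave the row untouched
                name, rel = target
                # Re-assert the predicate so a concurrent parser write wins.
                cursor = conn.execute(
                    "UPDATE dag SET bundle_name = ?, relative_fileloc = ? "
                    f"WHERE dag_id = ? AND {LEGACY}",
                    (name, rel, dag_id),
                )
                updated += cursor.rowcount
```

If a parser writes relative_fileloc or a DagVersion row between the SELECT and the UPDATE, the guarded UPDATE matches zero rows and the parser's values are kept.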

Alternative considered: fix it in the DB migration (#63185, reverted)

The earlier approach (PR #63185, commit 450a08e78, reverted in eb7e928d9) tried to fix this inside the 0082_3_1_0_make_bundle_name_not_nullable Alembic migration itself — reading the user's bundle config from inside the migration and rewriting bundle_name per Dag during schema upgrade.

That approach was abandoned because:

  • Migrations should not import Airflow application code. Pulling DagBundlesManager / conf.getjson("dag_processor", "dag_bundle_config_list") into an Alembic revision couples schema upgrades to the live config and the bundle-loading code path, which can change shape between releases and break re-runs on older revisions.
  • The migration runs before bundles are constructible. The Dag processor hasn't started, bundles haven't been instantiated, and their path isn't safely resolvable in every deployment (e.g. remote/git bundles). Inferring the right target during schema upgrade is fragile.
  • It only handled the dags-folder case. The migration wrote a single hard-coded bundle name, so deployments with a custom bundle name or multiple bundles still ended up with the wrong bundle_name and the same runtime error (Flaky “Requested bundle is not configured” error during 2 to 3.1+ upgrade with custom Dag bundle names #63323).
  • Multi-team safety has to live next to the runtime config. Routing across team boundaries is a policy decision that belongs in the component that knows about teams, not in a schema migration.

Doing the reassignment at DagFileProcessorManager startup keeps the migration purely schema-level, lets us use the real bundle objects (get_bundle(name).path) for fileloc matching, and gives a single place to enforce FK correctness against dag_bundle.


Was generative AI tooling used to co-author this PR?

@jason810496 jason810496 self-assigned this Mar 10, 2026

@jason810496 jason810496 left a comment

Member Author


So the two steps are:

Migration: setting bundle_name = 'dags-folder' where NULL (to satisfy NOT NULL) remains unchanged.
sync_bundles_to_db(): reassign Dags whose bundle_name is not configured to the actual configured bundle.

So based on the offline discussion and the context in this thread, I will leave the migration file as is and harden the startup operation of the Dag processor.

Since running airflow dags reserialize can also work around this issue, my approach will be to consolidate some common utilities for both the dag_reserialize CLI and DagBundlesManager.sync_bundles_to_db (or DagFileProcessorManager.sync_bundles).

def dag_reserialize(args, session: Session = NEW_SESSION) -> None:

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
    self.log.debug("Syncing DAG bundles to the database")

def sync_bundles(self) -> None:
    """Sync configured DAG bundles to the metadata database."""
    DagBundlesManager().sync_bundles_to_db()

@jason810496 jason810496 marked this pull request as draft March 10, 2026 11:34
@jason810496 jason810496 changed the title Fix make_bundle_name_not_nullable by respecting user configured dag bundles Guard 2 to 3.1+ migration against custom Dag bundle configs using startup-based initialization Mar 11, 2026
@jason810496 jason810496 force-pushed the fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable branch from 27c0c04 to 8bf9575 Compare March 11, 2026 03:02
@jason810496 jason810496 marked this pull request as ready for review March 11, 2026 06:26
@feluelle feluelle removed the area:db-migrations PRs with DB migration label Mar 11, 2026
@kaxil kaxil requested a review from Copilot April 10, 2026 19:55

Copilot AI left a comment

Contributor


Pull request overview

This PR aims to prevent a flaky “Requested bundle '{name}' is not configured.” error during 2.x → 3.1+ upgrades by reassigning DAGs that reference unconfigured bundle names to a configured fallback bundle at DagFileProcessorManager startup.

Changes:

  • Add DagBundlesManager.reassign_dags_with_unconfigured_bundles() to bulk-update DagModel.bundle_name to a fallback configured bundle.
  • Invoke the reassignment immediately after sync_bundles_to_db() during DagFileProcessorManager.sync_bundles() startup.
  • Add unit tests covering reassignment behavior and ensuring reassignment is not performed during bundle refresh.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| airflow-core/src/airflow/dag_processing/manager.py | Calls bundle DB sync + startup reassignment. |
| airflow-core/src/airflow/dag_processing/bundles/manager.py | Implements the DB-level reassignment helper. |
| airflow-core/tests/unit/dag_processing/test_manager.py | Verifies reassignment is called once at startup (not on refresh). |
| airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py | Adds DB tests for reassignment logic and sync+reassign flow. |

Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py
Comment thread airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py Outdated
Comment thread airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py Outdated
Comment thread airflow-core/tests/unit/dag_processing/test_manager.py Outdated
@jason810496 jason810496 marked this pull request as draft April 20, 2026 00:57
@jason810496 jason810496 force-pushed the fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable branch 2 times, most recently from 98e0435 to ea639dd Compare April 20, 2026 02:30
@jason810496 jason810496 marked this pull request as ready for review April 20, 2026 06:11
@jason810496 jason810496 added this to the Airflow 3.2.2 milestone Apr 20, 2026
@jason810496 jason810496 changed the title Guard 2 to 3.1+ migration against custom Dag bundle configs using startup-based initialization Guard 2 to 3+ migration against custom Dag bundle configs using startup-based initialization Apr 20, 2026
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py
Comment thread airflow-core/src/airflow/models/dag.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
@jason810496 jason810496 marked this pull request as draft May 3, 2026 08:14
@jason810496 jason810496 force-pushed the fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable branch 6 times, most recently from 5e896f9 to 9129aca Compare May 27, 2026 11:28
@jason810496 jason810496 marked this pull request as ready for review May 27, 2026 11:28
@jason810496 jason810496 requested a review from ephraimbuddy June 3, 2026 12:06
@phanikumv phanikumv requested a review from uranusjr June 8, 2026 06:50
Comment thread airflow-core/src/airflow/dag_processing/bundles/manager.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/manager.py Outdated
@jason810496 jason810496 marked this pull request as draft June 9, 2026 02:27
@jason810496 jason810496 force-pushed the fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable branch 2 times, most recently from 9c3acb8 to 9c1bd43 Compare June 10, 2026 02:57
@jason810496 jason810496 marked this pull request as ready for review June 10, 2026 06:23
The 0082_3_1_0_make_bundle_name_not_nullable migration writes a single
hard-coded ``bundle_name='dags-folder'`` on every legacy DagModel row,
so deployments whose runtime config uses any other bundle name (or
multiple custom bundles) cannot resolve the row at trigger time and
fail with ``Requested bundle 'dags-folder' is not configured.``
(see apache#63323).

Fixing this inside the Alembic migration is wrong: migrations must not
import application code, and the migration runs before bundles are
constructible, so the user's real bundle config is not available there.

Instead, at DagFileProcessorManager startup -- after sync_bundles_to_db
flushes the latest bundle state -- scan DagModel for legacy-candidate
rows (NULL relative_fileloc and no DagVersion for that dag_id) and
route each row to the most-specific configured bundle whose absolute
path contains the Dag's fileloc, writing relative_fileloc at the same
time so fileloc-based stale-detection works later. Rows whose fileloc
is not under any configured bundle's path are left untouched: writing
bundle_name without a verified relative_fileloc would produce an
active row task workers cannot execute. Skipped rows then self-heal
via the staleness lifecycle -- no manual ``airflow dags reserialize``
required.

Concurrency and edge-case hardening on the repair path:

* Fast-skip via ``EXISTS(DagVersion)`` -- DagVersion is written only
  by the parse path, which overwrites both bundle_name and
  relative_fileloc on every parse (DagModelOperation.update_dags), so
  once any 3.x parse has run the parse path is the source of truth
  and the repair has no work that the parse path would not do itself.
  The check is a PK-index probe rather than a sequential scan of
  ``dag`` (there is no index on relative_fileloc).
* Chunked UPDATEs (_REASSIGN_BATCH_SIZE=1000) ordered by dag_id, one
  internally-owned transaction per chunk via create_session(), so the
  row-lock window stays bounded and the repair never commits a
  caller-provided session. Per-row compare-and-swap WHERE clause
  re-asserts the legacy-candidate predicate on the UPDATE so a
  concurrent parser write wins the race.
* SELECT and UPDATE chunks run in separate sessions; per-row fileloc
  matching runs without a DB connection held.
* Parent-traversal guard in _best_bundle_for_fileloc lexically
  normalises both sides with os.path.normpath and rejects any
  relative result that is still absolute or contains ``..``, so a
  stored fileloc like ``/dags/foo/../../outside.py`` cannot escape a
  bundle root. Lexical only -- no symlink resolution.
* multi_team-safe because a bundle path belongs to at most one team.
* Stale-Dag scan skips rows with NULL relative_fileloc and emits one
  INFO line per cycle with the skip count, so operator-visible legacy
  rows that the repair could not route stay observable.
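
As a rough illustration of the last bullet, the sketch below skips rows with a NULL relative_fileloc during a stale scan and logs a single INFO line with the skip count. It is a simplified stand-in rather than the code in this PR: find_stale_dags, the (dag_id, bundle_name, relative_fileloc) row shape, and the present_files set are assumptions made for the example.

```python
from __future__ import annotations

import logging
from pathlib import Path

log = logging.getLogger("stale_scan_sketch")


def find_stale_dags(
    rows: list[tuple[str, str, str | None]],
    present_files: set[tuple[str, Path]],
) -> tuple[list[str], int]:
    """Return (stale dag_ids, number of legacy rows skipped)."""
    stale: list[str] = []
    skipped = 0
    for dag_id, bundle_name, relative_fileloc in rows:
        if relative_fileloc is None:
            # Path(None) raises TypeError, so legacy rows are skipped up front.
            skipped += 1
            continue
        if (bundle_name, Path(relative_fileloc)) not in present_files:
            stale.append(dag_id)
    if skipped:
        # One summary line per scan keeps unrecovered rows visible to operators.
        log.info("Skipped %d Dag rows with NULL relative_fileloc", skipped)
    return stale, skipped


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    rows = [
        ("a", "main", "a.py"),
        ("b", "main", "gone.py"),
        ("legacy", "dags-folder", None),
    ]
    print(find_stale_dags(rows, {("main", Path("a.py"))}))
```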

Tests cover custom bundle names, multiple bundles, overlapping paths
(deepest wins), unmatched fileloc (row skipped), missing fileloc,
FK-safety when a configured bundle is missing from dag_bundle, the
legacy relative_fileloc backfill path, concurrent-DFP startup,
chunk-boundary batching, and the full sync -> repair -> stale-scan
-> re-parse lifecycle.

closes: apache#63323

Normalize bundle paths once when building the active-bundle map so the
fileloc match uses plain Path.relative_to instead of mixing os.path with
pathlib per iteration, and explain why the lexical normpath is required.
Expand the cryptic stale-check comment to describe the legacy 2.x NULL
relative_fileloc case and link the tracking issue.

Match BaseDagImporter.get_relative_path instead of normalizing with
os.path.normpath. Filelocs come from the Dag processor parsing
admin-controlled bundle files, so they are trusted and need no
path-traversal defense, and using the same relative_to check means the
startup repair writes the same relative_fileloc the next parse computes.
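
The difference between the two matching strategies discussed in these commits can be seen in a small sketch. Both functions are hypothetical illustrations, not the PR's code: normalised_relative approximates the earlier normpath-based guard, and plain_relative approximates a bare relative_to check. posixpath and PurePosixPath are used so the behaviour does not depend on the host platform.

```python
from __future__ import annotations

import posixpath
from pathlib import PurePosixPath


def normalised_relative(fileloc: str, root: str) -> str | None:
    """Lexically normalise both sides, then reject results that escape the root."""
    rel = posixpath.relpath(posixpath.normpath(fileloc), posixpath.normpath(root))
    if posixpath.isabs(rel) or rel == ".." or rel.startswith("../"):
        return None
    return rel


def plain_relative(fileloc: str, root: str) -> str | None:
    """Component-wise prefix check only; '..' segments are kept as written."""
    try:
        return str(PurePosixPath(fileloc).relative_to(PurePosixPath(root)))
    except ValueError:
        return None


if __name__ == "__main__":
    tricky = "/dags/foo/../../outside.py"
    print(normalised_relative(tricky, "/dags"))  # rejected after normalisation
    print(plain_relative(tricky, "/dags"))  # accepted, '..' preserved
    print(plain_relative("/dags/a/b.py", "/dags"))
```

For ordinary filelocs the two agree. They differ only on paths containing ``..``, which the final commit treats as out of scope because filelocs come from parsing admin-controlled bundle files.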
@jason810496 jason810496 force-pushed the fix/migration/respect-dag-bundle-when-make-bundle-name-not-nullable branch from 9c1bd43 to 0ac8289 Compare June 22, 2026 06:35


Development

Successfully merging this pull request may close these issues.

Flaky “Requested bundle is not configured” error during 2 to 3.1+ upgrade with custom Dag bundle names

9 participants