diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 91a60cb776f63..cae9d3a8339d4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -228,6 +228,7 @@ class TriggerDAGRunPostBody(StrictBaseModel): conf: dict | None = Field(default_factory=dict) note: str | None = None partition_key: str | None = None + bundle_version: str | None = None @model_validator(mode="after") def check_data_intervals(self): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index ea6f0a70fcd0a..87be530a9018e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -15108,6 +15108,11 @@ components: - type: string - type: 'null' title: Partition Key + bundle_version: + anyOf: + - type: string + - type: 'null' + title: Bundle Version additionalProperties: false type: object title: MaterializeAssetBody @@ -16641,6 +16646,11 @@ components: - type: string - type: 'null' title: Partition Key + bundle_version: + anyOf: + - type: string + - type: 'null' + title: Bundle Version additionalProperties: false type: object required: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py index 15af36445dd95..831fd23951936 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py @@ -73,7 +73,7 @@ from airflow.api_fastapi.logging.decorators import action_logging from airflow.assets.manager import asset_manager from airflow.configuration import conf -from airflow.exceptions import ParamValidationError +from airflow.exceptions import DagVersionNotFound, ParamValidationError from airflow.models.asset import ( AssetAliasModel, AssetDagRunQueue, @@ -451,8 +451,10 @@ def materialize_asset( f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs", ) + resolved_body = body or MaterializeAssetBody() + try: - params = (body or MaterializeAssetBody()).validate_context(dag) + params = resolved_body.validate_context(dag) return dag.create_dagrun( run_id=params["run_id"], logical_date=params["logical_date"], @@ -466,9 +468,12 @@ def materialize_asset( partition_key=params["partition_key"], note=params["note"], session=session, + bundle_version=resolved_body.bundle_version, ) except (ParamValidationError, ValueError) as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e + except DagVersionNotFound as e: + raise HTTPException(status.HTTP_404_NOT_FOUND, str(e)) from e @assets_router.get( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 889251bf81e99..3179d70aa38dd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -108,7 +108,7 @@ perform_clear_dag_run, ) from airflow.api_fastapi.logging.decorators import action_logging -from airflow.exceptions import ParamValidationError +from airflow.exceptions import AirflowBadRequest, DagVersionNotFound, ParamValidationError from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent from airflow.models.dag_version import DagVersion @@ -702,9 +702,10 @@ def trigger_dag_run( else: triggered_by = DagRunTriggeredByType.REST_API - dag = get_latest_version_of_dag(dag_bag, dag_id, session) try: + dag = get_latest_version_of_dag(dag_bag, dag_id, session) params = body.validate_context(dag) + dag_run = dag.create_dagrun( run_id=params["run_id"], logical_date=params["logical_date"], @@ -716,17 +717,23 @@ def trigger_dag_run( triggering_user_name=user.get_name(), state=DagRunState.QUEUED, partition_key=params["partition_key"], + bundle_version=body.bundle_version, partition_date=params["partition_date"], session=session, ) + + dag_run_note = body.note + if dag_run_note: + current_user_id = user.get_id() + dag_run.note = (dag_run_note, current_user_id) + return dag_run + except (ParamValidationError, ValueError) as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e - - dag_run_note = body.note - if dag_run_note: - current_user_id = user.get_id() - dag_run.note = (dag_run_note, current_user_id) - return dag_run + except DagVersionNotFound as e: + raise HTTPException(status.HTTP_404_NOT_FOUND, str(e)) from e + except AirflowBadRequest as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e @dag_run_router.get( diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index dc7035d31e3c9..da4696d37a94a 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -60,6 +60,9 @@ AddConnectionTestEndpoint, AddAwaitingInputStatePayload, AddTaskInstanceQueueField, + ), + Version( + "2026-06-16", AddRetryPolicyFields, AddTeamNameField, AddTaskAndAssetStateStoreEndpoints, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py index e89e2ed04cc5d..5c3df074992ba 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py @@ -17,13 +17,7 @@ from __future__ import annotations -from cadwyn import ( - ResponseInfo, - VersionChange, - convert_response_to_previous_version_for, - endpoint, - schema, -) +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( DagRun, diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py index addd4f8f2a547..cb45b050fce34 100644 --- a/airflow-core/src/airflow/exceptions.py +++ b/airflow-core/src/airflow/exceptions.py @@ -141,6 +141,10 @@ class DagRunNotFound(AirflowNotFoundException): """Raise when a DAG Run is not available in the system.""" +class DagVersionNotFound(AirflowNotFoundException): + """Raised when a DagVersion for the given dag_id / bundle_version is not found.""" + + class DagNotPartitionedError(ValueError): """Raise when a partition_key is supplied for a Dag that is not partitioned.""" diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index e6564a6da0668..db5fdc103224b 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -154,6 +154,7 @@ def _latest_version_select( bundle_version: str | None = None, load_dag_model: bool = False, load_bundle_model: bool = False, + load_serialized_dag: bool = False, ) -> Select: """ Get the select object to get the latest version of the DAG. @@ -171,6 +172,9 @@ def _latest_version_select( if load_bundle_model: query = query.options(joinedload(cls.bundle)) + if load_serialized_dag: + query = query.options(joinedload(cls.serialized_dag)) + # Order by version_number, not created_at: version_number is monotonic and unique per # dag_id, so it is deterministic even when two versions share a created_at timestamp. # write_dag relies on this select to compute the next version_number; ordering by @@ -188,6 +192,7 @@ def get_latest_version( bundle_version: str | None = None, load_dag_model: bool = False, load_bundle_model: bool = False, + load_serialized_dag: bool = False, session: Session = NEW_SESSION, ) -> DagVersion | None: """ @@ -197,6 +202,7 @@ def get_latest_version( :param session: The database session. :param load_dag_model: Whether to load the DAG model. :param load_bundle_model: Whether to load the DagBundle model. + :param load_serialized_dag: Whether to eagerly load the serialized DAG. :return: The latest version of the DAG or None if not found. """ return session.scalar( @@ -205,6 +211,7 @@ def get_latest_version( bundle_version=bundle_version, load_dag_model=load_dag_model, load_bundle_model=load_bundle_model, + load_serialized_dag=load_serialized_dag, ) ) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 9dc816eb9cc45..750f6177ae4e3 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -36,6 +36,7 @@ from airflow.exceptions import ( AirflowException, DagNotPartitionedError, + DagVersionNotFound, InvalidPartitionKeyError, NodeNotFound, TaskNotFound, @@ -583,6 +584,8 @@ def create_dagrun( partition_key: str | None = None, partition_date: datetime.datetime | None = None, note: str | None = None, + bundle_version: str | None = None, + dag_version: DagVersion | None = None, session: Session = NEW_SESSION, ) -> DagRun: """ @@ -653,7 +656,25 @@ def create_dagrun( self.validate_partition_key(partition_key) # todo: AIP-78 add verification that if run type is backfill then we have a backfill id - copied_params = self.params.deep_merge(conf) + + # When triggering against a specific bundle version, validate conf against that + # version's param schema (not the live dag's), so callers get the right errors. + if bundle_version is not None: + if self.disable_bundle_versioning: + raise ValueError(f"DAG with dag_id: '{self.dag_id}' does not support bundle versioning") + if dag_version is None: + dag_version = DagVersion.get_latest_version( + self.dag_id, bundle_version=bundle_version, load_serialized_dag=True, session=session + ) + if not dag_version: + raise DagVersionNotFound( + f"DAG with dag_id: '{self.dag_id}' does not have a version for bundle_version '{bundle_version}'" + ) + params_dag = dag_version.serialized_dag.dag + else: + params_dag = self + + copied_params = params_dag.params.deep_merge(conf) copied_params.validate() orm_dagrun = _create_orm_dagrun( dag=self, @@ -672,6 +693,9 @@ def create_dagrun( partition_key=partition_key, partition_date=partition_date, note=note, + bundle_version=bundle_version, + dag_version=dag_version, + resolved_dag=params_dag if bundle_version is not None else None, session=session, ) @@ -1393,14 +1417,23 @@ def _create_orm_dagrun( partition_key: str | None = None, partition_date: datetime.datetime | None = None, note: str | None = None, + bundle_version: str | None = None, + dag_version: DagVersion | None = None, + resolved_dag: SerializedDAG | None = None, session: Session = NEW_SESSION, ) -> DagRun: - bundle_version = None - if not dag.disable_bundle_versioning: - bundle_version = session.scalar( - select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id), - ) - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + resolved_bundle_version: str | None = None + use_resolved_dag = False + if dag_version is not None: + resolved_bundle_version = bundle_version + use_resolved_dag = True + else: + if not dag.disable_bundle_versioning: + resolved_bundle_version = session.scalar( + select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id) + ) + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + if not dag_version: raise AirflowException(f"Cannot create DagRun for DAG {dag.dag_id} because the dag is not serialized") @@ -1418,7 +1451,7 @@ def _create_orm_dagrun( triggered_by=triggered_by, triggering_user_name=triggering_user_name, backfill_id=backfill_id, - bundle_version=bundle_version, + bundle_version=resolved_bundle_version, partition_key=partition_key, partition_date=partition_date, note=note, @@ -1431,6 +1464,8 @@ def _create_orm_dagrun( session.add(run) session.flush() run.dag = dag + if use_resolved_dag: + run.dag = resolved_dag if resolved_dag is not None else dag_version.serialized_dag.dag # create the associated task instances # state is None at the moment of creation run.verify_integrity(session=session, dag_version_id=dag_version.id) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 9685fd6b3cb98..5a8e16e8ce472 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5463,6 +5463,17 @@ export const $MaterializeAssetBody = { } ], title: 'Partition Key' + }, + bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Bundle Version' } }, additionalProperties: false, @@ -7770,6 +7781,17 @@ export const $TriggerDAGRunPostBody = { } ], title: 'Partition Key' + }, + bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Bundle Version' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 7bd02c5597926..d22f8d38234cb 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1424,6 +1424,7 @@ export type MaterializeAssetBody = { } | null; note?: string | null; partition_key?: string | null; + bundle_version?: string | null; }; /** @@ -1931,6 +1932,7 @@ export type TriggerDAGRunPostBody = { } | null; note?: string | null; partition_key?: string | null; + bundle_version?: string | null; }; /** diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index b65036b72d9b3..bd002c192792a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1671,6 +1671,57 @@ def test_should_respond_403_when_user_cannot_trigger_dag(self, test_client): user=mock.ANY, ) + def test_should_respond_with_bundle_version(self, test_client, session, dag_maker): + """Test that asset materialization respects bundle_version parameter.""" + bundle_name = "testing_bundle" + asset = session.get(AssetModel, 1).to_serialized() + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v1", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v1", outlets=asset) + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v2", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v2", outlets=asset) + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "v1"}) + assert response.status_code == 200 + assert response.json()["bundle_version"] == "v1" + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "invalid_version"}) + assert response.status_code == 404 + assert ( + f"DAG with dag_id: '{self.DAG_ASSET1_ID}' does not have a version for bundle_version 'invalid_version'" + in response.json()["detail"] + ) + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v3", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v3", outlets=asset) + dag_maker.dag.disable_bundle_versioning = True + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "v1"}) + assert response.status_code == 400 + assert ( + f"DAG with dag_id: '{self.DAG_ASSET1_ID}' does not support bundle versioning" + in response.json()["detail"] + ) + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_should_respond_400_on_invalid_dag_run_id(self, test_client): """A dag_run_id containing '..' triggers ValueError in DagRun.validate_run_id. diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 5b5a0293ffc4b..b5658f15fbf5e 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -3461,6 +3461,120 @@ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te run = session.scalars(select(DagRun).where(DagRun.run_id == run_id_without_logical_date)).one() assert run.dag_id == custom_dag_id + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_trigger_dag_run_with_bundle_version(self, test_client, session, dag_maker): + """Test triggering a DAG run with a specific bundle version.""" + from tests_common.test_utils.dag import sync_dag_to_db + + dag_id = "test_bundle_version_dag" + bundle_name = "testing_bundle" + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v1", + session=session, + ) as dag1: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag1, bundle_name=bundle_name, bundle_version="v1") + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v2", + session=session, + ) as dag2: + EmptyOperator(task_id="task_1") + EmptyOperator(task_id="task_2") + sync_dag_to_db(dag2, bundle_name=bundle_name, bundle_version="v2") + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", json={"logical_date": "2024-01-01T00:00:00Z", "bundle_version": "v1"} + ) + assert response.status_code == 200 + assert response.json()["dag_versions"][0]["bundle_version"] == "v1" + run_id_v1 = response.json()["dag_run_id"] + dr_v1 = session.scalars(select(DagRun).where(DagRun.run_id == run_id_v1)).one() + assert {ti.task_id for ti in dr_v1.task_instances} == {"task_1"} + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={ + "logical_date": "2024-01-02T00:00:00Z", + }, + ) + assert response.status_code == 200 + assert response.json()["dag_versions"][0]["bundle_version"] == "v2" + run_id_v2 = response.json()["dag_run_id"] + dr_v2 = session.scalars(select(DagRun).where(DagRun.run_id == run_id_v2)).one() + assert {ti.task_id for ti in dr_v2.task_instances} == {"task_1", "task_2"} + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={"logical_date": "2024-01-03T00:00:00Z", "bundle_version": "invalid_version"}, + ) + assert response.status_code == 404 + assert ( + f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version 'invalid_version'" + in response.json()["detail"] + ) + + dag2.disable_bundle_versioning = True + sync_dag_to_db(dag2, bundle_name=bundle_name) + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", json={"logical_date": "2024-01-04T00:00:00Z", "bundle_version": "v1"} + ) + assert response.status_code == 400 + assert f"DAG with dag_id: '{dag_id}' does not support bundle versioning" in response.json()["detail"] + + def test_trigger_dag_run_bundle_version_validates_against_old_param_schema( + self, test_client, session, dag_maker + ): + """Conf is validated against the requested bundle version's param schema, not the live dag's.""" + from tests_common.test_utils.dag import sync_dag_to_db + + dag_id = "test_bundle_param_schema_dag" + bundle_name = "param_schema_bundle" + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v1", + session=session, + params={"env": Param("staging", type="string", enum=["staging", "prod"])}, + ) as dag1: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag1, bundle_name=bundle_name, bundle_version="v1") + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v2", + session=session, + params={"env": Param("dev", type="string", enum=["dev", "staging", "prod"])}, + ) as dag2: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag2, bundle_name=bundle_name, bundle_version="v2") + + # "dev" is valid for v2 but not for v1's enum — triggering v1 should reject it. + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={"logical_date": "2024-02-01T00:00:00Z", "bundle_version": "v1", "conf": {"env": "dev"}}, + ) + assert response.status_code == 400 + + # "staging" is valid for both v1 and v2 — triggering v1 should accept it. + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={ + "logical_date": "2024-02-02T00:00:00Z", + "bundle_version": "v1", + "conf": {"env": "staging"}, + }, + ) + assert response.status_code == 200 + def test_should_respond_400_when_partition_key_given_for_non_partitioned_dag(self, test_client): """Passing partition_key to a non-partitioned Dag via REST trigger must return 400, not 500. diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 090cd2d4756e0..da0577e91f7bb 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -4282,6 +4282,92 @@ def hello(): assert dr.bundle_version == expected +def test_create_dagrun_uses_resolved_bundle_version_for_integrity(dag_maker, session, clear_dags): + """ + When no explicit bundle_version is passed, the live dag drives TI creation and + created_dag_version points to the latest serialized version. DagRun.bundle_version + still records the DagModel.bundle_version for auditing purposes. + """ + with dag_maker( + dag_id="test_dag_bundle_version_integrity", + session=session, + serialized=True, + bundle_version="v1", + ) as _dag_v1: + EmptyOperator(task_id="t1") + + with dag_maker( + dag_id="test_dag_bundle_version_integrity", + session=session, + serialized=True, + bundle_version="v2", + ) as dag_v2: + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_v2.dag_id)) + dag_model.bundle_version = "v1" + session.commit() + + dr = dag_v2.create_dagrun( + run_id="bundle_version_integrity", + run_after=pendulum.now(), + run_type="manual", + triggered_by=DagRunTriggeredByType.TEST, + state=None, + ) + + # DagRun.bundle_version records the DagModel value at trigger time (audit field). + assert dr.bundle_version == "v1" + # created_dag_version reflects the latest serialized version (v2), not the DagModel audit value. + assert dr.created_dag_version.bundle_version == "v2" + # TIs come from the live dag (dag_v2 with t1+t2), not from the old serialized version. + assert {ti.task_id for ti in dr.get_task_instances(session=session)} == {"t1", "t2"} + + +def test_create_dagrun_without_bundle_version_uses_live_dag(dag_maker, session, clear_dags): + """ + When no explicit bundle_version is passed, TIs are created from the live dag even if + DagModel.bundle_version points to an older version. This confirms backfills and other + callers that don't pass bundle_version are unaffected by the bundle_version feature. + """ + with dag_maker( + dag_id="test_dag_backfill_bundle_version", + session=session, + serialized=True, + bundle_version="v1", + ) as _dag_v1: + EmptyOperator(task_id="t1") + + with dag_maker( + dag_id="test_dag_backfill_bundle_version", + session=session, + serialized=True, + bundle_version="v2", + ) as dag_v2: + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_v2.dag_id)) + dag_model.bundle_version = "v1" + session.commit() + + dr = dag_v2.create_dagrun( + run_id="no_bundle_version_uses_live_dag", + run_after=pendulum.now(), + run_type="manual", + triggered_by=DagRunTriggeredByType.TEST, + state=None, + ) + + # TIs come from the live dag (dag_v2), not from the v1 serialized version. + assert {ti.task_id for ti in dr.get_task_instances(session=session)} == {"t1", "t2"} + # created_dag_version reflects the latest serialization (v2). + assert dr.created_dag_version.bundle_version == "v2" + # DagRun.bundle_version still records the DagModel value at trigger time. + assert dr.bundle_version == "v1" + + def test_get_run_data_interval(): with DAG("dag", schedule=None, start_date=DEFAULT_DATE) as dag: EmptyOperator(task_id="empty_task") diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index bd7ce6dd44859..4405513efa5ea 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -792,6 +792,7 @@ class MaterializeAssetBody(BaseModel): conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None note: Annotated[str | None, Field(title="Note")] = None partition_key: Annotated[str | None, Field(title="Partition Key")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None class NewTaskResponse(BaseModel): @@ -1122,6 +1123,7 @@ class TriggerDAGRunPostBody(BaseModel): conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None note: Annotated[str | None, Field(title="Note")] = None partition_key: Annotated[str | None, Field(title="Partition Key")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None class TriggerResponse(BaseModel): diff --git a/devel-common/src/tests_common/test_utils/dag.py b/devel-common/src/tests_common/test_utils/dag.py index 6e02ddf61ab67..891176eb499fc 100644 --- a/devel-common/src/tests_common/test_utils/dag.py +++ b/devel-common/src/tests_common/test_utils/dag.py @@ -41,15 +41,17 @@ def create_scheduler_dag(dag: DAG | SerializedDAG) -> SerializedDAG: def sync_dag_to_db( dag: DAG, bundle_name: str = "testing", + bundle_version: str | None = None, session: Session = NEW_SESSION, ) -> SerializedDAG: - return sync_dags_to_db([dag], bundle_name=bundle_name, session=session)[0] + return sync_dags_to_db([dag], bundle_name=bundle_name, bundle_version=bundle_version, session=session)[0] @provide_session def sync_dags_to_db( dags: Collection[DAG], bundle_name: str = "testing", + bundle_version: str | None = None, session: Session = NEW_SESSION, ) -> Sequence[SerializedDAG]: """ @@ -68,10 +70,12 @@ def sync_dags_to_db( def _write_dag(dag: DAG) -> SerializedDAG: data = DagSerialization.to_dict(dag) - SerializedDagModel.write_dag(LazyDeserializedDAG(data=data), bundle_name, session=session) + SerializedDagModel.write_dag( + LazyDeserializedDAG(data=data), bundle_name, bundle_version, session=session + ) return DagSerialization.from_dict(data) - SerializedDAG.bulk_write_to_db(bundle_name, None, dags, session=session) + SerializedDAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session) scheduler_dags = [_write_dag(dag) for dag in dags] session.flush() return scheduler_dags