diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 23b93f2dae5aa..05f1b5d141a8e 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -375,7 +375,7 @@ def _sort_serialized_dag_dict(cls, serialized_dag: Any): """Recursively sort json_dict and its nested dictionaries and lists.""" if isinstance(serialized_dag, dict): return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())} - if isinstance(serialized_dag, list): + if isinstance(serialized_dag, (list, tuple)): if all(isinstance(i, dict) for i in serialized_dag): if all( isinstance(i.get("__var", {}), Iterable) and "task_id" in i.get("__var", {}) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 208fd9bb074f3..cb59144f10742 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -25,6 +25,7 @@ from unittest import mock from unittest.mock import MagicMock +import msgspec import pendulum import pytest import time_machine @@ -35,6 +36,7 @@ from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db +from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess from airflow.exceptions import AirflowException from airflow.models import DagModel, DagRun from airflow.models.dagbag import DBDagBag @@ -42,6 +44,8 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseOperator, task from airflow.sdk.definitions.dag import _run_inline_trigger +from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame +from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.triggers.base import TriggerEvent from airflow.utils.session import create_session from airflow.utils.state import DagRunState @@ -1070,3 +1074,29 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu serialized_dag_ids = set(session.execute(select(SerializedDagModel.dag_id)).scalars()) assert serialized_dag_ids == {"test_example_bash_operator", "test_sensor"} + + @conf_vars({("core", "load_examples"): "false"}) + def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_dag_bundles, session): + bundles = {"bundle_reserialize": TEST_DAGS_FOLDER / "test_dag_reserialize.py"} + with configure_dag_bundles(bundles): + dag_command.dag_reserialize( + self.parser.parse_args(["dags", "reserialize", "--bundle-name", "bundle_reserialize"]) + ) + + dagbag = DagBag(bundles["bundle_reserialize"], bundle_path=bundles["bundle_reserialize"]) + dag_parsing_result = DagFileParsingResult( + fileloc=bundles["bundle_reserialize"].name, + serialized_dags=[ + LazyDeserializedDAG(data=DagSerialization.to_dict(dag)) for dag in dagbag.dags.values() + ], + ) + + frame = _ResponseFrame(id=0, body=dag_parsing_result.model_dump()).as_bytes() + request_frame = msgspec.msgpack.Decoder[_RequestFrame](_RequestFrame).decode(frame[4:]) + dag_processor_parsing_result = DagFileProcessorProcess.decoder.validate_python(request_frame.body) + + serialized_dag_hash = list(session.execute(select(SerializedDagModel.dag_hash)).scalars()) + + assert len(dag_processor_parsing_result.serialized_dags) == 1 + assert len(serialized_dag_hash) == 1 + assert dag_processor_parsing_result.serialized_dags[0].hash == serialized_dag_hash[0] diff --git a/airflow-core/tests/unit/dags/test_dag_reserialize.py b/airflow-core/tests/unit/dags/test_dag_reserialize.py new file mode 100644 index 0000000000000..c9eba5ca3e10e --- /dev/null +++ b/airflow-core/tests/unit/dags/test_dag_reserialize.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk import DAG + + +def empty_task(): + pass + + +with DAG( + "test_dag_reserialize", + start_date=datetime(2026, 1, 20), + schedule="* * * * *", + catchup=False, + max_active_runs=1, +) as dag: + task_b = PythonOperator(task_id="bear", python_callable=empty_task)