From 0da2c0d15503e186f94239b2fded0a44f1c4a917 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 27 Jan 2026 18:50:41 +0900 Subject: [PATCH 1/6] fix unmatched serialized result of task group when using airflow command --- airflow-core/src/airflow/models/serialized_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 23b93f2dae5aa..1a5952ef6aac4 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -375,7 +375,7 @@ def _sort_serialized_dag_dict(cls, serialized_dag: Any): """Recursively sort json_dict and its nested dictionaries and lists.""" if isinstance(serialized_dag, dict): return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())} - if isinstance(serialized_dag, list): + if isinstance(serialized_dag, list) or isinstance(serialized_dag, tuple): if all(isinstance(i, dict) for i in serialized_dag): if all( isinstance(i.get("__var", {}), Iterable) and "task_id" in i.get("__var", {}) From 06c920318e9ff3fa7fe64d0f57240dfb92f4ee9f Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 28 Jan 2026 00:38:51 +0900 Subject: [PATCH 2/6] add test for dag reserialize --- .../unit/cli/commands/test_dag_command.py | 27 +++++++++++++- .../tests/unit/dags/test_dag_reserialize.py | 37 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 airflow-core/tests/unit/dags/test_dag_reserialize.py diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 208fd9bb074f3..0e669b0de2174 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -42,6 +42,7 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseOperator, task from airflow.sdk.definitions.dag import _run_inline_trigger +from airflow.serialization.serialized_objects import DagSerialization from airflow.triggers.base import TriggerEvent from airflow.utils.session import create_session from airflow.utils.state import DagRunState @@ -1026,6 +1027,7 @@ class TestCliDagsReserialize: "bundle1": TEST_DAGS_FOLDER / "test_example_bash_operator.py", "bundle2": TEST_DAGS_FOLDER / "test_sensor.py", "bundle3": TEST_DAGS_FOLDER / "test_dag_with_no_tags.py", + "bundle4": TEST_DAGS_FOLDER / "test_dag_reserialize.py", } @classmethod @@ -1041,7 +1043,12 @@ def test_reserialize(self, configure_dag_bundles, session): dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"])) serialized_dag_ids = set(session.execute(select(SerializedDagModel.dag_id)).scalars()) - assert serialized_dag_ids == {"test_example_bash_operator", "test_dag_with_no_tags", "test_sensor"} + assert serialized_dag_ids == { + "test_example_bash_operator", + "test_dag_with_no_tags", + "test_sensor", + "test_dag_reserialize", + } example_bash_op = session.execute( select(DagModel).where(DagModel.dag_id == "test_example_bash_operator") @@ -1070,3 +1077,21 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu serialized_dag_ids = set(session.execute(select(SerializedDagModel.dag_id)).scalars()) assert serialized_dag_ids == {"test_example_bash_operator", "test_sensor"} + + @conf_vars({("core", "load_examples"): "false"}) + def test_reserialize_should_make_equal_hash(self, configure_dag_bundles, session): + from airflow.serialization.serialized_objects import LazyDeserializedDAG + + with configure_dag_bundles(self.test_bundles_config): + dag_command.dag_reserialize( + self.parser.parse_args(["dags", "reserialize", "--bundle-name", "bundle4"]) + ) + + dagbag = DagBag(self.test_bundles_config["bundle4"], bundle_path=self.test_bundles_config["bundle4"]) + dag_hashes = set( + [LazyDeserializedDAG(data=DagSerialization.to_dict(dag)).hash for dag in dagbag.dags.values()] + ) + + serialized_dag_hash = set(session.execute(select(SerializedDagModel.dag_hash)).scalars()) + + assert dag_hashes == serialized_dag_hash diff --git a/airflow-core/tests/unit/dags/test_dag_reserialize.py b/airflow-core/tests/unit/dags/test_dag_reserialize.py new file mode 100644 index 0000000000000..489f032c6e125 --- /dev/null +++ b/airflow-core/tests/unit/dags/test_dag_reserialize.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime +from time import sleep + +from airflow.providers.standard.operators.python import PythonOperator +from airflow.sdk import DAG + + +def sleep_task(): + sleep(1) + + +with DAG( + "test_dag_reserialize", + start_date=datetime(2026, 1, 20), + schedule="* * * * *", + catchup=False, + max_active_runs=1, +) as dag: + task_b = PythonOperator(task_id="bear", python_callable=sleep_task) From b89903cf173cb2aa26d4e8fd5098df79682da10d Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 28 Jan 2026 13:37:33 +0900 Subject: [PATCH 3/6] fix test for checking identical hash value with dag_processor --- .../unit/cli/commands/test_dag_command.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 0e669b0de2174..03fa8e4815bef 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -25,6 +25,7 @@ from unittest import mock from unittest.mock import MagicMock +import msgspec import pendulum import pytest import time_machine @@ -42,6 +43,7 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseOperator, task from airflow.sdk.definitions.dag import _run_inline_trigger +from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame from airflow.serialization.serialized_objects import DagSerialization from airflow.triggers.base import TriggerEvent from airflow.utils.session import create_session @@ -1079,7 +1081,8 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu assert serialized_dag_ids == {"test_example_bash_operator", "test_sensor"} @conf_vars({("core", "load_examples"): "false"}) - def test_reserialize_should_make_equal_hash(self, configure_dag_bundles, session): + def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_dag_bundles, session): + from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess from airflow.serialization.serialized_objects import LazyDeserializedDAG with configure_dag_bundles(self.test_bundles_config): @@ -1088,10 +1091,19 @@ def test_reserialize_should_make_equal_hash(self, configure_dag_bundles, session ) dagbag = DagBag(self.test_bundles_config["bundle4"], bundle_path=self.test_bundles_config["bundle4"]) - dag_hashes = set( - [LazyDeserializedDAG(data=DagSerialization.to_dict(dag)).hash for dag in dagbag.dags.values()] + dag_parsing_result = DagFileParsingResult( + fileloc=self.test_bundles_config["bundle4"].name, + serialized_dags=[ + LazyDeserializedDAG(data=DagSerialization.to_dict(dag)) for dag in dagbag.dags.values() + ], ) - serialized_dag_hash = set(session.execute(select(SerializedDagModel.dag_hash)).scalars()) + frame = _ResponseFrame(id=0, body=dag_parsing_result.model_dump()).as_bytes() + request_frame = msgspec.msgpack.Decoder[_RequestFrame](_RequestFrame).decode(frame[4:]) + dag_processor_parsing_result = DagFileProcessorProcess.decoder.validate_python(request_frame.body) - assert dag_hashes == serialized_dag_hash + serialized_dag_hash = list(session.execute(select(SerializedDagModel.dag_hash)).scalars()) + + assert len(dag_processor_parsing_result.serialized_dags) == 1 + assert len(serialized_dag_hash) == 1 + assert dag_processor_parsing_result.serialized_dags[0].hash == serialized_dag_hash[0] From 31de974b7537a7f8d3c5bbdab5f79ad5bfaa9ef7 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 28 Jan 2026 13:42:52 +0900 Subject: [PATCH 4/6] fix test import --- airflow-core/tests/unit/cli/commands/test_dag_command.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 03fa8e4815bef..8bbc0e28f98d9 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -43,8 +43,6 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseOperator, task from airflow.sdk.definitions.dag import _run_inline_trigger -from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame -from airflow.serialization.serialized_objects import DagSerialization from airflow.triggers.base import TriggerEvent from airflow.utils.session import create_session from airflow.utils.state import DagRunState @@ -1083,7 +1081,8 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu @conf_vars({("core", "load_examples"): "false"}) def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_dag_bundles, session): from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess - from airflow.serialization.serialized_objects import LazyDeserializedDAG + from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame + from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG with configure_dag_bundles(self.test_bundles_config): dag_command.dag_reserialize( From fa8b5c83a0f31bd4611e9bc6a0ab4545a8434df1 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Fri, 6 Mar 2026 16:35:42 +0900 Subject: [PATCH 5/6] fix nit --- .../src/airflow/models/serialized_dag.py | 2 +- .../tests/unit/cli/commands/test_dag_command.py | 17 ++++++----------- .../tests/unit/dags/test_dag_reserialize.py | 7 +++---- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 1a5952ef6aac4..05f1b5d141a8e 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -375,7 +375,7 @@ def _sort_serialized_dag_dict(cls, serialized_dag: Any): """Recursively sort json_dict and its nested dictionaries and lists.""" if isinstance(serialized_dag, dict): return {k: cls._sort_serialized_dag_dict(v) for k, v in sorted(serialized_dag.items())} - if isinstance(serialized_dag, list) or isinstance(serialized_dag, tuple): + if isinstance(serialized_dag, (list, tuple)): if all(isinstance(i, dict) for i in serialized_dag): if all( isinstance(i.get("__var", {}), Iterable) and "task_id" in i.get("__var", {}) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 8bbc0e28f98d9..26e990197b7f8 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -1027,7 +1027,6 @@ class TestCliDagsReserialize: "bundle1": TEST_DAGS_FOLDER / "test_example_bash_operator.py", "bundle2": TEST_DAGS_FOLDER / "test_sensor.py", "bundle3": TEST_DAGS_FOLDER / "test_dag_with_no_tags.py", - "bundle4": TEST_DAGS_FOLDER / "test_dag_reserialize.py", } @classmethod @@ -1043,12 +1042,7 @@ def test_reserialize(self, configure_dag_bundles, session): dag_command.dag_reserialize(self.parser.parse_args(["dags", "reserialize"])) serialized_dag_ids = set(session.execute(select(SerializedDagModel.dag_id)).scalars()) - assert serialized_dag_ids == { - "test_example_bash_operator", - "test_dag_with_no_tags", - "test_sensor", - "test_dag_reserialize", - } + assert serialized_dag_ids == {"test_example_bash_operator", "test_dag_with_no_tags", "test_sensor"} example_bash_op = session.execute( select(DagModel).where(DagModel.dag_id == "test_example_bash_operator") @@ -1084,14 +1078,15 @@ def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_d from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG - with configure_dag_bundles(self.test_bundles_config): + bundles = {"bundle_reserialize": TEST_DAGS_FOLDER / "test_dag_reserialize.py"} + with configure_dag_bundles(bundles): dag_command.dag_reserialize( - self.parser.parse_args(["dags", "reserialize", "--bundle-name", "bundle4"]) + self.parser.parse_args(["dags", "reserialize", "--bundle-name", "bundle_reserialize"]) ) - dagbag = DagBag(self.test_bundles_config["bundle4"], bundle_path=self.test_bundles_config["bundle4"]) + dagbag = DagBag(bundles["bundle_reserialize"], bundle_path=bundles["bundle_reserialize"]) dag_parsing_result = DagFileParsingResult( - fileloc=self.test_bundles_config["bundle4"].name, + fileloc=bundles["bundle_reserialize"].name, serialized_dags=[ LazyDeserializedDAG(data=DagSerialization.to_dict(dag)) for dag in dagbag.dags.values() ], diff --git a/airflow-core/tests/unit/dags/test_dag_reserialize.py b/airflow-core/tests/unit/dags/test_dag_reserialize.py index 489f032c6e125..c9eba5ca3e10e 100644 --- a/airflow-core/tests/unit/dags/test_dag_reserialize.py +++ b/airflow-core/tests/unit/dags/test_dag_reserialize.py @@ -17,14 +17,13 @@ from __future__ import annotations from datetime import datetime -from time import sleep from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import DAG -def sleep_task(): - sleep(1) +def empty_task(): + pass with DAG( @@ -34,4 +33,4 @@ def sleep_task(): catchup=False, max_active_runs=1, ) as dag: - task_b = PythonOperator(task_id="bear", python_callable=sleep_task) + task_b = PythonOperator(task_id="bear", python_callable=empty_task) From fbe3298735ea4643887cfb9c23eb15a73be72aa9 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Fri, 6 Mar 2026 22:48:01 +0900 Subject: [PATCH 6/6] fix import location --- airflow-core/tests/unit/cli/commands/test_dag_command.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 26e990197b7f8..cb59144f10742 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -36,6 +36,7 @@ from airflow.cli import cli_parser from airflow.cli.commands import dag_command from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db +from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess from airflow.exceptions import AirflowException from airflow.models import DagModel, DagRun from airflow.models.dagbag import DBDagBag @@ -43,6 +44,8 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.sdk import BaseOperator, task from airflow.sdk.definitions.dag import _run_inline_trigger +from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame +from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.triggers.base import TriggerEvent from airflow.utils.session import create_session from airflow.utils.state import DagRunState @@ -1074,10 +1077,6 @@ def test_reserialize_should_support_multiple_bundle_name_arguments(self, configu @conf_vars({("core", "load_examples"): "false"}) def test_reserialize_should_make_equal_hash_with_dag_processor(self, configure_dag_bundles, session): - from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess - from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame - from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG - bundles = {"bundle_reserialize": TEST_DAGS_FOLDER / "test_dag_reserialize.py"} with configure_dag_bundles(bundles): dag_command.dag_reserialize(