From 081e71b4c94ac9d01c36720541694eb519db9769 Mon Sep 17 00:00:00 2001 From: Desdroid Date: Tue, 23 Jun 2026 14:52:54 +0200 Subject: [PATCH] Fix: serialize DagParam like a normal Param. --- .../serialization/serialized_objects.py | 6 ++- .../serialization/test_dag_serialization.py | 41 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 14bbd34335971..78e60fd31539e 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -60,7 +60,7 @@ from airflow.sdk.definitions.deadline import DeadlineAlert from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.sdk.definitions.operator_resources import Resources -from airflow.sdk.definitions.param import Param, ParamsDict +from airflow.sdk.definitions.param import DagParam, Param, ParamsDict from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup from airflow.sdk.definitions.xcom_arg import serialize_xcom_arg from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors @@ -579,6 +579,10 @@ def serialize( return TaskGroupSerialization.serialize_task_group(var) elif isinstance(var, Param): return cls._encode(cls._serialize_param(var), type_=DAT.PARAM) + elif isinstance(var, DagParam): + return cls._encode( + cls._serialize_param(Param(default=var._default, source="dag")), type_=DAT.PARAM + ) elif isinstance(var, XComArg): return cls._encode(serialize_xcom_arg(var), type_=DAT.XCOM_REF) elif isinstance(var, LazySelectSequence): diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 61a66c60fb647..75ee3927fa461 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -4768,6 +4768,47 @@ def test_partial_kwargs_end_to_end_deserialization(self): assert deserialized_task.partial_kwargs["retry_delay"] == timedelta(seconds=600) assert deserialized_task.partial_kwargs["owner"] == "custom_owner" + def test_partial_kwargs_dag_param_serialization_is_stable(self): + """DagParam passed to a mapped task's partial() must serialize to a stable representation. + + Without dedicated handling the serializer falls back to ``str(var)`` which embeds the + object's memory address, producing a different serialized Dag on every parse. + """ + from airflow.sdk import task + + with DAG(dag_id="test_dag_param_partial") as dag: + + @task + def add(value): + return value + + add.partial(value=dag.param("p", "default_value")).expand(value=[1, 2, 3]) + + serialized_dag = DagSerialization.to_dict(dag) + mapped_task = serialized_dag["dag"]["tasks"][0]["__var"] + serialized_value = mapped_task["partial_kwargs"]["op_kwargs"]["__var"]["value"] + + # The DagParam must encode to its stable structure, not a repr with a memory address. + assert serialized_value["__type"] == "param" + assert serialized_value["__var"] == { + "__class": "airflow.sdk.definitions.param.Param", + "description": None, + "source": "dag", + "default": "default_value", + "schema": {"__var": {}, "__type": "dict"}, + } + + # Serializing the same Dag twice must produce identical output. + assert DagSerialization.to_dict(dag) == serialized_dag + + # And the round-trip restores a real DagParam bound to the Dag. + deserialized_dag = DagSerialization.from_dict(serialized_dag) + deserialized_task = deserialized_dag.get_task("add") + restored_param = deserialized_task.partial_kwargs["op_kwargs"]["value"] + assert isinstance(restored_param, SerializedParam) + assert restored_param.value == "default_value" + assert restored_param.source == "dag" + @pytest.mark.parametrize( ("callbacks", "expected_has_flags", "absent_keys"),