[Serialization] Fix: Serialize DagParam like a normal Param. #68901

Open
Desdroid wants to merge 1 commit into
apache:main from
Desdroid:bugfix/dagparam-serialization-in-partial

@Desdroid

Contributor

When a DagParam is passed as a kwarg to partial() of a dynamically mapped task, it is serialized with its memory address. Because the memory address is not stable, the serialized DAG keeps changing, which inflates the number of DAG versions.
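To illustrate why an embedded memory address is a problem, here is a minimal sketch that does not use Airflow at all. `ToyParam` and `fingerprint` are hypothetical names invented for this example, and it assumes that a new version is detected by comparing or hashing the serialized output; how Airflow actually computes DAG versions is not shown here.

```python
import hashlib
import json


class ToyParam:
    """Hypothetical stand-in for an object that has no custom __repr__."""

    def __init__(self, name, default):
        self.name = name
        self.default = default


def fingerprint(payload):
    """Hash a JSON dump; objects JSON cannot encode fall back to str()."""
    dumped = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(dumped.encode("utf-8")).hexdigest()


# Two equivalent objects, as if the same DAG file had been parsed twice.
first = ToyParam("p", "p_default_val")
second = ToyParam("p", "p_default_val")

# The str() fallback is the default repr, which embeds the object's address,
# so equivalent objects produce different serialized output.
unstable_a = fingerprint({"value": first})
unstable_b = fingerprint({"value": second})

# Encoding only the stable fields gives identical output for both objects.
stable_a = fingerprint({"value": {"name": first.name, "default": first.default}})
stable_b = fingerprint({"value": {"name": second.name, "default": second.default}})

print("unstable hashes equal:", unstable_a == unstable_b)
print("stable hashes equal:", stable_a == stable_b)
```

The point of the sketch is only that any serialization path that falls back to the default repr cannot be stable across processes or parses, whereas one that encodes the object's fields can be.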

This is a simple repro:

from __future__ import annotations

import json

from airflow.sdk import DAG
from airflow.sdk import task
from airflow.serialization.definitions.param import SerializedParam
from airflow.serialization.serialized_objects import DagSerialization

if __name__ == "__main__":
    with DAG(dag_id="repro_dagparam") as dag:

        @task
        def add(value):
            return value

        @task
        def do(something):
            return something

        do(dag.param("some", "some_default_val"))

        add.partial(value=dag.param("p", "p_default_val")).expand(value=[1, 2, 3])

    ser = DagSerialization.to_dict(dag)
    mapped = ser["dag"]["tasks"][1]["__var"]
    print(json.dumps(mapped.get("partial_kwargs"), indent=2, default=str))
    print("ROUNDTRIP:")
    deser = DagSerialization.from_dict(ser)
    t = deser.get_task("add")
    print(type(t.partial_kwargs.get("op_kwargs")), t.partial_kwargs.get("op_kwargs"))

This PR fixes the issue by serializing the DagParam as a normal Param. This matches how a DagParam is already serialized when it is passed as a regular argument (as in the do task, where it ends up serialized as a Param).

Note: I am not 100% sure that this is the correct fix as I'm not super deep in the DAG serialization code, but it seems to work.


Was generative AI tooling used to co-author this PR?
  • [ ] Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@Desdroid

Contributor Author

@ephraimbuddy @wjddn279 Since you were working on serialization in #63871, what do you think of the fix I propose here? Does it make sense, or is it wrong and a change is needed in serialize_template_field instead?
The DagParam gets serialized with its memory address when it is used in partial() of a dynamically mapped task.

Comment on lines +582 to +585
        elif isinstance(var, DagParam):
            return cls._encode(
                cls._serialize_param(Param(default=var._default, source="dag")), type_=DAT.PARAM
            )

Contributor

Have you tested this? Looking at the code, it seems only _default is stored and _name is dropped, and since DagParam resolves at runtime via dag_run.conf[self._name], I think this could be a problem. Or am I misreading the flow?
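The concern can be made concrete with a toy model. This is only a sketch: `ToyDagParam`, `ToyParam`, `encode_default_only`, and `decode` are hypothetical names that mimic the shape of the proposed change (keep the default, drop the name), not Airflow's actual classes or serialization helpers. Whether the real deserialization path can recover the name some other way is exactly the open question.

```python
from dataclasses import dataclass


@dataclass
class ToyDagParam:
    """Hypothetical runtime reference: resolves its value by name."""

    name: str
    default: str

    def resolve(self, conf):
        # Prefer the run-time value for this name, else use the default.
        return conf.get(self.name, self.default)


@dataclass
class ToyParam:
    """Hypothetical plain param: carries only a default value."""

    default: str

    def resolve(self, conf):
        # No name was stored, so there is no key to look up in conf.
        return self.default


def encode_default_only(param):
    """Mimic an encoder that keeps the default and drops the name."""
    return {"__type": "param", "default": param.default}


def decode(encoded):
    """Rebuild the only thing the encoded form can describe."""
    return ToyParam(default=encoded["default"])


conf = {"p": "override_from_conf"}
original = ToyDagParam(name="p", default="p_default_val")
restored = decode(encode_default_only(original))

print(original.resolve(conf))  # override_from_conf
print(restored.resolve(conf))  # p_default_val
```

In this model the round trip silently turns a run-time lookup into a constant, so a test that triggers a run with a non-default value for the param would be the way to check whether the real code has the same problem.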
