diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index e26344a81e573..8f7895b49c4c0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1761,11 +1761,19 @@ def finalize( if getattr(ti.task, "overwrite_rtif_after_execution", False): log.debug("Overwriting Rendered template fields.") - if ti.task.template_fields: - try: - SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))) - except Exception: - log.exception("Failed to set rendered fields during finalization", ti=ti, task=ti.task) + # Only update RTIF for terminal states. RTIF table doesn't have try_number column, + # so intermediate states (UP_FOR_RETRY, UP_FOR_RESCHEDULE, DEFERRED) would just + # overwrite each other. We only store the terminal state attempt's rendered fields. + if state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED, TaskInstanceState.SKIPPED): + if ti.task.template_fields: + try: + SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))) + except Exception: + log.exception( + "Failed to set rendered fields during finalization", + task_id=ti.task_id, + dag_id=ti.dag_id, + ) log.debug("Running finalizers", ti=ti) if state == TaskInstanceState.SUCCESS: