Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,11 +1761,19 @@ def finalize(

if getattr(ti.task, "overwrite_rtif_after_execution", False):
log.debug("Overwriting Rendered template fields.")
if ti.task.template_fields:
try:
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
except Exception:
log.exception("Failed to set rendered fields during finalization", ti=ti, task=ti.task)
# Only update RTIF for terminal states. RTIF table doesn't have try_number column,
# so intermediate states (UP_FOR_RETRY, UP_FOR_RESCHEDULE, DEFERRED) would just
# overwrite each other. We only store the terminal state attempt's rendered fields.
if state in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED, TaskInstanceState.SKIPPED):
if ti.task.template_fields:
try:
SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
except Exception:
log.exception(
"Failed to set rendered fields during finalization",
task_id=ti.task_id,
dag_id=ti.dag_id,
)

log.debug("Running finalizers", ti=ti)
if state == TaskInstanceState.SUCCESS:
Expand Down
Loading