Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.models.dagrun import DagRun as DR
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance as TI, _stop_remaining_tasks
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.xcom import XComModel
Expand Down Expand Up @@ -615,6 +616,9 @@ def ti_skip_downstream(
status.HTTP_409_CONFLICT: {
"description": "The TI attempting to heartbeat should be terminated for the given reason"
},
status.HTTP_410_GONE: {
"description": "Task Instance not found in the TI table but exists in the Task Instance History table"
},
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"},
},
)
Expand All @@ -638,6 +642,24 @@ def ti_heartbeat(
"Retrieved current task state", state=previous_state, current_hostname=hostname, current_pid=pid
)
except NoResultFound:
# Check if the TI exists in the Task Instance History table.
# If it does, it was likely cleared while running, so return 410 Gone
# instead of 404 Not Found to give the client a more specific signal.
tih_exists = session.scalar(
select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == task_instance_id)
)
if tih_exists:
log.error(
"TaskInstance was previously cleared and archived in history, heartbeat skipped",
ti_id=str(task_instance_id),
)
raise HTTPException(
status_code=status.HTTP_410_GONE,
detail={
"reason": "not_found",
"message": "Task Instance not found, it may have been moved to the Task Instance History table",
},
)
log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,40 @@ def test_ti_heartbeat_non_existent_task(self, client, session, create_task_insta
"message": "Task Instance not found",
}

def test_ti_heartbeat_cleared_task_returns_410(self, client, session, create_task_instance):
    """
    Heartbeating with the id of a cleared TI yields 410 Gone.

    A running task instance is created and then rolled over to its next try.
    The test verifies that the original id is no longer present in the
    TaskInstance table but is present in TaskInstanceHistory, and then sends
    a heartbeat for that stale id and checks the 410 response body.
    """
    running_ti = create_task_instance(
        task_id="test_ti_heartbeat_cleared",
        state=State.RUNNING,
        hostname="random-hostname",
        pid=1547,
        session=session,
    )
    session.commit()
    # Remember the id before the TI is prepared for its next try; the
    # assertions below confirm this id stops resolving in the TI table.
    old_ti_id = running_ti.id

    # Simulate the task being cleared while it is still running.
    running_ti.prepare_db_for_next_try(session)
    session.commit()

    # Preconditions: the old id is gone from TaskInstance ...
    live_row = session.get(TaskInstance, old_ti_id)
    assert live_row is None

    # ... and a history row keyed by the old id exists.
    history_query = select(TaskInstanceHistory).where(
        TaskInstanceHistory.task_instance_id == old_ti_id
    )
    archived_row = session.scalar(history_query)
    assert archived_row is not None

    heartbeat_payload = {"hostname": "random-hostname", "pid": 1547}
    expected_detail = {
        "reason": "not_found",
        "message": "Task Instance not found, it may have been moved to the Task Instance History table",
    }

    resp = client.put(
        f"/execution/task-instances/{old_ti_id}/heartbeat",
        json=heartbeat_payload,
    )

    assert resp.status_code == 410
    assert resp.json()["detail"] == expected_detail

@pytest.mark.parametrize(
"ti_state",
[State.SUCCESS, State.FAILED],
Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ def _send_heartbeat_if_needed(self):
# Reset the counter on success
self.failed_heartbeats = 0
except ServerResponseError as e:
if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.CONFLICT}:
if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.GONE, HTTPStatus.CONFLICT}:
log.error(
"Server indicated the task shouldn't be running anymore",
detail=e.detail,
Expand Down
Loading