From dbdf55f9c6ee701b0f04a4f1131681099b919b01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 07:36:50 -0300 Subject: [PATCH 1/5] Return 410 Gone for heartbeat when TI was cleared and moved to TIH When a running task instance is cleared, its previous try is archived to the Task Instance History table and the TI receives a new UUID. Subsequent heartbeats from the old process get a 404 because the old UUID no longer exists in the TI table. This change improves the error handling by checking the TIH table when a heartbeat TI is not found. If the UUID exists in TIH, return 410 Gone instead of 404 Not Found, giving the client a more specific signal that the task was cleared rather than never existing. - Server: check TIH on heartbeat NoResultFound, return 410 if found - Supervisor: handle 410 Gone same as 404/409 (terminate process) - Keep 404 for TIs that genuinely never existed closes: #53140 --- .../execution_api/routes/task_instances.py | 22 ++++++++++++ .../versions/head/test_task_instances.py | 35 +++++++++++++++++++ .../airflow/sdk/execution_time/supervisor.py | 2 +- 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 60dd868c2e335..25e37a255789b 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -66,6 +66,7 @@ from airflow.models.dagrun import DagRun as DR from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance as TI, _stop_remaining_tasks +from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.models.taskreschedule import TaskReschedule from airflow.models.trigger import Trigger from airflow.models.xcom import XComModel @@ -615,6 +616,9 @@ def ti_skip_downstream( status.HTTP_409_CONFLICT: { "description": "The TI attempting to heartbeat should be terminated for the given reason" }, + status.HTTP_410_GONE: { + "description": "Task Instance not found in the TI table but exists in the Task Instance History table" + }, HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"}, }, ) @@ -638,6 +642,24 @@ def ti_heartbeat( "Retrieved current task state", state=previous_state, current_hostname=hostname, current_pid=pid ) except NoResultFound: + # Check if the TI exists in the Task Instance History table. + # If it does, it was likely cleared while running, so return 410 Gone + # instead of 404 Not Found to give the client a more specific signal. + tih_exists = session.scalar( + select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == ti_id_str) + ) + if tih_exists: + log.error( + "Task Instance not found in TI table but exists in Task Instance History", + ti_id=ti_id_str, + ) + raise HTTPException( + status_code=status.HTTP_410_GONE, + detail={ + "reason": "not_found", + "message": "Task Instance not found, it may have been moved to the Task Instance History table", + }, + ) log.error("Task Instance not found") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 835a8c46139f7..9fad9dac261fe 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1735,6 +1735,41 @@ def test_ti_heartbeat_non_existent_task(self, client, session, create_task_insta "message": "Task Instance not found", } + def test_ti_heartbeat_cleared_task_returns_410(self, client, session, create_task_instance): + """Test that a 410 error is returned when a TI was cleared and moved to TIH.""" + ti = create_task_instance( + task_id="test_ti_heartbeat_cleared", + state=State.RUNNING, + hostname="random-hostname", + pid=1547, + session=session, + ) + session.commit() + old_ti_id = ti.id + + # Simulate task being cleared: this archives the current try to TIH + # and assigns a new UUID to the TI, mirroring prepare_db_for_next_try(). + ti.prepare_db_for_next_try(session) + session.commit() + + # Pre-condition: old UUID is gone from TI table but exists in TIH + assert session.get(TaskInstance, old_ti_id) is None + tih = session.scalar( + select(TaskInstanceHistory).where(TaskInstanceHistory.task_instance_id == old_ti_id) + ) + assert tih is not None + + response = client.put( + f"/execution/task-instances/{old_ti_id}/heartbeat", + json={"hostname": "random-hostname", "pid": 1547}, + ) + + assert response.status_code == 410 + assert response.json()["detail"] == { + "reason": "not_found", + "message": "Task Instance not found, it may have been moved to the Task Instance History table", + } + @pytest.mark.parametrize( "ti_state", [State.SUCCESS, State.FAILED], diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 9894d4fff3153..39cc8338f8c3a 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1182,7 +1182,7 @@ def _send_heartbeat_if_needed(self): # Reset the counter on success self.failed_heartbeats = 0 except ServerResponseError as e: - if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.CONFLICT}: + if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.GONE, HTTPStatus.CONFLICT}: log.error( "Server indicated the task shouldn't be running anymore", detail=e.detail, From 83ea20abc6de4980eaa3132d6725e5f5a3b293bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 17 Feb 2026 05:48:13 -0300 Subject: [PATCH 2/5] Update task_instances.py Co-authored-by: Amogh Desai --- .../airflow/api_fastapi/execution_api/routes/task_instances.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 25e37a255789b..7208180480450 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -650,7 +650,7 @@ def ti_heartbeat( ) if tih_exists: log.error( - "Task Instance not found in TI table but exists in Task Instance History", + "TaskInstance was previously cleared and archived in history, heartbeat skipped", ti_id=ti_id_str, ) raise HTTPException( From 0ba9877c55f20cf2eef7efa8d14a2ded6735320c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 17 Feb 2026 05:48:23 -0300 Subject: [PATCH 3/5] Update task_instances.py Co-authored-by: Amogh Desai --- .../airflow/api_fastapi/execution_api/routes/task_instances.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 7208180480450..5754e0d272978 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -657,7 +657,7 @@ def ti_heartbeat( status_code=status.HTTP_410_GONE, detail={ "reason": "not_found", - "message": "Task Instance not found, it may have been moved to the Task Instance History table", + "message": "Task Instance not found", }, ) log.error("Task Instance not found") From e5697cb97aeb9deaadca32ba122f813711856b47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 17 Feb 2026 05:48:32 -0300 Subject: [PATCH 4/5] Update test_task_instances.py Co-authored-by: Amogh Desai --- .../execution_api/versions/head/test_task_instances.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 9fad9dac261fe..21eee99b946e3 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1752,7 +1752,6 @@ def test_ti_heartbeat_cleared_task_returns_410(self, client, session, create_tas ti.prepare_db_for_next_try(session) session.commit() - # Pre-condition: old UUID is gone from TI table but exists in TIH assert session.get(TaskInstance, old_ti_id) is None tih = session.scalar( select(TaskInstanceHistory).where(TaskInstanceHistory.task_instance_id == old_ti_id) From 0ef9c074ee0213125f0d18d1a1f3b8611c7ced5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Wed, 4 Mar 2026 11:41:23 -0300 Subject: [PATCH 5/5] fix(api): use task_instance_id in heartbeat 410 path and align detail message - Replace undefined ti_id_str with task_instance_id in TIH query and log - Use task_instance_id (UUID) for TIH.task_instance_id comparison - Set 410 Gone detail message to match test expectation --- .../api_fastapi/execution_api/routes/task_instances.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 5754e0d272978..027f28898c851 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -646,18 +646,18 @@ def ti_heartbeat( # If it does, it was likely cleared while running, so return 410 Gone # instead of 404 Not Found to give the client a more specific signal. tih_exists = session.scalar( - select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == ti_id_str) + select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == task_instance_id) ) if tih_exists: log.error( "TaskInstance was previously cleared and archived in history, heartbeat skipped", - ti_id=ti_id_str, + ti_id=str(task_instance_id), ) raise HTTPException( status_code=status.HTTP_410_GONE, detail={ "reason": "not_found", - "message": "Task Instance not found", + "message": "Task Instance not found, it may have been moved to the Task Instance History table", }, ) log.error("Task Instance not found")