diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 7e92ab3120a34..fa771eeb9999a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -3494,6 +3494,7 @@ def _purge_task_instances_without_heartbeats( request, ) self.executor.send_callback(request) + ti.handle_failure(error=str(task_instance_heartbeat_timeout_message_details), session=session) executor = self._try_to_load_executor( ti, session, team_name=dag_id_to_team_name.get(ti.dag_id, NOTSET) ) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 67986c0297ee4..76bfabda2b17a 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8308,6 +8308,38 @@ def test_scheduler_passes_context_from_server_on_heartbeat_timeout(self, dag_mak assert callback_request.context_from_server.dag_run.logical_date == dag_run.logical_date assert callback_request.context_from_server.max_tries == ti.max_tries + def test_heartbeat_timeout_converges_ti_state_before_next_scan(self, dag_maker, session): + """A heartbeat-timed-out TI should not be found again on the next scheduler scan.""" + with dag_maker(dag_id="test_heartbeat_timeout_dedupe", session=session): + EmptyOperator(task_id="test_task", on_failure_callback=lambda context: None) + + dag_run = dag_maker.create_dagrun(run_id="test_run", state=DagRunState.RUNNING) + + mock_executor = MagicMock() + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[mock_executor]) + + ti = dag_run.get_task_instance(task_id="test_task") + ti.state = TaskInstanceState.RUNNING + ti.try_number = 1 + ti.max_tries = 0 + ti.queued_by_job_id = scheduler_job.id + ti.start_date = timezone.utcnow() - timedelta(seconds=900) + ti.last_heartbeat_at = timezone.utcnow() - timedelta(seconds=600) + session.merge(ti) + session.commit() + + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + session.expire_all() + ti.refresh_from_db(session=session) + assert ti.state != TaskInstanceState.RUNNING + assert self.job_runner._find_task_instances_without_heartbeats(session=session) == [] + + self.job_runner._find_and_purge_task_instances_without_heartbeats() + + mock_executor.send_callback.assert_called_once() + @pytest.mark.parametrize( ("retries", "callback_kind", "expected"), [