Fix duplicate heartbeat-timeout task callbacks #68008
|
@hkc-8010 A few things need addressing before review — see our Pull Request quality criteria.
No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
|
Thanks @potiuk. I addressed the static/pre-commit failure in The failed check was caused by Validated locally:
Could you please take another look when you get a chance? |
|
@hkc-8010 there are conflicts to resolve |
Force-pushed from ca401fa to e06ab5e.
Thanks. I've resolved the conflicts. |
jscheffl left a comment
Looks good to me. But as I am not an expert on callbacks: is it OK from a "state model" perspective that, when the callback is executed asynchronously, the state of the task is already set to failed? Or is this a change from previous executions (where callbacks assumed the task was still in the "running" state), and could it harm callback logic?
|
Thanks @jscheffl, good question. I think this is OK from the state-model side and does not need an extra code change. This makes the heartbeat-timeout path consistent with the existing scheduler path for externally killed tasks: the scheduler creates/sends the The callback itself is still executed asynchronously from the callback request payload. That payload is built before The main issue this PR fixes is that leaving the DB row as |
|
This closes the duplicate-scan window, but it now makes the heartbeat-timeout request the durable callback path for retryable tasks. Since the request is built before |
closes: #42553
Description
When the scheduler detects a running task instance that has stopped heartbeating, it currently builds and enqueues a TaskCallbackRequest, but the task instance can remain in RUNNING until the callback is processed asynchronously.
That leaves a window where the next scheduler heartbeat-timeout scan can find the same task instance attempt again and enqueue another callback for the same try.
This changes the heartbeat-timeout purge path to enqueue the existing callback request, then immediately run the established task-instance failure transition path so the task instance leaves RUNNING in the same scheduler pass.
Executor cleanup via executor.change_state(..., FAILED, remove_running=True) is unchanged.
Related: #66767 handles callback type metadata for heartbeat-timed-out retries. This PR is scoped to preventing duplicate same-try callback emission.
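The duplicate-scan window described above can be illustrated with a small toy model. This is a sketch only, not Airflow code: ToyTaskInstance, ToyScheduler, purge_heartbeat_timeouts, and the fail_immediately flag are hypothetical names invented for illustration, and the 300-second timeout is an arbitrary value. It shows why leaving the row in RUNNING lets a second scan enqueue another callback for the same try, and why moving it to FAILED in the same pass prevents that.

```python
from dataclasses import dataclass, field
from enum import Enum


class State(Enum):
    RUNNING = "running"
    FAILED = "failed"


@dataclass
class ToyTaskInstance:
    # Hypothetical stand-in for a task instance row; not the Airflow model.
    task_id: str
    try_number: int
    last_heartbeat: float
    state: State = State.RUNNING


@dataclass
class ToyScheduler:
    # Hypothetical stand-in for the scheduler's heartbeat-timeout scan.
    timeout: float
    fail_immediately: bool
    callbacks: list = field(default_factory=list)

    def purge_heartbeat_timeouts(self, tis, now):
        for ti in tis:
            stale = now - ti.last_heartbeat > self.timeout
            if ti.state is State.RUNNING and stale:
                # Enqueue the callback request first, while the row is still RUNNING.
                self.callbacks.append((ti.task_id, ti.try_number))
                if self.fail_immediately:
                    # Leave RUNNING in the same pass so the next scan skips this try.
                    ti.state = State.FAILED


def count_callbacks(fail_immediately):
    """Run two consecutive scans over one stale task and count enqueued callbacks."""
    ti = ToyTaskInstance("t1", try_number=1, last_heartbeat=0.0)
    scheduler = ToyScheduler(timeout=300.0, fail_immediately=fail_immediately)
    scheduler.purge_heartbeat_timeouts([ti], now=400.0)
    scheduler.purge_heartbeat_timeouts([ti], now=410.0)
    return len(scheduler.callbacks)
```

In this model, count_callbacks(False) enqueues two callbacks for the same try, while count_callbacks(True) enqueues one, which mirrors the behavior change this PR is scoped to.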
Tests
uv run --frozen --no-sync pytest --with-db-init airflow-core/tests/unit/jobs/test_scheduler_job.py::TestSchedulerJob::test_heartbeat_timeout_converges_ti_state_before_next_scan -q
uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py::TestSchedulerJob::test_heartbeat_timeout_converges_ti_state_before_next_scan -q
uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q
uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py -q
prek run --files airflow-core/src/airflow/jobs/scheduler_job_runner.py airflow-core/tests/unit/jobs/test_scheduler_job.py
.venv/bin/python scripts/ci/prek/run_mypy_full_dist_local_venv_or_breeze_in_ci.py airflow-core
breeze testing core-tests --backend postgres --python 3.10 --db-reset -- airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q
breeze testing core-tests --backend sqlite --python 3.10 --force-lowest-dependencies --db-reset -- airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q