Fix duplicate heartbeat-timeout task callbacks #68008

Open
hkc-8010 wants to merge 1 commit into apache:main from hkc-8010:fix/zombie-heartbeat-callback-dedupe

Conversation

@hkc-8010

@hkc-8010 hkc-8010 commented Jun 4, 2026

Contributor

closes: #42553

Description

When the scheduler detects a running task instance that has stopped heartbeating, it currently builds and enqueues a TaskCallbackRequest, but the task instance can remain in RUNNING until the callback is processed asynchronously.

That leaves a window where the next scheduler heartbeat-timeout scan can find the same task instance attempt again and enqueue another callback for the same try.

This changes the heartbeat-timeout purge path to enqueue the existing callback request, then immediately run the established task-instance failure transition path so the task instance leaves RUNNING in the same scheduler pass.

Executor cleanup via executor.change_state(..., FAILED, remove_running=True) is unchanged.
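
As an illustration only, the duplicate-scan window described above can be modelled with a toy scheduler. This is a minimal sketch, not Airflow code: `ToyTaskInstance`, `ToyScheduler`, and `run_two_scans` are hypothetical names, and the real scan, callback request, and failure transition are far more involved.

```python
from dataclasses import dataclass, field
from enum import Enum


class State(Enum):
    RUNNING = "running"
    FAILED = "failed"


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a task instance row."""
    key: str
    try_number: int
    state: State = State.RUNNING


@dataclass
class ToyScheduler:
    """Hypothetical scheduler; converge_state toggles the behaviour this PR adds."""
    converge_state: bool
    callbacks: list = field(default_factory=list)

    def purge_heartbeat_timeouts(self, tis):
        # Assumption: every RUNNING task instance passed in has timed out.
        for ti in tis:
            if ti.state is not State.RUNNING:
                continue  # already failed, so the scan no longer matches it
            # Enqueue the callback request (processed asynchronously elsewhere).
            self.callbacks.append((ti.key, ti.try_number))
            if self.converge_state:
                # Leave RUNNING in the same pass so the next scan skips this try.
                ti.state = State.FAILED


def run_two_scans(converge_state: bool) -> int:
    """Run two consecutive scans and return how many callbacks were enqueued."""
    ti = ToyTaskInstance("example_dag.example_task", try_number=1)
    scheduler = ToyScheduler(converge_state)
    scheduler.purge_heartbeat_timeouts([ti])
    scheduler.purge_heartbeat_timeouts([ti])
    return len(scheduler.callbacks)
```

In this model, `run_two_scans(False)` enqueues two callbacks for the same try, while `run_two_scans(True)` enqueues one.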

Related: #66767 handles callback type metadata for heartbeat-timed-out retries. This PR is scoped to preventing duplicate same-try callback emission.

Tests

  • uv run --frozen --no-sync pytest --with-db-init airflow-core/tests/unit/jobs/test_scheduler_job.py::TestSchedulerJob::test_heartbeat_timeout_converges_ti_state_before_next_scan -q
    • Failed before the fix as expected.
  • uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py::TestSchedulerJob::test_heartbeat_timeout_converges_ti_state_before_next_scan -q
  • uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q
  • uv run --frozen --no-sync pytest airflow-core/tests/unit/jobs/test_scheduler_job.py -q
  • prek run --files airflow-core/src/airflow/jobs/scheduler_job_runner.py airflow-core/tests/unit/jobs/test_scheduler_job.py
  • .venv/bin/python scripts/ci/prek/run_mypy_full_dist_local_venv_or_breeze_in_ci.py airflow-core
  • breeze testing core-tests --backend postgres --python 3.10 --db-reset -- airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q
  • breeze testing core-tests --backend sqlite --python 3.10 --force-lowest-dependencies --db-reset -- airflow-core/tests/unit/jobs/test_scheduler_job.py -k "heartbeat_timeout" -q

@hkc-8010 hkc-8010 requested review from XD-DENG and ashb as code owners June 4, 2026 08:48
@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label Jun 4, 2026
@eladkal eladkal added this to the Airflow 3.3.0 milestone Jun 7, 2026
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Jun 7, 2026
@potiuk

potiuk commented Jun 9, 2026

Member

@hkc-8010 A few things need addressing before review — see our Pull Request quality criteria.

  • Pre-commit / static checks. See docs.

No rush.

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@hkc-8010

Contributor Author

Thanks @potiuk. I addressed the static/pre-commit failure in ca401fa6ba9.

The failed check was caused by generated/provider_dependencies.json.sha256sum being generated without a trailing newline, which made end-of-file-fixer modify it during CI. The follow-up commit writes that checksum file with a trailing newline and adds a focused Breeze regression test for the cache-miss path.

Validated locally:

  • uv run --project dev/breeze pytest dev/breeze/tests/test_provider_dependencies.py -q
  • prek run --files dev/breeze/src/airflow_breeze/utils/provider_dependencies.py dev/breeze/tests/test_provider_dependencies.py
  • prek run end-of-file-fixer --files generated/provider_dependencies.json.sha256sum

Could you please take another look when you get a chance?

@eladkal

eladkal commented Jun 12, 2026

Contributor

@hkc-8010 there are conflicts to resolve

@hkc-8010 hkc-8010 force-pushed the fix/zombie-heartbeat-callback-dedupe branch from ca401fa to e06ab5e Compare June 12, 2026 11:01
@hkc-8010

Contributor Author

> @hkc-8010 there are conflicts to resolve

Thanks. I've resolved the conflicts.

@jscheffl jscheffl left a comment

Contributor

Looks good to me. But as I am not an expert on callbacks: is it OK from a "state model" perspective that, when the callback is executed asynchronously, the state of the task is already set to failed? Or is this a change from previous behavior (where callbacks could assume the task was still in the "running" state), and could it harm callback logic?

@hkc-8010

Contributor Author

Thanks @jscheffl, good question.

I think this is OK from the state-model side and does not need an extra code change. This makes the heartbeat-timeout path consistent with the existing scheduler path for externally killed tasks: the scheduler creates/sends the TaskCallbackRequest, then calls ti.handle_failure(...) to converge the metastore state.

The callback itself is still executed asynchronously from the callback request payload. That payload is built before handle_failure() and uses the simplified execution API TI datamodel, so it is not passing a live ORM object whose state mutates underneath the callback. If callback code explicitly queries the metastore, it will now see the task as failed, which seems like the correct state once the scheduler has determined the task heartbeat timed out.

The main issue this PR fixes is that leaving the DB row as running after queueing the callback lets the next scheduler heartbeat find the same TI again and enqueue another same-try failure callback. Moving through handle_failure() in the same purge pass closes that gap while keeping the callback asynchronous.
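
To make the payload point concrete, here is a minimal sketch of the difference between a snapshot taken before the failure transition and the mutable row. The names `ToyTISnapshot`, `ToyTIRow`, and `build_then_fail` are hypothetical, and whether the real payload carries a state field at all is an assumption made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTISnapshot:
    """Immutable copy of the fields a callback needs (hypothetical shape)."""
    task_id: str
    try_number: int
    state: str


@dataclass
class ToyTIRow:
    """Hypothetical stand-in for the mutable metastore row."""
    task_id: str
    try_number: int
    state: str = "running"

    def snapshot(self) -> ToyTISnapshot:
        # Copy the values; the snapshot holds no reference back to the row.
        return ToyTISnapshot(self.task_id, self.try_number, self.state)


def build_then_fail() -> tuple[str, str]:
    """Build the payload first, then fail the row, as in the order described above."""
    row = ToyTIRow("example_task", try_number=1)
    payload = row.snapshot()   # built before the failure transition
    row.state = "failed"       # stands in for the failure transition
    return payload.state, row.state
```

In this sketch the payload keeps the values it was built with, while a fresh read of the row sees the failed state.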

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Jun 17, 2026
@Vamsi-klu

Contributor

This closes the duplicate-scan window, but it now makes the heartbeat-timeout request the durable callback path for retryable tasks. Since the request is built before handle_failure() and does not set task_callback_type, a timed-out task with retries left can dispatch on_failure_callback instead of on_retry_callback. The regression test only covers max_tries = 0. Please set task_callback_type from retry eligibility before send_callback() and add a retryable heartbeat-timeout case.
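
The requested selection could look roughly like the sketch below. It is not the Airflow implementation: `ToyCallbackType` and `pick_callback_type` are hypothetical names, and the rule that a try is retryable while `try_number <= max_tries` is an assumption that should be checked against the real retry-eligibility logic.

```python
from enum import Enum


class ToyCallbackType(Enum):
    """Hypothetical stand-in for the value stored in task_callback_type."""
    ON_RETRY = "on_retry_callback"
    ON_FAILURE = "on_failure_callback"


def pick_callback_type(try_number: int, max_tries: int) -> ToyCallbackType:
    """Choose the callback type before the request is sent.

    Assumption: a try is retryable while try_number <= max_tries.
    """
    if try_number <= max_tries:
        return ToyCallbackType.ON_RETRY
    return ToyCallbackType.ON_FAILURE
```

Under that assumption, the `max_tries = 0` case covered by the regression test always resolves to the failure callback, which is why a retryable case would need its own test.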

Labels

area:Scheduler including HA (high availability) scheduler ready for maintainer review Set after triaging when all criteria pass. type:bug-fix Changelog: Bug Fixes

Development

Successfully merging this pull request may close these issues.

Same task instance attempt is detected as a zombie multiple times and fails without retries

5 participants