feat(kubernetes): add executor callback support to KubernetesExecutor#63454
Draft
sjyangkevin wants to merge 4 commits intoapache:mainfrom
Draft
feat(kubernetes): add executor callback support to KubernetesExecutor#63454sjyangkevin wants to merge 4 commits intoapache:mainfrom
sjyangkevin wants to merge 4 commits intoapache:mainfrom
Conversation
Run synchronous executor callbacks (e.g. deadline alerts) as Kubernetes pods, using the same pod pipeline as task execution. Callbacks are dispatched via annotation-based key discrimination in the watcher, and their pod exit code maps to CallbackState.SUCCESS/FAILED. Also extends execute_workload.py (task-sdk) to handle ExecuteCallback workloads inside pods, making it the unified entrypoint for both tasks and callbacks in containerised executors. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ferruzzi
reviewed
Mar 18, 2026
Contributor
ferruzzi
left a comment
There was a problem hiding this comment.
Didn't get through the whole thing, but left a few notes for you to work on for now. Please ping me when you are done and I'll make another pass
| from airflow.executors.workloads import ExecuteCallback, ExecuteTask | ||
| from airflow.utils.state import CallbackState | ||
|
|
||
| # Airflow V3 version |
Contributor
There was a problem hiding this comment.
Please do me a favor and drop this comment while you are in here.
Suggested change
| # Airflow V3 version |
| from airflow.utils.state import CallbackState | ||
|
|
||
| # Airflow V3 version | ||
| for w in workloads: |
Contributor
There was a problem hiding this comment.
I know this was existing code, but can you please rename w to workload for me, we should be avoiding one-letter variable names.
| key = task.key | ||
| self.kube_scheduler.run_next(task) | ||
| self.task_publish_retries.pop(key, None) | ||
| if not isinstance(key, str): |
Contributor
There was a problem hiding this comment.
if not isinstance(key, str):
Should that be
if isinstance(key, TaskInstanceKey):
both here and below? or perhaps
if not isinstance(key, WorkloadKey):
?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
related: #62887
Summary
This PR implements executor callback support for KubernetesExecutor, continuing the work started in #61153 and #62645. Synchronous callbacks are routed to an executor to run as workloads.
Approach
Callbacks run as Kubernetes pods, the same isolation model used for tasks. The workload (an ExecuteCallback JSON payload) is passed to the
execute_workloadentrypoint that tasks use, and the pod's exit code determines success or failure.The implementation threads callbacks through the existing executor pipeline with annotation-based discrimination to distinguish callback pods from task pods:
callback_idannotation instead ofdag_id/task_id/try_number_change_state()dispatches on key type (str = callback, TaskInstanceKey = task)Callback pod lifecycle
Currently
execute_workload.pycallsexecute_callback_workload()inline in the pod process. Oncesupervise_callback()from (#62645) is merged, need to update_execute_callback()inexecute_workload.pyFollow-ups
Was generative AI tooling used to co-author this PR?
Generated-by: [Claude Code (claude-opus-4-6)] following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.