
feat(kubernetes): add executor callback support to KubernetesExecutor #63454

Draft
sjyangkevin wants to merge 4 commits into apache:main from sjyangkevin:feat/kubernetes-executor-callback-support

Conversation

@sjyangkevin

related: #62887

Summary

This PR implements executor callback support for KubernetesExecutor, continuing the work started in #61153 and #62645. Synchronous callbacks are routed to an executor to run as workloads.

Approach

Callbacks run as Kubernetes pods, the same isolation model used for tasks. The workload (an ExecuteCallback JSON payload) is passed to the execute_workload entrypoint that tasks use, and the pod's exit code determines success or failure.
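A minimal sketch of this model, under stated assumptions: `build_callback_command` and `state_from_exit_code` are hypothetical helpers, not functions from this PR, and the payload fields are illustrative. Only the module path and the `--json-string` flag are taken from the lifecycle diagram in this description.

```python
import json

# Entrypoint module and flag as shown in the lifecycle diagram in this description.
ENTRYPOINT = "airflow.sdk.execution_time.execute_workload"


def build_callback_command(workload: dict) -> list[str]:
    """Hypothetical helper: build the pod's argv for a serialized workload."""
    return ["python", "-m", ENTRYPOINT, "--json-string", json.dumps(workload)]


def state_from_exit_code(exit_code: int) -> str:
    """Hypothetical helper: exit code 0 means success, anything else failure."""
    return "success" if exit_code == 0 else "failed"


# Illustrative payload only; the real ExecuteCallback schema is not shown here.
command = build_callback_command({"type": "ExecuteCallback", "callback_id": "1234"})
print(command[:4])
print(state_from_exit_code(0), state_from_exit_code(1))
```

Because the same entrypoint serves tasks and callbacks, the executor side only needs to know which kind of key a finished pod belongs to.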

The implementation threads callbacks through the existing executor pipeline and uses pod annotations to distinguish callback pods from task pods:

  • Callback pods carry a callback_id annotation instead of dag_id/task_id/try_number
  • The watcher branches on this annotation to extract the right key type
  • _change_state() dispatches on key type (str = callback, TaskInstanceKey = task)
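The three points above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `TaskKey` is a hypothetical stand-in for TaskInstanceKey with reduced fields, and `PodKey`, `key_from_annotations`, and `change_state` are names invented for this example.

```python
from typing import NamedTuple, Union


class TaskKey(NamedTuple):
    """Hypothetical, simplified stand-in for TaskInstanceKey."""

    dag_id: str
    task_id: str
    try_number: int


# str = callback key, TaskKey = task key (alias invented for this sketch).
PodKey = Union[str, TaskKey]


def key_from_annotations(annotations: dict[str, str]) -> PodKey:
    """Watcher side: a callback_id annotation marks a callback pod."""
    if "callback_id" in annotations:
        return annotations["callback_id"]
    return TaskKey(
        dag_id=annotations["dag_id"],
        task_id=annotations["task_id"],
        try_number=int(annotations["try_number"]),
    )


def change_state(key: PodKey, state: str, event_buffer: dict) -> str:
    """Executor side: record the state and report which branch handled it."""
    kind = "callback" if isinstance(key, str) else "task"
    event_buffer[key] = state
    return kind


events: dict = {}
callback_key = key_from_annotations({"callback_id": "abc-123"})
task_key = key_from_annotations(
    {"dag_id": "example_dag", "task_id": "example_task", "try_number": "1"}
)
print(change_state(callback_key, "success", events))
print(change_state(task_key, "failed", events))
```

Both key kinds are hashable, so they can share one event buffer without colliding.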

Callback pod lifecycle

  Scheduler
    └── _process_workloads(ExecuteCallback)
          └── task_queue.put(KubernetesJob(callback_key, [workload], None, None))

  AirflowKubernetesScheduler.run_next()
    └── _run_next_callback()
          └── pod: python -m airflow.sdk.execution_time.execute_workload
                     --json-string <ExecuteCallback JSON>
                     annotations: {callback_id: <uuid>}
                     labels: {airflow-worker: <job_id>}

  KubernetesJobWatcher  (sees callback_id annotation → passes CallbackKey)
    └── result_queue.put(KubernetesResults(callback_key, state, ...))

  KubernetesExecutor._change_state()
    └── isinstance(key, str) → _change_callback_state()
          └── event_buffer[key] = CallbackState.SUCCESS / FAILED

Currently, execute_workload.py calls execute_callback_workload() inline in the pod process. Once supervise_callback() from #62645 is merged, _execute_callback() in execute_workload.py will need to be updated to use it.

Follow-ups

  • Pod adoption on scheduler restart: Orphaned callback pods after a scheduler restart are not currently adopted (unlike task pods).
  • Callback revocation: No revoke_callback() support yet — callbacks can't be cancelled mid-flight.
  • supervise_callback() integration: Pending #62645 ("Move ExecutorCallback execution into a supervised process"); see above.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: [Claude Code (claude-opus-4-6)] following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

sjyangkevin and others added 4 commits March 12, 2026 10:58
Run synchronous executor callbacks (e.g. deadline alerts) as Kubernetes
pods, using the same pod pipeline as task execution. Callbacks are
dispatched via annotation-based key discrimination in the watcher, and
their pod exit code maps to CallbackState.SUCCESS/FAILED.

Also extends execute_workload.py (task-sdk) to handle ExecuteCallback
workloads inside pods, making it the unified entrypoint for both tasks
and callbacks in containerised executors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@boring-cyborg bot added the area:providers, area:task-sdk, and provider:cncf-kubernetes labels Mar 12, 2026
@ferruzzi mentioned this pull request Mar 12, 2026
@ferruzzi self-requested a review March 13, 2026 19:05

@ferruzzi left a comment


Didn't get through the whole thing, but left a few notes for you to work on for now. Please ping me when you are done and I'll make another pass.

from airflow.executors.workloads import ExecuteCallback, ExecuteTask
from airflow.utils.state import CallbackState

# Airflow V3 version

Please do me a favor and drop this comment while you are in here.

Suggested change
# Airflow V3 version

from airflow.utils.state import CallbackState

# Airflow V3 version
for w in workloads:

I know this was existing code, but can you please rename w to workload for me? We should be avoiding one-letter variable names.

key = task.key
self.kube_scheduler.run_next(task)
self.task_publish_retries.pop(key, None)
if not isinstance(key, str):

if not isinstance(key, str):

Should that be

if isinstance(key, TaskInstanceKey):

both here and below? or perhaps

if not isinstance(key, WorkloadKey):

?
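For comparison, the difference between the negative and positive checks can be shown in a stand-alone sketch. `FakeTaskKey` is a hypothetical stand-in for TaskInstanceKey, not the real class, and neither helper function exists in the PR.

```python
from typing import NamedTuple


class FakeTaskKey(NamedTuple):
    """Hypothetical, simplified stand-in for TaskInstanceKey."""

    dag_id: str
    task_id: str
    try_number: int


def looks_like_task_negative(key: object) -> bool:
    """Negative check: anything that is not a str counts as a task key."""
    return not isinstance(key, str)


def looks_like_task_positive(key: object) -> bool:
    """Positive check: only an actual task key counts as a task key."""
    return isinstance(key, FakeTaskKey)


samples = [FakeTaskKey("example_dag", "example_task", 1), "callback-uuid", None, 42]
for key in samples:
    print(repr(key), looks_like_task_negative(key), looks_like_task_positive(key))
```

The two checks agree on task keys and string callback keys, but the negative check also accepts unrelated values such as None or an int, which is why the positive form is the more defensive of the two.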
