Skip to content

Move ExecutorCallback execution into a supervised process#62645

Open
ferruzzi wants to merge 19 commits intoapache:mainfrom
aws-mwaa:ferruzzi/executor-callbacks/supervisor
Open

Move ExecutorCallback execution into a supervised process#62645
ferruzzi wants to merge 19 commits intoapache:mainfrom
aws-mwaa:ferruzzi/executor-callbacks/supervisor

Conversation

@ferruzzi
Copy link
Contributor

Move ExecutorCallback execution into a supervised process and make the entire flow more generic to account for future workload types

  • Generalized base workload class which both tasks and callbacks inherit (Added key, display_name, success_state, and failure_state fields)
  • Created a callback supervisor
  • Simplified/unified the shared workflow between tasks and callbacks
  • Aligned logging messages

Changes applied to both the LocalExecutor and the Celery Executor (the only executors that currently support synchronous callbacks in the worker)

Promised follow-up to #61153
promised tags for @jason810496 @ashb @amoghrajesh


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@ferruzzi
Copy link
Contributor Author

ferruzzi commented Mar 1, 2026

Failures don't feel related and don't give any obvious error message, I'll try rebasing tomorrow and see if that clears them

@jscheffl
Copy link
Contributor

jscheffl commented Mar 1, 2026

in-deed, these three tests are broken on main. Hope they are fixed soon, else check #ci-cd channel in Slack.

Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay for Edge (just a doc change), would prefer the core SDK experts making the review.

@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch 2 times, most recently from c449451 to 343ee26 Compare March 2, 2026 19:56
@o-nikolas o-nikolas requested a review from Copilot March 2, 2026 20:36
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR moves synchronous ExecutorCallback execution into a supervised subprocess (similar to task execution), and generalizes executor workload handling so tasks and callbacks share a unified flow across LocalExecutor and CeleryExecutor.

Changes:

  • Introduces a callback supervisor (supervise_callback) and migrates callback execution to run in a supervised subprocess.
  • Adds generic workload metadata (key, display_name, success_state, failure_state) to unify executor handling for tasks and callbacks.
  • Aligns “workload finished” logging and updates executor/provider integrations and tests accordingly.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
task-sdk/tests/task_sdk/execution_time/test_supervisor.py Updates expected supervisor log fields to the new “Workload finished” format.
task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py Adds unit tests for callback importing/execution behavior.
task-sdk/src/airflow/sdk/execution_time/supervisor.py Renames “Task finished” log event to “Workload finished” and adds workload identifiers.
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py Adds supervised callback execution implementation and subprocess wrapper.
providers/edge3/src/airflow/providers/edge3/cli/worker.py Updates a reference comment to the renamed LocalExecutor helper.
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py Switches callback workload execution to supervise_callback.
airflow-core/tests/unit/executors/test_local_executor.py Updates tests for renamed LocalExecutor helper and adds callback supervisor invocation tests.
airflow-core/tests/unit/executors/test_base_executor.py Migrates callback execution tests to use the new execute_callback helper.
airflow-core/src/airflow/executors/workloads/task.py Adds generic workload properties to ExecuteTask.
airflow-core/src/airflow/executors/workloads/callback.py Adds generic workload properties to ExecuteCallback and removes the old inline callback executor.
airflow-core/src/airflow/executors/workloads/base.py Introduces abstract workload interface (key, display_name, success/failure states).
airflow-core/src/airflow/executors/workloads/init.py Adds ExecutorWorkload union for executor-supported workload types.
airflow-core/src/airflow/executors/local_executor.py Unifies task/callback execution into _execute_workload and runs callbacks via supervise_callback.
airflow-core/src/airflow/executors/base_executor.py Updates typing to use ExecutorWorkload for workload queuing/scheduling.

Copy link
Contributor

@o-nikolas o-nikolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments, on the whole it looks reasonable to me. But @ashb or @amoghrajesh should definitely look at the supervisor portion.

@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch 2 times, most recently from 94e0828 to da22f0d Compare March 3, 2026 06:38
@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch 2 times, most recently from e77f801 to eab540d Compare March 23, 2026 21:43
@kaxil
Copy link
Member

kaxil commented Mar 24, 2026

@ferruzzi Does the sync deadline callbacks work with Connections defined in DB yet?

@ferruzzi
Copy link
Contributor Author

@ferruzzi Does the sync deadline callbacks work with Connections defined in DB yet?

Nope, that needs this PR to be finished, then we need to add a client/comms channel to the supervisor so it can access things like Variables, Connections, Xcom, etc

@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch from eab540d to 32d06f1 Compare March 24, 2026 22:36
output.put((workload.key, workload.success_state, None))
except Exception as e:
log.exception("Workload execution failed.", workload_type=type(workload).__name__)
output.put((workload.key, workload.failure_state, e))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using .pop(workload.key, None) on both queued_tasks and queued_callbacks for every workload silently swallows missing keys. The old code used del which would raise KeyError if a workload was never queued or got dequeued twice -- surfacing logic bugs rather than hiding them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I can see where you are going. I don't think it's quite that simple though. with the two pops how I have it, one of them will pop and the other is expected to be a no-op. a task will pop from queued_tasks and the queued_callbacks pop will just not do anything, and vice versa. A del would have a problem with the no-op side of that.

But you are right that I missed the case where it's not in either. How do you feel about this:

for workload in workload_list:
    self.activity_queue.put(workload)
    # A valid workload will exist in exactly one of these dicts.
    # One will succeed, the other will fail gracefully and return None.
    removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop(workload.key, None)
    if not removed:
        raise KeyError(f"Workload {workload.key} was not found in any queue.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented that in e558b31, if you have another idea I can revert it

@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch from 32d06f1 to 30716d0 Compare March 26, 2026 00:01
@ferruzzi ferruzzi force-pushed the ferruzzi/executor-callbacks/supervisor branch from 30716d0 to 845db57 Compare March 26, 2026 00:03
@ferruzzi
Copy link
Contributor Author

@kaxil - I think I addressed all of your comments. Let me know what you think

Comment on lines +99 to +103
if accepts_context(result):
result = result(**callback_kwargs)
else:
result = result(**kwargs_without_context)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code called result(context) positionally. The new code calls result(**callback_kwargs), which passes all kwargs.

For BaseNotifier.__call__(self, *args), keyword arguments raise TypeError. The test's CallableClass.__call__(self, context) also breaks: instance(msg="alert") gets "unexpected keyword argument 'msg'" since __call__ expects context, not msg.

test_callable_class_pattern should fail for this reason.

Suggest preserving the old positional call:

context = callback_kwargs.get("context")
if callable(result):
    result = result(context)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code would dump any intentionally-passed kwargs though, right? That can't be right. It should pass context if it's allowed, not ONLY pass context, should it? For more details, let's consolidate the discussion in the next comment where I put a proposed solution and a lot more details already

return True
params = sig.parameters
return "context" in params or any(
p.kind in (inspect.Parameter.VAR_KEYWORD, inspect.Parameter.VAR_POSITIONAL) for p in params.values()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding VAR_POSITIONAL (*args) is a behavioral change from the original _accepts_context, which only matched named context or **kwargs.

CallbackTrigger.run() calls callback(**self.callback_kwargs, context=context) when accepts_context returns True. A function with only *args can't accept keyword arguments, so this would raise TypeError. Previously safe because _accepts_context didn't match *args.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding VAR_POSITIONAL (*args) is a behavioral change from the original _accepts_context, which only matched named context or **kwargs.

That's fine. It's not released yet and it's an experimental feature. If it was implemented wrong, this is the time to fix it. The same with only passing context and dropping the rest of the kwargs. If the user provided kwargs then those need to be passed through. The only question is if it supports getting context or not in addition, right? We (Airflow) are appending that Context, whether they expect it or not, so we need to strip it back out if it would break things.

I can drop the VAR_POSITIONAL, I think you are right that this isn't the answer... what if we use a try/except to attempt with kwargs, catch the TypeError if it's positional-only, and retry it positionally? Something like:

kwargs_without_context = {k: v for k, v in callback_kwargs.items() if k != "context"}

# Call the callable with all kwargs if it accepts context, otherwise strip context.
if accepts_context(callback_callable):
    result = callback_callable(**callback_kwargs)
else:
    result = callback_callable(**kwargs_without_context)

# If the callback was a class then it is now instantiated and callable, call it.
# Try keyword args first. If the callable only accepts positional args (like
# BaseNotifier.__call__(self, *args)), fall back to passing context positionally.
if callable(result):
    try:
        if accepts_context(result):
            result = result(**callback_kwargs)
        else:
            result = result(**kwargs_without_context)
    except TypeError:
        context = callback_kwargs.get("context")
        if context is not None:
            result = result(context)
        else:
            result = result()

This is pretty complicated, so I used Claude to generate a list with every permutation and this is what it came up with, assuming we drop VAR_POSITIONAL and use the try_with_kwargs/except/retry_without_kwargs:

Step 1 — Initial call (function or class instantiation):

Accepts context (called with all callback_kwargs):

  • def my_func(context) — ✓
  • def my_func(context=None) — ✓
  • def my_func(**kwargs) — ✓
  • def my_func(message, context=None) — ✓
  • class MyClass.init(self, context=None) — ✓
  • class MyClass.init(self, text, context=None) — ✓
  • class MyClass.init(self, **kwargs) — ✓

Does not accept context (called with callback_kwargs minus context):

  • def my_func() — ✓
  • def my_func(message) — ✓ (if kwarg names match)
  • class MyClass.init(self) — ✓
  • class MyClass.init(self, text) — ✓ (if kwarg names match)

Step 2 — Callable-class call (try keyword call, except fallback to positional):

Try succeeds — accepts context (called with all callback_kwargs):

  • call(self, context) — ✓
  • call(self, context=None) — ✓
  • call(self, **kwargs) — ✓
  • call(self, message, context=None) — ✓ gets both message and context
  • call(self, *args, **kwargs) — ✓ gets everything

Try succeeds — does not accept context (called with callback_kwargs minus context):

  • call(self, message) — ✓ gets message (if kwarg names match)
  • call(self) with no non-context kwargs — ✓ called with no args

TypeError fallback (keyword call failed, retry positionally):

  • call(self, *args) with only context in kwargs — ✓ gets context positionally
  • call(self, *args) with context + other kwargs — context passed positionally, OTHER KWARGS DROPPED (unavoidable — no way to know positional order)
  • call(self) with non-context kwargs — ✓ falls back to result() or result(context)

The one lossy case is *args-only call with non-context kwargs. Those kwargs are dropped because we can't know the positional order. This is the best-effort tradeoff.



def supervise(
def supervise_task(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edge3 provider still calls supervise() directly (only a comment was updated in this PR), so it'll trigger the deprecation warning immediately after merge.

Consider keeping supervise as the task-specific function (no rename, no deprecation) and adding supervise_workload + supervise_callback alongside it. Less churn, no breakage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I disagree on this one. That's how deprecation warnings should work, right? It signals to the provider that they should update their call from supervise() to supervise_task(). @wjddn279 already has a draft PR at #63498 to update Edge to use the new API, and is waiting for this to be finalized so we can get that adjusted and merged.Z

We have community members volunteered to update all of the executors with PRs in draft waiting for this, so it should be a near-immediate adoption once this gets merged. Full list here: #62887

- move bundle loading into start()
- add _make_process_nondumpable to supervise_callback
- fix context passing to callback_callable
- revert adding *args check to accepts_context
- parameterize tests
- top-leveled some imports
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor area:providers area:task-sdk provider:celery provider:edge Edge Executor / Worker (AIP-69) / edge3

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants