Move ExecutorCallback execution into a supervised process #62645
ferruzzi wants to merge 19 commits into apache:main
Failures don't feel related and don't give any obvious error message. I'll try rebasing tomorrow and see if that clears them.
Indeed, these three tests are broken on main. Hope they are fixed soon; otherwise check the #ci-cd channel in Slack.
jscheffl left a comment
Okay for Edge (just a doc change), would prefer the core SDK experts making the review.
Pull request overview
This PR moves synchronous ExecutorCallback execution into a supervised subprocess (similar to task execution), and generalizes executor workload handling so tasks and callbacks share a unified flow across LocalExecutor and CeleryExecutor.
Changes:
- Introduces a callback supervisor (supervise_callback) and migrates callback execution to run in a supervised subprocess.
- Adds generic workload metadata (key, display_name, success_state, failure_state) to unify executor handling for tasks and callbacks.
- Aligns “workload finished” logging and updates executor/provider integrations and tests accordingly.
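As an illustration of the second bullet, here is a minimal sketch of how shared workload metadata lets one code path report results for both tasks and callbacks. `FakeWorkload`, `run_workload`, and the state strings `"success"` and `"failed"` are hypothetical stand-ins for this example, not the classes or values added in this PR; only the four field names come from the overview.

```python
from __future__ import annotations

import queue
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class FakeWorkload:
    """Hypothetical stand-in exposing the four generic metadata fields."""

    key: str
    display_name: str
    success_state: str
    failure_state: str


def run_workload(workload: FakeWorkload, fn: Callable[[], object], output: queue.Queue) -> None:
    """Run fn and report (key, state, exception) without knowing the workload type."""
    try:
        fn()
        output.put((workload.key, workload.success_state, None))
    except Exception as exc:
        output.put((workload.key, workload.failure_state, exc))


def boom() -> None:
    raise ValueError("boom")


results: queue.Queue = queue.Queue()
task = FakeWorkload("task-1", "my_dag.my_task", "success", "failed")
callback = FakeWorkload("cb-1", "deadline callback", "success", "failed")

run_workload(task, lambda: None, results)
run_workload(callback, boom, results)

first = results.get_nowait()
second = results.get_nowait()
```

The reporting function never branches on the workload type, which is the point of putting the success and failure states on the workload itself.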
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| task-sdk/tests/task_sdk/execution_time/test_supervisor.py | Updates expected supervisor log fields to the new “Workload finished” format. |
| task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py | Adds unit tests for callback importing/execution behavior. |
| task-sdk/src/airflow/sdk/execution_time/supervisor.py | Renames “Task finished” log event to “Workload finished” and adds workload identifiers. |
| task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py | Adds supervised callback execution implementation and subprocess wrapper. |
| providers/edge3/src/airflow/providers/edge3/cli/worker.py | Updates a reference comment to the renamed LocalExecutor helper. |
| providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py | Switches callback workload execution to supervise_callback. |
| airflow-core/tests/unit/executors/test_local_executor.py | Updates tests for renamed LocalExecutor helper and adds callback supervisor invocation tests. |
| airflow-core/tests/unit/executors/test_base_executor.py | Migrates callback execution tests to use the new execute_callback helper. |
| airflow-core/src/airflow/executors/workloads/task.py | Adds generic workload properties to ExecuteTask. |
| airflow-core/src/airflow/executors/workloads/callback.py | Adds generic workload properties to ExecuteCallback and removes the old inline callback executor. |
| airflow-core/src/airflow/executors/workloads/base.py | Introduces abstract workload interface (key, display_name, success/failure states). |
| airflow-core/src/airflow/executors/workloads/__init__.py | Adds ExecutorWorkload union for executor-supported workload types. |
| airflow-core/src/airflow/executors/local_executor.py | Unifies task/callback execution into _execute_workload and runs callbacks via supervise_callback. |
| airflow-core/src/airflow/executors/base_executor.py | Updates typing to use ExecutorWorkload for workload queuing/scheduling. |
o-nikolas left a comment
Left a few comments, on the whole it looks reasonable to me. But @ashb or @amoghrajesh should definitely look at the supervisor portion.
@ferruzzi Do the sync deadline callbacks work with Connections defined in the DB yet?
Nope, that needs this PR to be finished. Then we need to add a client/comms channel to the supervisor so it can access things like Variables, Connections, XCom, etc.
            output.put((workload.key, workload.success_state, None))
        except Exception as e:
            log.exception("Workload execution failed.", workload_type=type(workload).__name__)
            output.put((workload.key, workload.failure_state, e))
Using .pop(workload.key, None) on both queued_tasks and queued_callbacks for every workload silently swallows missing keys. The old code used del which would raise KeyError if a workload was never queued or got dequeued twice -- surfacing logic bugs rather than hiding them.
Hm. I can see where you are going, but I don't think it's quite that simple. With the two pops as I have them, one of them will pop and the other is expected to be a no-op: a task will pop from queued_tasks and the queued_callbacks pop will just not do anything, and vice versa. A del would have a problem with the no-op side of that.
But you are right that I missed the case where it's not in either. How do you feel about this:
    for workload in workload_list:
        self.activity_queue.put(workload)
        # A valid workload will exist in exactly one of these dicts.
        # One will succeed, the other will fail gracefully and return None.
        removed = self.queued_tasks.pop(workload.key, None) or self.queued_callbacks.pop(workload.key, None)
        if not removed:
            raise KeyError(f"Workload {workload.key} was not found in any queue.")
I implemented that in e558b31, if you have another idea I can revert it
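The behavior discussed in this thread (remove the key from whichever dict holds it, and fail loudly if neither does) can be sketched in isolation as follows. `dequeue_workload` is a hypothetical helper, and plain dicts with string values stand in for queued_tasks and queued_callbacks. The sentinel check is a variation on the `or` chain in the proposal, chosen here so that a falsy stored value is not mistaken for a missing key; it is not what the commit does.

```python
from __future__ import annotations

_MISSING = object()  # sentinel distinguishing "not present" from any stored value


def dequeue_workload(key: str, queued_tasks: dict, queued_callbacks: dict):
    """Remove key from whichever dict holds it; raise KeyError if neither does."""
    for queued in (queued_tasks, queued_callbacks):
        removed = queued.pop(key, _MISSING)
        if removed is not _MISSING:
            return removed
    raise KeyError(f"Workload {key} was not found in any queue.")


tasks = {"t1": "task-workload"}
callbacks = {"c1": "callback-workload"}

popped_callback = dequeue_workload("c1", tasks, callbacks)
popped_task = dequeue_workload("t1", tasks, callbacks)

# A second dequeue of the same key surfaces the logic bug instead of hiding it.
try:
    dequeue_workload("t1", tasks, callbacks)
    double_dequeue_raised = False
except KeyError:
    double_dequeue_raised = True
```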
…e entire flow more generic to account for future workload types
…ugh to the scheduler
@kaxil - I think I addressed all of your comments. Let me know what you think.
        if accepts_context(result):
            result = result(**callback_kwargs)
        else:
            result = result(**kwargs_without_context)
The old code called result(context) positionally. The new code calls result(**callback_kwargs), which passes all kwargs.
For BaseNotifier.__call__(self, *args), keyword arguments raise TypeError. The test's CallableClass.__call__(self, context) also breaks: instance(msg="alert") gets "unexpected keyword argument 'msg'" since __call__ expects context, not msg.
test_callable_class_pattern should fail for this reason.
Suggest preserving the old positional call:
    context = callback_kwargs.get("context")
    if callable(result):
        result = result(context)
The old code would dump any intentionally-passed kwargs though, right? That can't be right. It should pass context if it's allowed, not ONLY pass context, should it? For more details, let's consolidate the discussion in the next comment where I put a proposed solution and a lot more details already
        return True
    params = sig.parameters
    return "context" in params or any(
        p.kind in (inspect.Parameter.VAR_KEYWORD, inspect.Parameter.VAR_POSITIONAL) for p in params.values()
Adding VAR_POSITIONAL (*args) is a behavioral change from the original _accepts_context, which only matched named context or **kwargs.
CallbackTrigger.run() calls callback(**self.callback_kwargs, context=context) when accepts_context returns True. A function with only *args can't accept keyword arguments, so this would raise TypeError. Previously safe because _accepts_context didn't match *args.
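The concern can be reproduced with a small, self-contained sketch. This `accepts_context` is an approximation written for this example: it matches only a named `context` parameter or `**kwargs`, and the early `return True` is assumed to be the fallback for callables whose signature cannot be inspected, since the excerpt does not show that condition. `only_args` and `with_kwargs` are hypothetical callbacks.

```python
import inspect


def accepts_context(func) -> bool:
    """Sketch: True if func can be called with a `context` keyword argument."""
    try:
        sig = inspect.signature(func)
    except (ValueError, TypeError):
        # Assumption: treat non-introspectable callables as accepting context.
        return True
    params = sig.parameters
    return "context" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )


def only_args(*args):
    return args


def with_kwargs(**kwargs):
    return kwargs


# A *args-only function cannot take keyword arguments, so treating it as
# "accepts context" and calling it with context=... would fail.
try:
    only_args(context={})
    keyword_call_failed = False
except TypeError:
    keyword_call_failed = True
```

With `VAR_POSITIONAL` left out of the check, `only_args` is reported as not accepting context, so a caller would not attempt the failing keyword call.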
> Adding VAR_POSITIONAL (*args) is a behavioral change from the original _accepts_context, which only matched named context or **kwargs.
That's fine. It's not released yet and it's an experimental feature. If it was implemented wrong, this is the time to fix it. The same with only passing context and dropping the rest of the kwargs. If the user provided kwargs then those need to be passed through. The only question is if it supports getting context or not in addition, right? We (Airflow) are appending that Context, whether they expect it or not, so we need to strip it back out if it would break things.
I can drop the VAR_POSITIONAL, I think you are right that this isn't the answer... what if we use a try/except to attempt with kwargs, catch the TypeError if it's positional-only, and retry it positionally? Something like:
    kwargs_without_context = {k: v for k, v in callback_kwargs.items() if k != "context"}
    # Call the callable with all kwargs if it accepts context, otherwise strip context.
    if accepts_context(callback_callable):
        result = callback_callable(**callback_kwargs)
    else:
        result = callback_callable(**kwargs_without_context)
    # If the callback was a class then it is now instantiated and callable, call it.
    # Try keyword args first. If the callable only accepts positional args (like
    # BaseNotifier.__call__(self, *args)), fall back to passing context positionally.
    if callable(result):
        try:
            if accepts_context(result):
                result = result(**callback_kwargs)
            else:
                result = result(**kwargs_without_context)
        except TypeError:
            context = callback_kwargs.get("context")
            if context is not None:
                result = result(context)
            else:
                result = result()

This is pretty complicated, so I used Claude to generate a list with every permutation and this is what it came up with, assuming we drop VAR_POSITIONAL and use the try_with_kwargs/except/retry_without_kwargs:
Step 1 — Initial call (function or class instantiation):
Accepts context (called with all callback_kwargs):
- def my_func(context) — ✓
- def my_func(context=None) — ✓
- def my_func(**kwargs) — ✓
- def my_func(message, context=None) — ✓
- class MyClass.__init__(self, context=None) — ✓
- class MyClass.__init__(self, text, context=None) — ✓
- class MyClass.__init__(self, **kwargs) — ✓
Does not accept context (called with callback_kwargs minus context):
- def my_func() — ✓
- def my_func(message) — ✓ (if kwarg names match)
- class MyClass.__init__(self) — ✓
- class MyClass.__init__(self, text) — ✓ (if kwarg names match)
Step 2 — Callable-class call (try keyword call, except fallback to positional):
Try succeeds — accepts context (called with all callback_kwargs):
- __call__(self, context) — ✓
- __call__(self, context=None) — ✓
- __call__(self, **kwargs) — ✓
- __call__(self, message, context=None) — ✓ gets both message and context
- __call__(self, *args, **kwargs) — ✓ gets everything
Try succeeds — does not accept context (called with callback_kwargs minus context):
- __call__(self, message) — ✓ gets message (if kwarg names match)
- __call__(self) with no non-context kwargs — ✓ called with no args
TypeError fallback (keyword call failed, retry positionally):
- __call__(self, *args) with only context in kwargs — ✓ gets context positionally
- __call__(self, *args) with context + other kwargs — context passed positionally, OTHER KWARGS DROPPED (unavoidable — no way to know positional order)
- __call__(self) with non-context kwargs — ✓ falls back to result() or result(context)
The one lossy case is an *args-only __call__ with non-context kwargs. Those kwargs are dropped because we can't know the positional order. This is the best-effort tradeoff.
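A few of these permutations can be exercised with a small harness. This is a sketch of the proposed two-step logic under the stated assumption that VAR_POSITIONAL is dropped from the check; `accepts_context` here is a simplified approximation, and `invoke_callback`, `call_with_matching_kwargs`, and the example callbacks are hypothetical names for this example.

```python
import inspect


def accepts_context(func) -> bool:
    """Simplified check: named `context` parameter or **kwargs."""
    params = inspect.signature(func).parameters
    return "context" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )


def call_with_matching_kwargs(func, callback_kwargs):
    """Pass all kwargs if func accepts context, otherwise strip context."""
    if accepts_context(func):
        return func(**callback_kwargs)
    return func(**{k: v for k, v in callback_kwargs.items() if k != "context"})


def invoke_callback(callback_callable, callback_kwargs):
    # Step 1: call the function, or instantiate the class.
    result = call_with_matching_kwargs(callback_callable, callback_kwargs)
    # Step 2: if that produced a callable instance, call it. Try keywords
    # first, then fall back to passing only context positionally.
    if callable(result):
        try:
            result = call_with_matching_kwargs(result, callback_kwargs)
        except TypeError:
            context = callback_kwargs.get("context")
            result = result(context) if context is not None else result()
    return result


def plain(message):
    return f"plain:{message}"


def with_ctx(message, context=None):
    return (message, context)


class ArgsNotifier:
    def __init__(self, msg):
        self.msg = msg

    def __call__(self, *args):
        return (self.msg, args)


class BareNotifier:
    def __call__(self, *args):
        return args


ctx = {"run_id": "r1"}
```

Two caveats about this sketch. The `except TypeError` also catches a TypeError raised inside the callback body, so a genuine bug in a callback would trigger a second call. And for an *args-only `__call__` with only context in the kwargs, the keyword call with an empty dict succeeds, so in this sketch the fallback never runs and the instance is called with no arguments; whether the PR's final code behaves the same way is not something this example can confirm.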
    def supervise(
    def supervise_task(
Edge3 provider still calls supervise() directly (only a comment was updated in this PR), so it'll trigger the deprecation warning immediately after merge.
Consider keeping supervise as the task-specific function (no rename, no deprecation) and adding supervise_workload + supervise_callback alongside it. Less churn, no breakage.
I think I disagree on this one. That's how deprecation warnings should work, right? It signals to the provider that they should update their call from supervise() to supervise_task(). @wjddn279 already has a draft PR at #63498 to update Edge to use the new API, and is waiting for this to be finalized so we can get that adjusted and merged.
We have community members volunteered to update all of the executors with PRs in draft waiting for this, so it should be a near-immediate adoption once this gets merged. Full list here: #62887
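For readers unfamiliar with the pattern being debated, a deprecation shim of this kind usually looks like the sketch below. The function bodies, the keyword-only signature, and the use of the built-in DeprecationWarning are assumptions for illustration; the real shim in the PR may use a different warning category and signature.

```python
import warnings


def supervise_task(**kwargs):
    """Hypothetical stand-in for the renamed task supervisor."""
    return ("supervised", kwargs)


def supervise(**kwargs):
    """Deprecated alias kept so existing callers keep working."""
    warnings.warn(
        "supervise() is deprecated; use supervise_task() instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this shim
    )
    return supervise_task(**kwargs)


# An existing caller still gets its result, plus one warning to act on.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = supervise(ti="example")
```

The old entry point keeps working, so a provider that has not migrated yet sees a warning rather than a failure.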
- move bundle loading into start()
- add _make_process_nondumpable to supervise_callback
- fix context passing to callback_callable
- revert adding *args check to accepts_context
- parameterize tests
- top-leveled some imports
Move ExecutorCallback execution into a supervised process and make the entire flow more generic to account for future workload types
key, display_name, success_state, and failure_state fields)
Changes applied to both the LocalExecutor and the Celery Executor (the only executors that currently support synchronous callbacks in the worker)
Promised follow-up to #61153
promised tags for @jason810496 @ashb @amoghrajesh