Clean up CeleryExecutor to use workload terminology and typing#63888
Clean up CeleryExecutor to use workload terminology and typing#63888SameerMesiah97 wants to merge 1 commit intoapache:mainfrom
Conversation
…ing to align with the workload-based executor model.
8e2dea8 to
29a855f
Compare
ferruzzi
left a comment
There was a problem hiding this comment.
Really nice. I left a couple comments, but it's a great start
| if AIRFLOW_V_3_0_PLUS: | ||
| # TODO: TaskSDK: move this type change into BaseExecutor | ||
| queued_tasks: dict[TaskInstanceKey, workloads.All] # type: ignore[assignment] | ||
| queued_tasks: dict[WorkloadKey, workloads.All] # type: ignore[assignment] |
There was a problem hiding this comment.
I think there's a miss here. You are importing WorkloadKey if version is over 3.2 up above, but using it here if airflow version is 3.0. What about using this as the import block?
# Remove this conditional once min version > 3.2
try:
from airflow.executors.workloads.types import WorkloadKey
except ImportError:
from airflow.models.taskinstancekey import TaskInstanceKey as WorkloadKey
I know there's some community debate over using try/catch on imports, but I think this feels like the right time to use one.
((I think the same comment goes for celery_executor_utils.py as well))
| assert len(executor.queued_tasks) == 1, "Workload should remain in queue" | ||
| assert executor.event_buffer == {} | ||
| assert f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in caplog.text | ||
| assert f"[Try 1 of 3] Celery Task Timeout Error for Workload: ({key})." in caplog.text |
There was a problem hiding this comment.
Here and below, it looks like you changed the expected message in the tests but didn't actually change the message log in the code?
| # Backward compatibility alias | ||
| send_task_to_executor = send_workload_to_executor |
There was a problem hiding this comment.
I think we still need this? If not, then we need to update references to it such as (but not limited to) celery/provider.yaml
|
|
||
| def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None: | ||
| """Update state of a single task.""" | ||
| def update_workload_state(self, key: WorkloadKey, state: str, info: Any) -> None: |
There was a problem hiding this comment.
Double check me, but I don't believe callbacks support this part (yet?), do they? If not, then maybe we should leave this one for now? With this change it makes the method look like it properly handles both types when it doesn't.
Description
This PR is a follow-up for PR #61153. It performs a non-functional cleanup of the Celery executor to better align it with the workload-based execution model being introduced in Airflow 3.2.
The changes include:
WorkloadKey,WorkloadInCelery,workloads.All).Method renames were applied selectively:
BaseExecutor/ scheduler-invoked interfaces were left unchanged to preserve compatibilityNo behavioral or functional changes are introduced.
Rationale
The Celery executor already supports workload-based execution, but parts of the implementation still reflect legacy task-centric terminology. This PR standardizes naming, typing, and documentation to improve consistency and readability.
Method names were updated where safe (i.e. internal helpers not part of public or inherited interfaces). Methods that mirror
BaseExecutor, are invoked by the scheduler, or are tightly coupled to Celery internals were intentionally left unchanged to avoid breaking contracts.Variables were renamed from “task” to “workload” where they represent Airflow execution units. Fields inherited from
BaseExecutor(e.g.queued_tasks) and Celery-specific concepts (e.g.task_id) were preserved to maintain compatibility and clarity.Docstrings and comments were updated to consistently describe workloads while still distinguishing between Airflow workloads and Celery tasks where necessary.
Typing was improved by preferring workload-based types while retaining legacy aliases required for compatibility across Airflow versions.
Notes
The
CeleryKubernetesExecutorhas not been altered (save for atype: ignoreto satisfy mypy) in this PR as it is now deprecated according to Issue Executor Callback support #62887Metrics (
Stats.incr) and configuration paths were intentionally left unchanged, as they form part of Airflow’s external/operational interface (e.g. dashboards, alerts, and user configs). Renaming these could introduce unintended breaking changes or disrupt existing observability. This change is therefore scoped to internal naming, typing, and documentation updates only, without altering behavior or externally visible contracts.Related