Clean up CeleryExecutor to use workload terminology and typing #63888

Open
SameerMesiah97 wants to merge 1 commit into apache:main from SameerMesiah97:CeleryExecutor-Cleanup

@SameerMesiah97 SameerMesiah97 commented Mar 18, 2026

Description

This PR is a follow-up for PR #61153. It performs a non-functional cleanup of the Celery executor to better align it with the workload-based execution model being introduced in Airflow 3.2.

The changes include:

  • Renaming task-centric methods and variables to use workload-oriented terminology where appropriate.
  • Updating type annotations to consistently use workload abstractions (e.g. WorkloadKey, WorkloadInCelery, workloads.All).
  • Improving docstrings and inline comments to reflect workload-based semantics.
  • Minor naming improvements to reduce ambiguity.
  • Adding missing periods to, fixing capitalization in, and rephrasing docstrings and comments where needed.

Method renames were applied selectively:

  • Internal helper methods and non-public interfaces were updated to reflect workload terminology.
  • Methods closely tied to Celery internals or mirroring BaseExecutor / scheduler-invoked interfaces were left unchanged to preserve compatibility.

No behavioral or functional changes are introduced.

Rationale

The Celery executor already supports workload-based execution, but parts of the implementation still reflect legacy task-centric terminology. This PR standardizes naming, typing, and documentation to improve consistency and readability.

  • Method names were updated where safe (i.e. internal helpers not part of public or inherited interfaces). Methods that mirror BaseExecutor, are invoked by the scheduler, or are tightly coupled to Celery internals were intentionally left unchanged to avoid breaking contracts.

  • Variables were renamed from “task” to “workload” where they represent Airflow execution units. Fields inherited from BaseExecutor (e.g. queued_tasks) and Celery-specific concepts (e.g. task_id) were preserved to maintain compatibility and clarity.

  • Docstrings and comments were updated to consistently describe workloads while still distinguishing between Airflow workloads and Celery tasks where necessary.

  • Typing was improved by preferring workload-based types while retaining legacy aliases required for compatibility across Airflow versions.
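The selective renaming described in the bullets above can be illustrated with a toy sketch. It does not use the real Airflow classes: `ToyBaseExecutor`, `ToyCeleryExecutor`, `queue`, and `_pop_queued_workloads` are hypothetical names chosen for illustration, and only the field name `queued_tasks` comes from the PR description.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyBaseExecutor:
    """Hypothetical stand-in for a base class whose field names are a shared contract."""

    # The inherited name is kept as-is even though it now holds workloads.
    queued_tasks: dict[str, object] = field(default_factory=dict)

    def queue(self, key: str, workload: object) -> None:
        self.queued_tasks[key] = workload


class ToyCeleryExecutor(ToyBaseExecutor):
    """Hypothetical subclass: only internal helpers adopt workload terminology."""

    def _pop_queued_workloads(self) -> list[tuple[str, object]]:
        # Internal helper: renaming is safe because no external caller depends on it.
        workloads = list(self.queued_tasks.items())
        self.queued_tasks.clear()
        return workloads


executor = ToyCeleryExecutor()
executor.queue("key-1", {"kind": "task"})
drained = executor._pop_queued_workloads()
```

The point of the split is that anything reachable from outside the class keeps its old name, while names private to the class are free to change.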

Notes

  • The CeleryKubernetesExecutor has not been altered in this PR (save for a type: ignore to satisfy mypy), as it is now deprecated according to issue #62887 (Executor Callback support).

  • Metrics (Stats.incr) and configuration paths were intentionally left unchanged, as they form part of Airflow’s external/operational interface (e.g. dashboards, alerts, and user configs). Renaming these could introduce unintended breaking changes or disrupt existing observability. This change is therefore scoped to internal naming, typing, and documentation updates only, without altering behavior or externally visible contracts.

Related

…ing to align with the workload-based executor model.
@SameerMesiah97 SameerMesiah97 force-pushed the CeleryExecutor-Cleanup branch from 8e2dea8 to 29a855f Compare March 18, 2026 19:00
@SameerMesiah97 SameerMesiah97 marked this pull request as ready for review March 18, 2026 19:35
@SameerMesiah97 SameerMesiah97 marked this pull request as draft March 18, 2026 19:35
@SameerMesiah97 SameerMesiah97 marked this pull request as ready for review March 18, 2026 20:58
@ferruzzi ferruzzi mentioned this pull request Mar 18, 2026
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Mar 20, 2026
@ferruzzi ferruzzi self-requested a review March 26, 2026 19:07
@ferruzzi ferruzzi left a comment

Really nice. I left a couple comments, but it's a great start

  if AIRFLOW_V_3_0_PLUS:
      # TODO: TaskSDK: move this type change into BaseExecutor
-     queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: ignore[assignment]
+     queued_tasks: dict[WorkloadKey, workloads.All]  # type: ignore[assignment]

I think there's a miss here. You are importing WorkloadKey if version is over 3.2 up above, but using it here if airflow version is 3.0. What about using this as the import block?

# Remove this conditional once min version > 3.2
try:
    from airflow.executors.workloads.types import WorkloadKey
except ImportError:
    from airflow.models.taskinstancekey import TaskInstanceKey as WorkloadKey

I know there's some community debate over using try/except on imports, but I think this feels like the right time to use one.

((I think the same comment goes for celery_executor_utils.py as well))
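To make the gap concrete, here is a small simulation of the two import strategies. It does not import Airflow: version tuples stand in for the installed release, and it assumes (based only on the comment above) that the workload types module becomes importable from 3.2. `gated_binding` and `fallback_binding` are hypothetical helper names.

```python
from __future__ import annotations


def gated_binding(version: tuple[int, int]) -> str | None:
    """Mimic an import gated on >= 3.2 whose name is then used whenever >= 3.0."""
    bound = None
    if version >= (3, 2):
        bound = "WorkloadKey"
    # None means the name would be undefined when the annotation is evaluated.
    return bound


def fallback_binding(version: tuple[int, int]) -> str:
    """Mimic the try/except form: the name is always bound, possibly to the legacy type."""
    try:
        if version < (3, 2):
            # Simulated failure; a real import would raise this itself.
            raise ImportError("workload types module not available")
        return "WorkloadKey"
    except ImportError:
        return "TaskInstanceKey as WorkloadKey"


for version in [(3, 0), (3, 1), (3, 2)]:
    print(version, gated_binding(version), fallback_binding(version))
```

Under these assumptions, 3.0 and 3.1 are the versions where the gated form leaves the name unbound, while the fallback form binds it on every version.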

  assert len(executor.queued_tasks) == 1, "Workload should remain in queue"
  assert executor.event_buffer == {}
- assert f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in caplog.text
+ assert f"[Try 1 of 3] Celery Task Timeout Error for Workload: ({key})." in caplog.text

Here and below, it looks like you changed the expected message in the tests but didn't actually change the message log in the code?
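One way to keep a log message and its test from drifting apart is to build the string in a single place and have both sides use it. The sketch below is only an illustration: `format_timeout_message` and the logger name are hypothetical, the message text is copied from the updated assertion above, and a plain `StringIO` handler stands in for pytest's `caplog`.

```python
import io
import logging


def format_timeout_message(try_number: int, max_tries: int, key: object) -> str:
    """Hypothetical helper: the single source of the timeout message text."""
    return (
        f"[Try {try_number} of {max_tries}] "
        f"Celery Task Timeout Error for Workload: ({key})."
    )


# Capture log output in memory, roughly as caplog would in a pytest test.
stream = io.StringIO()
logger = logging.getLogger("toy_celery_executor")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.ERROR)
logger.propagate = False

key = ("dag_id", "task_id", "run_id", 1)
logger.error(format_timeout_message(1, 3, key))
captured = stream.getvalue()
```

A test can then assert `format_timeout_message(1, 3, key) in captured`, so a wording change made only on one side is no longer possible.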

Comment on lines -386 to -387
# Backward compatibility alias
send_task_to_executor = send_workload_to_executor

I think we still need this? If not, then we need to update references to it such as (but not limited to) celery/provider.yaml
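For reference, a minimal sketch of the two usual ways to keep an old name working after a rename. The function body here is a hypothetical stand-in that just echoes its input, and the warning variant is an assumption for illustration, not something proposed in this PR.

```python
import warnings


def send_workload_to_executor(workload_tuple):
    """Hypothetical stand-in for the renamed function; just echoes its input."""
    return ("sent", workload_tuple)


# Plain alias, as in the removed lines: both names refer to the same function object.
send_task_to_executor = send_workload_to_executor


def send_task_to_executor_deprecated(workload_tuple):
    """Alternative (an assumption): warn on use of the old name, then delegate."""
    warnings.warn(
        "send_task_to_executor is deprecated; use send_workload_to_executor",
        DeprecationWarning,
        stacklevel=2,
    )
    return send_workload_to_executor(workload_tuple)
```

The plain alias keeps string references to the old name resolvable at no cost; the wrapper additionally tells callers to migrate before the name is eventually dropped.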


- def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None:
-     """Update state of a single task."""
+ def update_workload_state(self, key: WorkloadKey, state: str, info: Any) -> None:

Double check me, but I don't believe callbacks support this part (yet?), do they? If not, then maybe we should leave this one for now? With this change it makes the method look like it properly handles both types when it doesn't.
