Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,21 @@ def _get_celery_app() -> Celery:
return Celery(celery_app_name, config_source=get_celery_configuration())


# Cache of Celery apps keyed by their resolved (team-suffixed) name. See create_celery_app.
_celery_app_cache: dict[str | None, Celery] = {}


def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
"""
Create a Celery app, supporting team-specific configuration.
Create (or reuse) a Celery app, supporting team-specific configuration.

Apps are cached by their resolved (team-suffixed) name and reused on subsequent
calls. ``create_celery_app`` is invoked on every task send via
``send_workload_to_executor``; when sends run in the main scheduler process
(a single task per heartbeat, or ``sync_parallelism == 1``) rebuilding the app
each time leaks memory through repeated Celery task-class creation and broker
connections. The cache is module-level, so the ``ProcessPoolExecutor`` send
workers each keep their own (initially empty) cache and discard it on exit.

:param team_conf: ExecutorConf instance with team-specific configuration, or global conf
:return: Celery app instance
Expand All @@ -139,6 +151,9 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
if team_name:
celery_app_name = f"{celery_app_name}_{team_name}"

if celery_app_name in _celery_app_cache:
return _celery_app_cache[celery_app_name]

config = get_default_celery_config(team_conf)

# Apply user-provided celery_config_options on top of team config.
Expand All @@ -157,6 +172,7 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
if not AIRFLOW_V_3_0_PLUS:
celery_app.task(name="execute_command")(execute_command)

_celery_app_cache[celery_app_name] = celery_app
return celery_app


Expand Down Expand Up @@ -388,8 +404,11 @@ def send_workload_to_executor(
"""
Send workload to executor (serialized and executed as a Celery task).

This function is called in ProcessPoolExecutor subprocesses. To avoid pickling issues with
team-specific Celery apps, we pass the team_name and reconstruct the Celery app here.
This runs either in a ``ProcessPoolExecutor`` subprocess (when several tasks are sent
in parallel) or directly in the calling process (a single task per heartbeat, or
``sync_parallelism == 1``). In both cases the Celery app is rebuilt from configuration
here to avoid pickling team-specific apps across processes; ``create_celery_app`` caches
the app by name so the in-process path does not recreate it on every send.
"""
key, args, queue, team_name = workload_tuple

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,10 +743,12 @@ class TestMultiTeamCeleryExecutor:
"""Test multi-team functionality in CeleryExecutor."""

def setup_method(self) -> None:
celery_executor_utils._celery_app_cache.clear()
db.clear_db_runs()
db.clear_db_jobs()

def teardown_method(self) -> None:
celery_executor_utils._celery_app_cache.clear()
db.clear_db_runs()
db.clear_db_jobs()

Expand Down Expand Up @@ -1258,6 +1260,13 @@ def test_one_way_tls_ignores_key_cert(self):
class TestCreateCeleryAppTeamIsolation:
"""Tests for create_celery_app() multi-team config isolation."""

@pytest.fixture(autouse=True)
def _clear_celery_app_cache(self):
"""Reset the create_celery_app cache so apps built in one test don't leak into the next."""
celery_executor_utils._celery_app_cache.clear()
yield
celery_executor_utils._celery_app_cache.clear()

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires Airflow 3.2+")
def test_custom_celery_config_options_applied(self):
"""User-provided celery_config_options (non-default) should be merged into team config."""
Expand Down Expand Up @@ -1330,3 +1339,22 @@ def test_team_app_name_includes_team_name(self):
team_conf = ExecutorConf(team_name="team_beta")
celery_app = celery_executor_utils.create_celery_app(team_conf)
assert "team_beta" in celery_app.main

def test_app_is_cached_and_reused(self):
"""Repeated create_celery_app() calls for the same name reuse one app.

This guards against the per-send app rebuild that leaks memory in the
scheduler when sends run in-process (single task per heartbeat or
sync_parallelism == 1).
"""
first = celery_executor_utils.create_celery_app(conf)
second = celery_executor_utils.create_celery_app(conf)
assert first is second

@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires Airflow 3.2+")
def test_distinct_teams_get_distinct_apps(self):
"""Different team names must not share a cached app (broker isolation)."""
app_alpha = celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_alpha"))
app_beta = celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_beta"))
assert app_alpha is not app_beta
assert app_alpha is celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_alpha"))
Loading