diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py index 9b5a9ae759a27..c66219f7a1635 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -117,9 +117,21 @@ def _get_celery_app() -> Celery: return Celery(celery_app_name, config_source=get_celery_configuration()) +# Cache of Celery apps keyed by their resolved (team-suffixed) name. See create_celery_app. +_celery_app_cache: dict[str | None, Celery] = {} + + def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery: """ - Create a Celery app, supporting team-specific configuration. + Create (or reuse) a Celery app, supporting team-specific configuration. + + Apps are cached by their resolved (team-suffixed) name and reused on subsequent + calls. ``create_celery_app`` is invoked on every task send via + ``send_workload_to_executor``; when sends run in the main scheduler process + (a single task per heartbeat, or ``sync_parallelism == 1``) rebuilding the app + each time leaks memory through repeated Celery task-class creation and broker + connections. The cache is module-level, so the ``ProcessPoolExecutor`` send + workers each keep their own (initially empty) cache and discard it on exit. :param team_conf: ExecutorConf instance with team-specific configuration, or global conf :return: Celery app instance @@ -139,6 +151,9 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery: if team_name: celery_app_name = f"{celery_app_name}_{team_name}" + if celery_app_name in _celery_app_cache: + return _celery_app_cache[celery_app_name] + config = get_default_celery_config(team_conf) # Apply user-provided celery_config_options on top of team config. @@ -157,6 +172,7 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery: if not AIRFLOW_V_3_0_PLUS: celery_app.task(name="execute_command")(execute_command) + _celery_app_cache[celery_app_name] = celery_app return celery_app @@ -388,8 +404,11 @@ def send_workload_to_executor( """ Send workload to executor (serialized and executed as a Celery task). - This function is called in ProcessPoolExecutor subprocesses. To avoid pickling issues with - team-specific Celery apps, we pass the team_name and reconstruct the Celery app here. + This runs either in a ``ProcessPoolExecutor`` subprocess (when several tasks are sent + in parallel) or directly in the calling process (a single task per heartbeat, or + ``sync_parallelism == 1``). In both cases the Celery app is rebuilt from configuration + here to avoid pickling team-specific apps across processes; ``create_celery_app`` caches + the app by name so the in-process path does not recreate it on every send. """ key, args, queue, team_name = workload_tuple diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index c11ea80a5baaf..6e66b085d5453 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -743,10 +743,12 @@ class TestMultiTeamCeleryExecutor: """Test multi-team functionality in CeleryExecutor.""" def setup_method(self) -> None: + celery_executor_utils._celery_app_cache.clear() db.clear_db_runs() db.clear_db_jobs() def teardown_method(self) -> None: + celery_executor_utils._celery_app_cache.clear() db.clear_db_runs() db.clear_db_jobs() @@ -1258,6 +1260,13 @@ def test_one_way_tls_ignores_key_cert(self): class TestCreateCeleryAppTeamIsolation: """Tests for create_celery_app() multi-team config isolation.""" + @pytest.fixture(autouse=True) + def _clear_celery_app_cache(self): + """Reset the create_celery_app cache so apps built in one test don't leak into the next.""" + celery_executor_utils._celery_app_cache.clear() + yield + celery_executor_utils._celery_app_cache.clear() + @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires Airflow 3.2+") def test_custom_celery_config_options_applied(self): """User-provided celery_config_options (non-default) should be merged into team config.""" @@ -1330,3 +1339,22 @@ def test_team_app_name_includes_team_name(self): team_conf = ExecutorConf(team_name="team_beta") celery_app = celery_executor_utils.create_celery_app(team_conf) assert "team_beta" in celery_app.main + + def test_app_is_cached_and_reused(self): + """Repeated create_celery_app() calls for the same name reuse one app. + + This guards against the per-send app rebuild that leaks memory in the + scheduler when sends run in-process (single task per heartbeat or + sync_parallelism == 1). + """ + first = celery_executor_utils.create_celery_app(conf) + second = celery_executor_utils.create_celery_app(conf) + assert first is second + + @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecutorConf requires Airflow 3.2+") + def test_distinct_teams_get_distinct_apps(self): + """Different team names must not share a cached app (broker isolation).""" + app_alpha = celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_alpha")) + app_beta = celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_beta")) + assert app_alpha is not app_beta + assert app_alpha is celery_executor_utils.create_celery_app(ExecutorConf(team_name="team_alpha"))