Skip to content

fix(celery): cache Celery apps to stop per-send memory leak in CeleryExecutor#68905

Open
arkadiuszbach wants to merge 1 commit into
apache:mainfrom
arkadiuszbach:fix/celery-executor-app-cache-memory-leak
Open

fix(celery): cache Celery apps to stop per-send memory leak in CeleryExecutor#68905
arkadiuszbach wants to merge 1 commit into
apache:mainfrom
arkadiuszbach:fix/celery-executor-app-cache-memory-leak

Conversation

@arkadiuszbach

@arkadiuszbach arkadiuszbach commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

What

create_celery_app() is now cached by the resolved (team-aware) app name, so a
Celery app is built once per process and reused on subsequent sends instead of
being rebuilt on every workload send.

Why / Root cause

CeleryExecutor._send_workloads_to_celery() has two paths:

  • Parallel path — sends are fanned out across a ProcessPoolExecutor. Each
    subprocess builds its own Celery app and is then torn down, so a fresh app per
    send is harmless.
  • In-process hot path — when there is a single task per heartbeat, or
    sync_parallelism == 1, send_workload_to_executor() runs directly in the
    long-lived scheduler/executor process
    .

send_workload_to_executor() calls create_celery_app() on every invocation.
On the in-process path this meant a brand-new Celery app (with new broker /
result-backend connection pools) was constructed for every single send and
never released — a steady memory leak in a long-running scheduler process.

PR #60675 introduced the team-aware app construction under the assumption that
sends always happen in throwaway subprocesses. That assumption does not hold for
the in-process path.

Why this matters in practice (Helm / Kubernetes)

Since #58733, the official Helm chart derives
celery.sync_parallelism from the scheduler's CPU limit (millicores rounded up
to whole cores). Any scheduler with a CPU limit of ≤ 1000m (≤ 1 core)
a very common configuration — therefore renders sync_parallelism = 1, which
makes _send_workloads_to_celery() take the in-process hot path on every
send
. These deployments hit the leak continuously.

Fix

  • Add a module-level _celery_app_cache: dict[str, Celery] keyed by the
    resolved (team-suffixed) app name.
  • In create_celery_app(), return the cached app if one exists for that name,
    otherwise build it and store it before returning.

Celery config is static per app-name per process, so reuse is safe for both call
sites:

  • Subprocesses still build their app once (cache is per-process).
  • The in-process path stops recreating the app on every send, eliminating the leak.

Testing

  • test_app_is_cached_and_reusedcreate_celery_app(conf) returns the same
    instance on repeated calls.
  • test_distinct_teams_get_distinct_apps (3.2+) — different teams get distinct
    apps, same team reuses one app.
  • A _clear_celery_app_cache autouse fixture isolates the cache within
    TestCreateCeleryAppTeamIsolation, and TestMultiTeamCeleryExecutor
    setup/teardown clear it too (the cache, being keyed by app name, otherwise
    leaks built apps across tests). Both clears are required — verified empirically:
    removing them makes test_team_specific_broker_not_overwritten fail.

Notes

The regression tests fail when the production cache change is reverted.

Reproducing

details
  • apache-airflow-providers-celery==3.20.0, CeleryExecutor, sync_parallelism=1
  • test_dag
    def test(**context):
        time.sleep(1)
    
    dag = DAG(
        dag_id='test',
        schedule=timedelta(minutes=1),
        start_date=datetime(2026, 6, 1),
        catchup=False,
        max_active_runs=1,
    )
    
    with dag:
        # 50 tasks on Celery worker
        for i in range(50):
            PythonOperator(
                task_id=f"celery_{i}",
                python_callable=test,
            )
    
        # 50 tasks on Triggerer (deferrable async sleep)
        for i in range(50):
            WaitSensor(
                task_id=f"triggerer_{i}",
                time_to_wait=timedelta(seconds=1),
                deferrable=True,
            )
    
  • let it run for some time, it leaks

Was generative AI tooling used to co-author this PR?

create_celery_app() rebuilt a brand-new Celery app on every workload
send. When sends run in-process (a single task per heartbeat, or
sync_parallelism == 1) this happens in the long-lived scheduler/executor
process, so each rebuilt app — and its broker/result-backend connection
pools — accumulates and is never released, leaking memory over time.

PR apache#60675 assumed sends always run in throwaway ProcessPoolExecutor
subprocesses, where a fresh app per send is harmless. The in-process hot
path in _send_workloads_to_celery() breaks that assumption.

Cache apps by their resolved (team-aware) name so each distinct app is
built once per process and reused. Config is static per app-name per
process, so this is safe for both call sites: subprocesses still build
their own app once, and the in-process path stops recreating it on every
send.

Add regression tests for app caching/reuse and per-team app isolation.
@arkadiuszbach arkadiuszbach force-pushed the fix/celery-executor-app-cache-memory-leak branch from e51915a to b1dab47 Compare June 23, 2026 16:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant