Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions providers/celery/docs/celery_executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,42 @@ During this process, two processes are created:
| [11] **WorkerProcess** saves status information in **ResultBackend**.
| [13] When **SchedulerProcess** asks **ResultBackend** again about the status, it will get information about the status of the task.

.. _celery_executor:logging:

Worker logging
--------------

By default the Celery worker writes plain-text logs to stdout. To emit structured
JSON instead, enable it via the ``[logging]`` section (applies to all Airflow
components) or override it for the worker alone with the ``[celery]`` section:

.. code-block:: ini

# Global — affects the API server, scheduler, and workers:
[logging]
json_logs = True

# Or override for the Celery worker only, leaving other components unchanged:
[celery]
json_logs = True

The lookup order is:

1. ``[celery] json_logs`` — if set, takes precedence.
2. ``[logging] json_logs`` — used when the celery-specific key is absent.
3. ``False`` — the default when neither key is configured.

This mirrors the existing ``[logging] CELERY_LOGGING_LEVEL`` /
``[logging] LOGGING_LEVEL`` fallback already present in the worker startup
code.

.. note::

``[logging] json_logs`` was added in Airflow 3.2.0. On older 3.x versions the
global key is silently ignored (``fallback=False``), so setting only
``[celery] json_logs = True`` is the safe way to enable JSON logs regardless
of the core version.

.. _celery_executor:queue:

Queues
Expand Down
1 change: 1 addition & 0 deletions providers/celery/newsfragments/68916.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``[celery] json_logs`` config option so the Celery worker can emit structured JSON logs independently of the global ``[logging] json_logs`` setting.
11 changes: 11 additions & 0 deletions providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ config:
type: boolean
example: ~
default: "true"
json_logs:
description: |
Emit Celery worker stdout in JSON format. When set, this takes precedence over
the global ``[logging] json_logs`` setting, allowing the worker to use a
different format than the rest of the deployment. When unset (the default),
the value falls back to ``[logging] json_logs``, which itself defaults to
``False``.
version_added: ~
type: boolean
example: ~
default: ~
broker_url:
description: |
The Celery broker URL. Celery supports multiple broker types. See:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ def worker(args):
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk.log import configure_logging

configure_logging(output=sys.stdout.buffer)
json_output = conf.getboolean("celery", "json_logs", fallback=None)
if json_output is None:
json_output = conf.getboolean("logging", "json_logs", fallback=False)
configure_logging(output=sys.stdout.buffer, json_output=json_output)
else:
# Disable connection pool so that celery worker does not hold an unnecessary db connection
settings.reconfigure_orm(disable_connection_pool=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ def get_provider_info():
"example": None,
"default": "true",
},
"json_logs": {
"description": "Emit Celery worker stdout in JSON format. When set, this takes precedence over\nthe global ``[logging] json_logs`` setting, allowing the worker to use a\ndifferent format than the rest of the deployment. When unset (the default),\nthe value falls back to ``[logging] json_logs``, which itself defaults to\n``False``.\n",
"version_added": None,
"type": "boolean",
"example": None,
"default": None,
},
"broker_url": {
"description": "The Celery broker URL. Celery supports multiple broker types. See:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview\n",
"version_added": None,
Expand Down
44 changes: 44 additions & 0 deletions providers/celery/tests/unit/celery/cli/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,50 @@ def test_worker_starts_when_hostname_is_unique(
mock_pools_reset.assert_called_once()


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="configure_logging only called in Airflow 3+")
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")
class TestWorkerJsonLogs:
@classmethod
def setup_class(cls):
with conf_vars({("core", "executor"): "CeleryExecutor"}):
importlib.reload(executor_loader)
importlib.reload(cli_parser)
cls.parser = cli_parser.get_parser()

@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
@mock.patch("airflow.sdk.log.configure_logging")
def test_json_logs_defaults_to_false(self, mock_configure_logging, mock_celery_app, mock_popen):
args = self.parser.parse_args(["celery", "worker"])
celery_command.worker(args)
mock_configure_logging.assert_called_once()
_, kwargs = mock_configure_logging.call_args
assert kwargs.get("json_output") is False

@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
@mock.patch("airflow.sdk.log.configure_logging")
def test_json_logs_uses_global_logging_config(self, mock_configure_logging, mock_celery_app, mock_popen):
args = self.parser.parse_args(["celery", "worker"])
with conf_vars({("logging", "json_logs"): "True"}):
celery_command.worker(args)
mock_configure_logging.assert_called_once()
_, kwargs = mock_configure_logging.call_args
assert kwargs.get("json_output") is True

@mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
@mock.patch("airflow.sdk.log.configure_logging")
def test_celery_json_logs_overrides_global(self, mock_configure_logging, mock_celery_app, mock_popen):
args = self.parser.parse_args(["celery", "worker"])
with conf_vars({("logging", "json_logs"): "True", ("celery", "json_logs"): "False"}):
celery_command.worker(args)
mock_configure_logging.assert_called_once()
_, kwargs = mock_configure_logging.call_args
assert kwargs.get("json_output") is False


@pytest.mark.backend("mysql", "postgres")
@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled")
class TestFlowerCommand:
Expand Down
Loading