diff --git a/providers/celery/docs/celery_executor.rst b/providers/celery/docs/celery_executor.rst index 6017c68c83c04..c75d147c7288b 100644 --- a/providers/celery/docs/celery_executor.rst +++ b/providers/celery/docs/celery_executor.rst @@ -203,6 +203,42 @@ During this process, two processes are created: | [11] **WorkerProcess** saves status information in **ResultBackend**. | [13] When **SchedulerProcess** asks **ResultBackend** again about the status, it will get information about the status of the task. +.. _celery_executor:logging: + +Worker logging +-------------- + +By default the Celery worker writes plain-text logs to stdout. To emit structured +JSON instead, enable it via the ``[logging]`` section (applies to all Airflow +components) or override it for the worker alone with the ``[celery]`` section: + +.. code-block:: ini + + # Global — affects the API server, scheduler, and workers: + [logging] + json_logs = True + + # Or override for the Celery worker only, leaving other components unchanged: + [celery] + json_logs = True + +The lookup order is: + +1. ``[celery] json_logs`` — if set, takes precedence. +2. ``[logging] json_logs`` — used when the celery-specific key is absent. +3. ``False`` — the default when neither key is configured. + +This mirrors the existing ``[logging] celery_logging_level`` / +``[logging] logging_level`` fallback already present in the worker startup +code. + +.. note:: + + ``[logging] json_logs`` was added in Airflow 3.2.0. On older 3.x versions the + global key is silently ignored (``fallback=False``), so setting only + ``[celery] json_logs = True`` is the safe way to enable JSON logs regardless + of the core version. + .. 
_celery_executor:queue: Queues diff --git a/providers/celery/newsfragments/68916.feature.rst b/providers/celery/newsfragments/68916.feature.rst new file mode 100644 index 0000000000000..cb82fe1594708 --- /dev/null +++ b/providers/celery/newsfragments/68916.feature.rst @@ -0,0 +1 @@ +Add ``[celery] json_logs`` config option so the Celery worker can emit structured JSON logs independently of the global ``[logging] json_logs`` setting. diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index 66e2a0534525f..14e5dc87c30bc 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -185,6 +185,17 @@ config: type: boolean example: ~ default: "true" + json_logs: + description: | + Emit Celery worker stdout in JSON format. When set, this takes precedence over + the global ``[logging] json_logs`` setting, allowing the worker to use a + different format than the rest of the deployment. When unset (the default), + the value falls back to ``[logging] json_logs``, which itself defaults to + ``False``. + version_added: ~ + type: boolean + example: ~ + default: ~ broker_url: description: | The Celery broker URL. Celery supports multiple broker types. 
See: diff --git a/providers/celery/src/airflow/providers/celery/cli/celery_command.py b/providers/celery/src/airflow/providers/celery/cli/celery_command.py index 0e2b66aabf10c..29f2ddd8a97b2 100644 --- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py +++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py @@ -245,7 +245,10 @@ def worker(args): if AIRFLOW_V_3_0_PLUS: from airflow.sdk.log import configure_logging - configure_logging(output=sys.stdout.buffer) + json_output = conf.getboolean("celery", "json_logs", fallback=None) + if json_output is None: + json_output = conf.getboolean("logging", "json_logs", fallback=False) + configure_logging(output=sys.stdout.buffer, json_output=json_output) else: # Disable connection pool so that celery worker does not hold an unnecessary db connection settings.reconfigure_orm(disable_connection_pool=True) diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py index ce59a55918f11..457d24a869ff7 100644 --- a/providers/celery/src/airflow/providers/celery/get_provider_info.py +++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py @@ -96,6 +96,13 @@ def get_provider_info(): "example": None, "default": "true", }, + "json_logs": { + "description": "Emit Celery worker stdout in JSON format. When set, this takes precedence over\nthe global ``[logging] json_logs`` setting, allowing the worker to use a\ndifferent format than the rest of the deployment. When unset (the default),\nthe value falls back to ``[logging] json_logs``, which itself defaults to\n``False``.\n", + "version_added": None, + "type": "boolean", + "example": None, + "default": None, + }, "broker_url": { "description": "The Celery broker URL. Celery supports multiple broker types. 
See:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview\n", "version_added": None, diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py b/providers/celery/tests/unit/celery/cli/test_celery_command.py index 99a9506f3d625..95a0b5165cdb4 100644 --- a/providers/celery/tests/unit/celery/cli/test_celery_command.py +++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py @@ -386,6 +386,50 @@ def test_worker_starts_when_hostname_is_unique( mock_pools_reset.assert_called_once() +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="configure_logging only called in Airflow 3+") +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled") +class TestWorkerJsonLogs: + @classmethod + def setup_class(cls): + with conf_vars({("core", "executor"): "CeleryExecutor"}): + importlib.reload(executor_loader) + importlib.reload(cli_parser) + cls.parser = cli_parser.get_parser() + + @mock.patch("airflow.providers.celery.cli.celery_command.Process") + @mock.patch("airflow.providers.celery.executors.celery_executor.app") + @mock.patch("airflow.sdk.log.configure_logging") + def test_json_logs_defaults_to_false(self, mock_configure_logging, mock_celery_app, mock_popen): + args = self.parser.parse_args(["celery", "worker"]) + celery_command.worker(args) + mock_configure_logging.assert_called_once() + _, kwargs = mock_configure_logging.call_args + assert kwargs.get("json_output") is False + + @mock.patch("airflow.providers.celery.cli.celery_command.Process") + @mock.patch("airflow.providers.celery.executors.celery_executor.app") + @mock.patch("airflow.sdk.log.configure_logging") + def test_json_logs_uses_global_logging_config(self, mock_configure_logging, mock_celery_app, mock_popen): + args = self.parser.parse_args(["celery", "worker"]) + with conf_vars({("logging", "json_logs"): "True"}): + celery_command.worker(args) + mock_configure_logging.assert_called_once() + _, 
kwargs = mock_configure_logging.call_args + assert kwargs.get("json_output") is True + + @mock.patch("airflow.providers.celery.cli.celery_command.Process") + @mock.patch("airflow.providers.celery.executors.celery_executor.app") + @mock.patch("airflow.sdk.log.configure_logging") + def test_celery_json_logs_overrides_global(self, mock_configure_logging, mock_celery_app, mock_popen): + args = self.parser.parse_args(["celery", "worker"]) + with conf_vars({("logging", "json_logs"): "True", ("celery", "json_logs"): "False"}): + celery_command.worker(args) + mock_configure_logging.assert_called_once() + _, kwargs = mock_configure_logging.call_args + assert kwargs.get("json_output") is False + + @pytest.mark.backend("mysql", "postgres") @pytest.mark.usefixtures("conf_stale_bundle_cleanup_disabled") class TestFlowerCommand: