diff --git a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst index fa34e74f3140b..629cb5065513e 100644 --- a/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst +++ b/airflow-core/docs/administration-and-deployment/logging-monitoring/advanced-logging-configuration.rst @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``. You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the `Python logging documentation `_. -Create a custom logging class ------------------------------ +Create a custom logging config +------------------------------ Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file. -This configuration should specify the import path to a configuration compatible with -:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a -:envvar:`PYTHONPATH` environment variable. +Despite the option name the value is a dotted import path to a ``dict`` that satisfies +:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is +historical and kept for backwards compatibility. If your file is a standard import +location, then you should set a :envvar:`PYTHONPATH` environment variable. Follow the steps below to enable custom logging config class: @@ -102,6 +103,48 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo You can override the way both standard logs of the components and "task" logs are handled. +Custom logging configs and remote logging +----------------------------------------- + +When ``[logging] remote_logging = True`` and you point ``logging_config_class`` +at your own module, define two module-level attributes in that module: + +* ``REMOTE_TASK_LOG`` — an instance of + :class:`~airflow.logging.remote.RemoteLogIO` (or + :class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs + and reads them back for the UI. +* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when + ``[logging] remote_log_conn_id`` is unset. + +If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup +and the UI cannot read task logs back from the remote backend. + + .. code-block:: python + + # ~/airflow/config/log_config.py + from airflow.logging.remote import RemoteLogIO + + + class MyRemoteLogIO: + @property + def processors(self): + return () + + def upload(self, path, ti): ... # upload local log file at ``path`` to your backend + + def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI + + + REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO() + DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn" + +.. note:: + + Define ``REMOTE_TASK_LOG`` in your own module rather than re-exporting it + from ``airflow.config_templates.airflow_local_settings``, which is planned + for deprecation. + + Custom logger for Operators, Hooks and Tasks -------------------------------------------- diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index e30e68fb17984..c5bb91b682f56 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -50,6 +50,9 @@ BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")) # This isn't used anymore, but kept for compat of people who might have imported it +# Default value for the ``[logging] logging_config_class`` option. Plain +# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option +# is historical. DEFAULT_LOGGING_CONFIG: dict[str, Any] = { "version": 1, "disable_existing_loggers": False, @@ -121,6 +124,11 @@ ################## REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging") + +# Side-channel attributes read by ``discover_remote_log_handler`` from whichever +# module ``[logging] logging_config_class`` resolves through. Custom modules that +# override that option should define both at module scope to enable remote +# task-log read-back. REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None DEFAULT_REMOTE_CONN_ID: str | None = None diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 86e422451a08b..9426a98271bf8 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -890,9 +890,12 @@ logging: default: "INFO" logging_config_class: description: | - Logging class - Specify the class that will specify the logging configuration - This class has to be on the python classpath + Dotted import path to a ``logging.config.dictConfig`` dict. The + ``_class`` suffix is historical — the target is a dict, not a class. + + Airflow also reads two optional module-level attributes from the + enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``, + used to drive remote task-log read-back. See :ref:`write-logs-advanced`. version_added: 2.0.0 type: string example: "my.path.default_local_settings.LOGGING_CONFIG" diff --git a/airflow-core/src/airflow/logging_config.py b/airflow-core/src/airflow/logging_config.py index 7f4ae605cb21a..c049146e6a847 100644 --- a/airflow-core/src/airflow/logging_config.py +++ b/airflow-core/src/airflow/logging_config.py @@ -120,6 +120,39 @@ def load_logging_config() -> tuple[dict[str, Any], str]: ) or DEFAULT_LOGGING_CONFIG_PATH +def _warn_if_missing_remote_task_log(logging_class_path: str) -> None: + """ + Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO. + + Runs *after* ``dictConfig`` has constructed handlers, so deprecated + self-registration in provider task handlers (Elasticsearch, OpenSearch) has + already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``. + Only fires for user-defined ``logging_config_class`` values; the stock + fallback is exempt. + + :param logging_class_path: the resolved ``[logging] logging_config_class`` + dotted path (already defaulted to :data:`DEFAULT_LOGGING_CONFIG_PATH`). + """ + user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH + remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False) + if not (user_defined and remote_logging_enabled): + return + if _ActiveLoggingConfig.remote_task_log is not None: + return + # Strip the trailing ``.`` to leave the enclosing module path. + # ``logging_class_path`` should always be dotted since ``import_string`` + # would have raised otherwise, but guard the access defensively. + parts = logging_class_path.rsplit(".", 1) + modpath = parts[0] if len(parts) == 2 else logging_class_path + log.warning( + "[logging] remote_logging is enabled but the user-defined logging module %r " + "does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is " + "disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope " + "to enable it.", + modpath, + ) + + def configure_logging(): from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values @@ -171,6 +204,14 @@ def configure_logging(): # otherwise Airflow would silently fall back on the default config raise e + # Runs after dictConfig so deprecated handler self-registration (ES/OS) has + # had its chance to populate _ActiveLoggingConfig.remote_task_log. + logging_class_path = ( + conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH) + or DEFAULT_LOGGING_CONFIG_PATH + ) + _warn_if_missing_remote_task_log(logging_class_path) + validate_logging_config() new_folder_permissions = int( diff --git a/airflow-core/tests/unit/logging/test_logging_config.py b/airflow-core/tests/unit/logging/test_logging_config.py index 3fffb376425b9..80f406815f65e 100644 --- a/airflow-core/tests/unit/logging/test_logging_config.py +++ b/airflow-core/tests/unit/logging/test_logging_config.py @@ -28,6 +28,7 @@ _ActiveLoggingConfig, _get_logging_config, _load_logging_config, + _warn_if_missing_remote_task_log, get_default_remote_conn_id, get_remote_task_log, load_logging_config, @@ -200,3 +201,42 @@ def test_skips_reload_when_already_loaded(self): mocked_conf.get.return_value = None assert get_default_remote_conn_id() == "cached_conn" mock_load.assert_not_called() + + +class TestWarnIfMissingRemoteTaskLog: + @pytest.fixture(autouse=True) + def _reset_active_logging_config(self, monkeypatch): + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False) + monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False) + + def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_called_once() + assert "my_pkg.custom_settings" in mock_log.warning.call_args.args + + def test_no_warning_when_using_fallback_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH) + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_logging_disabled(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_remote_task_log_is_set(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False) + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG") + mock_log.warning.assert_not_called() + + def test_no_warning_when_empty_logging_class_path(self, monkeypatch): + monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True") + with mock.patch("airflow.logging_config.log") as mock_log: + _warn_if_missing_remote_task_log("") + mock_log.warning.assert_not_called() diff --git a/pyproject.toml b/pyproject.toml index bae4c06e3b5ec..5a6f250fc1012 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -219,7 +219,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-edge3>=1.0.0" ] "elasticsearch" = [ - "apache-airflow-providers-elasticsearch>=6.5.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-elasticsearch>=6.6.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "exasol" = [ "apache-airflow-providers-exasol>=4.6.1" @@ -306,7 +306,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opensearch" = [ - "apache-airflow-providers-opensearch>=1.9.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-opensearch>=1.9.3" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opsgenie" = [ "apache-airflow-providers-opsgenie>=5.8.0" @@ -447,7 +447,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-discord>=3.9.0", "apache-airflow-providers-docker>=3.14.1", "apache-airflow-providers-edge3>=1.0.0", - "apache-airflow-providers-elasticsearch>=6.5.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-elasticsearch>=6.6.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-exasol>=4.6.1", "apache-airflow-providers-fab>=3.6.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-facebook>=3.7.0", @@ -476,7 +476,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-openai>=1.5.0", "apache-airflow-providers-openfaas>=3.7.0", "apache-airflow-providers-openlineage>=2.3.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py - "apache-airflow-providers-opensearch>=1.9.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-opensearch>=1.9.3", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-opsgenie>=5.8.0", "apache-airflow-providers-oracle>=3.12.0", "apache-airflow-providers-pagerduty>=3.8.1", diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py b/scripts/ci/prek/update_airflow_pyproject_toml.py index a7d653311850f..2313790e0a5d5 100755 --- a/scripts/ci/prek/update_airflow_pyproject_toml.py +++ b/scripts/ci/prek/update_airflow_pyproject_toml.py @@ -94,8 +94,8 @@ "openlineage": parse_version("2.3.0"), "git": parse_version("0.0.2"), "common.messaging": parse_version("2.0.0"), - "elasticsearch": parse_version("6.5.0"), - "opensearch": parse_version("1.9.0"), + "elasticsearch": parse_version("6.6.0"), + "opensearch": parse_version("1.9.3"), } diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index 59af50be6bc7d..b80a3ceb9cd9d 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -79,7 +79,25 @@ def discover_remote_log_handler( fallback_path: str, import_string: Callable[[str], Any], ) -> tuple[RemoteLogIO | None, str | None]: - """Discover and load the remote log handler from a logging config module.""" + """ + Look up the optional remote-log handler alongside a logging dictConfig. + + ``[logging] logging_config_class`` is a dotted path to a + ``logging.config.dictConfig`` dict. After importing the dict, this helper + re-imports the enclosing module and reads two optional module-level + attributes via ``getattr``: + + * ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO` + instance that uploads and reads task logs. + * ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that + backend. + + Either may be ``None`` immediately after this call; provider task handlers + can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when + ``dictConfig`` instantiates them (deprecated path). Callers that want to + warn on missing remote-log configuration should re-check + ``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run. + """ # Sometimes we end up with `""` as the value! logging_class_path = logging_class_path or fallback_path diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 3a8d4b3374db0..c2635216de4b1 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -172,7 +172,12 @@ def init_log_file(local_relative_path: str) -> Path: def _load_logging_config() -> None: - """Load and cache the remote logging configuration from SDK config.""" + """ + Load and cache the remote logging configuration from SDK config. + + SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that + function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract. + """ from airflow.sdk._shared.logging.factory import resolve_remote_task_log from airflow.sdk._shared.module_loading import import_string from airflow.sdk.configuration import conf