Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ that Python objects log to loggers that follow naming convention of ``<package>.
You can read more about standard python logging classes (Loggers, Handlers, Formatters) in the
`Python logging documentation <https://docs.python.org/library/logging.html>`_.

Create a custom logging class
-----------------------------
Create a custom logging config
------------------------------

Configuring your logging classes can be done via the ``logging_config_class`` option in ``airflow.cfg`` file.
This configuration should specify the import path to a configuration compatible with
:func:`logging.config.dictConfig`. If your file is a standard import location, then you should set a
:envvar:`PYTHONPATH` environment variable.
Despite the option name the value is a dotted import path to a ``dict`` that satisfies
:func:`logging.config.dictConfig` — not a Python class. The ``_class`` suffix is
historical and kept for backwards compatibility. If your file is a standard import
location, then you should set a :envvar:`PYTHONPATH` environment variable.

Follow the steps below to enable custom logging config class:

Expand Down Expand Up @@ -102,6 +103,48 @@ See :doc:`../modules_management` for details on how Python and Airflow manage mo
You can override the way both standard logs of the components and "task" logs are handled.


Custom logging configs and remote logging
-----------------------------------------

When ``[logging] remote_logging = True`` and you point ``logging_config_class``
at your own module, define two module-level attributes in that module:

* ``REMOTE_TASK_LOG`` — an instance of
:class:`~airflow.logging.remote.RemoteLogIO` (or
:class:`~airflow.logging.remote.RemoteLogStreamIO`) that uploads task logs
and reads them back for the UI.
* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id used when
``[logging] remote_log_conn_id`` is unset.

If ``REMOTE_TASK_LOG`` is missing, Airflow emits one ``WARNING`` at startup
and the UI cannot read task logs back from the remote backend.

.. code-block:: python

# ~/airflow/config/log_config.py
from airflow.logging.remote import RemoteLogIO


class MyRemoteLogIO:
@property
def processors(self):
return ()

def upload(self, path, ti): ... # upload local log file at ``path`` to your backend

def read(self, relative_path, ti): ... # return (source_info, log_messages) for the UI


REMOTE_TASK_LOG: RemoteLogIO | None = MyRemoteLogIO()
DEFAULT_REMOTE_CONN_ID: str | None = "my_remote_conn"

.. note::

Define ``REMOTE_TASK_LOG`` in your own module rather than re-exporting it
from ``airflow.config_templates.airflow_local_settings``, which is planned
for deprecation.


Custom logger for Operators, Hooks and Tasks
--------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

# This isn't used anymore, but kept for compat of people who might have imported it
# Default value for the ``[logging] logging_config_class`` option. Plain
# ``logging.config.dictConfig`` dict; the ``_class`` suffix on the config option
# is historical.
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
"version": 1,
"disable_existing_loggers": False,
Expand Down Expand Up @@ -121,6 +124,11 @@
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

# Side-channel attributes read by ``discover_remote_log_handler`` from whichever
# module ``[logging] logging_config_class`` resolves through. Custom modules that
# override that option should define both at module scope to enable remote
# task-log read-back.
REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None

Expand Down
9 changes: 6 additions & 3 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -890,9 +890,12 @@ logging:
default: "INFO"
logging_config_class:
description: |
Logging class
Specify the class that will specify the logging configuration
This class has to be on the python classpath
Dotted import path to a ``logging.config.dictConfig`` dict. The
``_class`` suffix is historical — the target is a dict, not a class.

Airflow also reads two optional module-level attributes from the
enclosing module: ``REMOTE_TASK_LOG`` and ``DEFAULT_REMOTE_CONN_ID``,
used to drive remote task-log read-back. See :ref:`write-logs-advanced`.
version_added: 2.0.0
type: string
example: "my.path.default_local_settings.LOGGING_CONFIG"
Expand Down
41 changes: 41 additions & 0 deletions airflow-core/src/airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,39 @@ def load_logging_config() -> tuple[dict[str, Any], str]:
) or DEFAULT_LOGGING_CONFIG_PATH


def _warn_if_missing_remote_task_log(logging_class_path: str) -> None:
"""
Warn if ``[logging] remote_logging`` is on but the user module exposes no remote IO.

Runs *after* ``dictConfig`` has constructed handlers, so deprecated
self-registration in provider task handlers (Elasticsearch, OpenSearch) has
already had its chance to populate ``_ActiveLoggingConfig.remote_task_log``.
Only fires for user-defined ``logging_config_class`` values; the stock
fallback is exempt.

:param logging_class_path: the resolved ``[logging] logging_config_class``
dotted path (already defaulted to :data:`DEFAULT_LOGGING_CONFIG_PATH`).
"""
user_defined = bool(logging_class_path) and logging_class_path != DEFAULT_LOGGING_CONFIG_PATH
remote_logging_enabled = conf.getboolean("logging", "remote_logging", fallback=False)
if not (user_defined and remote_logging_enabled):
return
if _ActiveLoggingConfig.remote_task_log is not None:
return
# Strip the trailing ``.<config_attr>`` to leave the enclosing module path.
# ``logging_class_path`` should always be dotted since ``import_string``
# would have raised otherwise, but guard the access defensively.
parts = logging_class_path.rsplit(".", 1)
modpath = parts[0] if len(parts) == 2 else logging_class_path
log.warning(
"[logging] remote_logging is enabled but the user-defined logging module %r "
"does not expose a REMOTE_TASK_LOG attribute, so remote task-log read-back is "
"disabled. Define REMOTE_TASK_LOG (a RemoteLogIO instance) at module scope "
"to enable it.",
modpath,
)


def configure_logging():
from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values

Expand Down Expand Up @@ -171,6 +204,14 @@ def configure_logging():
# otherwise Airflow would silently fall back on the default config
raise e

# Runs after dictConfig so deprecated handler self-registration (ES/OS) has
# had its chance to populate _ActiveLoggingConfig.remote_task_log.
logging_class_path = (
conf.get("logging", "logging_config_class", fallback=DEFAULT_LOGGING_CONFIG_PATH)
or DEFAULT_LOGGING_CONFIG_PATH
)
_warn_if_missing_remote_task_log(logging_class_path)

validate_logging_config()

new_folder_permissions = int(
Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/logging/test_logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
_ActiveLoggingConfig,
_get_logging_config,
_load_logging_config,
_warn_if_missing_remote_task_log,
get_default_remote_conn_id,
get_remote_task_log,
load_logging_config,
Expand Down Expand Up @@ -200,3 +201,42 @@ def test_skips_reload_when_already_loaded(self):
mocked_conf.get.return_value = None
assert get_default_remote_conn_id() == "cached_conn"
mock_load.assert_not_called()


class TestWarnIfMissingRemoteTaskLog:
@pytest.fixture(autouse=True)
def _reset_active_logging_config(self, monkeypatch):
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", None, raising=False)
monkeypatch.setattr(_ActiveLoggingConfig, "logging_config_loaded", True, raising=False)

def test_warns_when_user_module_missing_remote_task_log_and_remote_logging_enabled(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_called_once()
assert "my_pkg.custom_settings" in mock_log.warning.call_args.args

def test_no_warning_when_using_fallback_path(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log(DEFAULT_LOGGING_CONFIG_PATH)
mock_log.warning.assert_not_called()

def test_no_warning_when_remote_logging_disabled(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "False")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_not_called()

def test_no_warning_when_remote_task_log_is_set(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
monkeypatch.setattr(_ActiveLoggingConfig, "remote_task_log", object(), raising=False)
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("my_pkg.custom_settings.LOGGING_CONFIG")
mock_log.warning.assert_not_called()

def test_no_warning_when_empty_logging_class_path(self, monkeypatch):
monkeypatch.setenv("AIRFLOW__LOGGING__REMOTE_LOGGING", "True")
with mock.patch("airflow.logging_config.log") as mock_log:
_warn_if_missing_remote_task_log("")
mock_log.warning.assert_not_called()
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-edge3>=1.0.0"
]
"elasticsearch" = [
"apache-airflow-providers-elasticsearch>=6.5.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-elasticsearch>=6.6.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"exasol" = [
"apache-airflow-providers-exasol>=4.6.1"
Expand Down Expand Up @@ -306,7 +306,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"opensearch" = [
"apache-airflow-providers-opensearch>=1.9.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-opensearch>=1.9.3" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"opsgenie" = [
"apache-airflow-providers-opsgenie>=5.8.0"
Expand Down Expand Up @@ -447,7 +447,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-discord>=3.9.0",
"apache-airflow-providers-docker>=3.14.1",
"apache-airflow-providers-edge3>=1.0.0",
"apache-airflow-providers-elasticsearch>=6.5.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-elasticsearch>=6.6.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-exasol>=4.6.1",
"apache-airflow-providers-fab>=3.6.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-facebook>=3.7.0",
Expand Down Expand Up @@ -476,7 +476,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-openai>=1.5.0",
"apache-airflow-providers-openfaas>=3.7.0",
"apache-airflow-providers-openlineage>=2.3.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-opensearch>=1.9.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-opensearch>=1.9.3", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-opsgenie>=5.8.0",
"apache-airflow-providers-oracle>=3.12.0",
"apache-airflow-providers-pagerduty>=3.8.1",
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/prek/update_airflow_pyproject_toml.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@
"openlineage": parse_version("2.3.0"),
"git": parse_version("0.0.2"),
"common.messaging": parse_version("2.0.0"),
"elasticsearch": parse_version("6.5.0"),
"opensearch": parse_version("1.9.0"),
"elasticsearch": parse_version("6.6.0"),
"opensearch": parse_version("1.9.3"),
}


Expand Down
20 changes: 19 additions & 1 deletion shared/logging/src/airflow_shared/logging/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,25 @@ def discover_remote_log_handler(
fallback_path: str,
import_string: Callable[[str], Any],
) -> tuple[RemoteLogIO | None, str | None]:
"""Discover and load the remote log handler from a logging config module."""
"""
Look up the optional remote-log handler alongside a logging dictConfig.

``[logging] logging_config_class`` is a dotted path to a
``logging.config.dictConfig`` dict. After importing the dict, this helper
re-imports the enclosing module and reads two optional module-level
attributes via ``getattr``:

* ``REMOTE_TASK_LOG`` — :class:`RemoteLogIO` / :class:`RemoteLogStreamIO`
instance that uploads and reads task logs.
* ``DEFAULT_REMOTE_CONN_ID`` — default Airflow connection id for that
backend.

Either may be ``None`` immediately after this call; provider task handlers
can still populate ``REMOTE_TASK_LOG`` from inside ``__init__`` when
``dictConfig`` instantiates them (deprecated path). Callers that want to
warn on missing remote-log configuration should re-check
``_ActiveLoggingConfig.remote_task_log`` *after* ``dictConfig`` has run.
"""
# Sometimes we end up with `""` as the value!
logging_class_path = logging_class_path or fallback_path

Expand Down
7 changes: 6 additions & 1 deletion task-sdk/src/airflow/sdk/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ def init_log_file(local_relative_path: str) -> Path:


def _load_logging_config() -> None:
"""Load and cache the remote logging configuration from SDK config."""
"""
Load and cache the remote logging configuration from SDK config.

SDK mirror of :func:`airflow.logging_config.load_logging_config` — see that
function for the ``logging_config_class`` / ``REMOTE_TASK_LOG`` contract.
"""
from airflow.sdk._shared.logging.factory import resolve_remote_task_log
from airflow.sdk._shared.module_loading import import_string
from airflow.sdk.configuration import conf
Expand Down
Loading