Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
Expand Down
40 changes: 9 additions & 31 deletions airflow-core/src/airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,32 +196,31 @@ def __init__(
*args,
**kwargs,
):
configuration_description = retrieve_configuration_description(include_providers=False)
_configuration_description = retrieve_configuration_description(include_providers=False)
# For those who would like to use a different data structure to keep defaults:
# We have to keep the default values in a ConfigParser rather than in any other
# data structure, because the values we have might contain %% which are ConfigParser
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
_default_values = create_default_config_parser(configuration_description)
_default_values = create_default_config_parser(_configuration_description)
from airflow.providers_manager import ProvidersManager

super().__init__(
configuration_description,
_configuration_description,
_default_values,
ProvidersManager,
create_default_config_parser,
_default_config_file_path("provider_config_fallback_defaults.cfg"),
*args,
**kwargs,
)
self.configuration_description = configuration_description
self._configuration_description = _configuration_description
self._default_values = _default_values
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
self.is_validated = False
self._suppress_future_warnings = False
self._providers_configuration_loaded = False

@property
def _validators(self) -> list[Callable[[], None]]:
Expand Down Expand Up @@ -367,24 +366,6 @@ def write_custom_config(
if content:
file.write(f"{content}\n\n")

def _ensure_providers_config_loaded(self) -> None:
"""Ensure providers configurations are loaded."""
if not self._providers_configuration_loaded:
from airflow.providers_manager import ProvidersManager

ProvidersManager().initialize_providers_configuration()

def _ensure_providers_config_unloaded(self) -> bool:
"""Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
if self._providers_configuration_loaded:
self.restore_core_default_configuration()
return True
return False

def _reload_provider_configs(self) -> None:
"""Reload providers configuration."""
self.load_providers_configuration()

def _upgrade_postgres_metastore_conn(self):
"""
Upgrade SQL schemas.
Expand Down Expand Up @@ -514,7 +495,7 @@ def expand_all_configuration_values(self):
for key, value in self.items(section):
if value is not None:
if self.has_option(section, key):
self.remove_option(section, key)
self.remove_option(section, key, remove_default=False)
if self.is_template(section, key) or not isinstance(value, str):
self.set(section, key, value)
else:
Expand All @@ -525,11 +506,6 @@ def remove_all_read_configurations(self):
for section in self.sections():
self.remove_section(section)

@property
def providers_configuration_loaded(self) -> bool:
"""Checks if providers have been loaded."""
return self._providers_configuration_loaded

def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
"""
Override to use module-level function that reads from global conf.
Expand Down Expand Up @@ -644,16 +620,18 @@ def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
# We know that fernet_key is not set, so we can generate it, set as global key
# and also write it to the config file so that same key will be used next time
_SecretKeys.fernet_key = _generate_fernet_key()
conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
conf._configuration_description["core"]["options"]["fernet_key"]["default"] = (
_SecretKeys.fernet_key
)
conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

_SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
conf._configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
_SecretKeys.jwt_secret_key
)
conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
# Invalidate cached configuration_description so it recomputes with the updated base
conf.invalidate_cache()
pathlib.Path(airflow_config.__fspath__()).touch()
make_group_other_inaccessible(airflow_config.__fspath__())
with open(airflow_config, "w") as file:
Expand Down
8 changes: 0 additions & 8 deletions airflow-core/src/airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,10 +613,6 @@ def initialize_providers_configuration(self):
"""Lazy initialization of provider configuration metadata and merge it into ``conf``."""
self.initialize_providers_list()
self._discover_config()
# Imported lazily to avoid a configuration/providers_manager import cycle during startup.
from airflow.configuration import conf

conf.load_providers_configuration()

@provider_info_cache("plugins")
def initialize_providers_plugins(self):
Expand Down Expand Up @@ -1455,10 +1451,6 @@ def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
self.initialize_providers_configuration()
return sorted(self._provider_configs.items(), key=lambda x: x[0])

@property
def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
return sorted(self._provider_configs.items(), key=lambda x: x[0])

def _cleanup(self):
self._initialized_cache.clear()
self._provider_dict.clear()
Expand Down
204 changes: 171 additions & 33 deletions airflow-core/tests/unit/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
from airflow.providers_manager import ProvidersManager
from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.config import (
CFG_FALLBACK_CONFIG_OPTIONS,
PROVIDER_METADATA_CONFIG_OPTIONS,
PROVIDER_METADATA_OVERRIDES_CFG_FALLBACK,
conf_vars,
create_fresh_airflow_config,
)
from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
from tests_common.test_utils.reset_warning_registry import reset_warning_registry
from unit.utils.test_config import (
Expand Down Expand Up @@ -1876,50 +1882,182 @@ def test_sensitive_values():


@skip_if_force_lowest_dependencies_marker
def test_restore_and_reload_provider_configuration():
def test_provider_configuration_toggle_with_context_manager():
"""Test that make_sure_configuration_loaded toggles provider config on/off."""
from airflow.settings import conf

assert conf.providers_configuration_loaded is True
assert conf._use_providers_configuration is True
# With providers enabled, the provider value is returned via the fallback lookup chain.
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
assert conf.providers_configuration_loaded is False
# built-in pre-2-7 celery executor
assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor"
conf.load_providers_configuration()
assert conf.providers_configuration_loaded is True

with conf.make_sure_configuration_loaded(with_providers=False):
assert conf._use_providers_configuration is False
with pytest.raises(
AirflowConfigException,
match=re.escape("section/key [celery/celery_app_name] not found in config"),
):
conf.get("celery", "celery_app_name")
# After the context manager exits, provider config is restored.
assert conf._use_providers_configuration is True
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"


@skip_if_force_lowest_dependencies_marker
def test_error_when_contributing_to_existing_section():
def test_provider_sections_do_not_overlap_with_core():
"""Test that provider config sections don't overlap with core configuration sections."""
from airflow.settings import conf

with conf.make_sure_configuration_loaded(with_providers=True):
assert conf.providers_configuration_loaded is True
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
assert conf.providers_configuration_loaded is False
conf.configuration_description["celery"] = {
"description": "Celery Executor configuration",
"options": {
"celery_app_name": {
"default": "test",
}
},
}
conf._default_values.add_section("celery")
conf._default_values.set("celery", "celery_app_name", "test")
assert conf.get("celery", "celery_app_name") == "test"
# patching restoring_core_default_configuration to avoid reloading the defaults
with patch.object(conf, "restore_core_default_configuration"):
core_sections = set(conf._configuration_description.keys())
provider_sections = set(conf._provider_metadata_configuration_description.keys())
overlap = core_sections & provider_sections
assert not overlap, (
f"Provider configuration sections overlap with core sections: {overlap}. "
"Providers must only add new sections, not contribute to existing ones."
)


@skip_if_force_lowest_dependencies_marker
class TestProviderConfigPriority:
"""Tests that conf.get and conf.has_option respect provider metadata and cfg fallbacks with correct priority."""

@pytest.mark.parametrize(
("section", "option", "expected"),
PROVIDER_METADATA_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in PROVIDER_METADATA_CONFIG_OPTIONS],
)
def test_get_returns_provider_metadata_value(self, section, option, expected):
"""conf.get returns provider metadata (provider.yaml) values."""
from airflow.settings import conf

assert conf.get(section, option) == expected

@pytest.mark.parametrize(
("section", "option", "expected"),
CFG_FALLBACK_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in CFG_FALLBACK_CONFIG_OPTIONS],
)
def test_cfg_fallback_has_expected_value(self, section, option, expected):
"""provider_config_fallback_defaults.cfg contains expected default values."""
from airflow.settings import conf

assert conf.get_from_provider_cfg_config_fallback_defaults(section, option) == expected

@pytest.mark.parametrize(
("section", "option", "expected"),
PROVIDER_METADATA_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in PROVIDER_METADATA_CONFIG_OPTIONS],
)
def test_has_option_true_for_provider_metadata(self, section, option, expected):
"""conf.has_option returns True for options defined in provider metadata."""
from airflow.settings import conf

assert conf.has_option(section, option) is True

@pytest.mark.parametrize(
("section", "option", "expected"),
CFG_FALLBACK_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in CFG_FALLBACK_CONFIG_OPTIONS],
)
def test_has_option_true_for_cfg_fallback(self, section, option, expected):
"""conf.has_option returns True for options in provider_config_fallback_defaults.cfg."""
from airflow.settings import conf

assert conf.has_option(section, option) is True

def test_has_option_false_for_nonexistent_option(self):
"""conf.has_option returns False for options not in any source."""
from airflow.settings import conf

assert conf.has_option("celery", "totally_nonexistent_option_xyz") is False

@pytest.mark.parametrize(
("section", "option", "metadata_value", "cfg_value"),
PROVIDER_METADATA_OVERRIDES_CFG_FALLBACK,
ids=[f"{s}.{o}" for s, o, _, _ in PROVIDER_METADATA_OVERRIDES_CFG_FALLBACK],
)
def test_provider_metadata_overrides_cfg_fallback(self, section, option, metadata_value, cfg_value):
"""Provider metadata values take priority over provider_config_fallback_defaults.cfg values."""
from airflow.settings import conf

assert conf.get(section, option) == metadata_value
assert conf.get_from_provider_cfg_config_fallback_defaults(section, option) == cfg_value

@pytest.mark.parametrize(
("section", "option", "metadata_value", "cfg_value"),
PROVIDER_METADATA_OVERRIDES_CFG_FALLBACK,
ids=[f"{s}.{o}" for s, o, _, _ in PROVIDER_METADATA_OVERRIDES_CFG_FALLBACK],
)
def test_get_default_value_priority(self, section, option, metadata_value, cfg_value):
"""get_default_value checks provider metadata before cfg fallback."""
from airflow.settings import conf

assert conf.get_default_value(section, option) == metadata_value

@pytest.mark.parametrize(
("section", "option", "expected"),
CFG_FALLBACK_CONFIG_OPTIONS + PROVIDER_METADATA_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in CFG_FALLBACK_CONFIG_OPTIONS + PROVIDER_METADATA_CONFIG_OPTIONS],
)
def test_providers_disabled_dont_get_cfg_defaults_or_provider_metadata(self, section, option, expected):
"""With providers disabled, conf.get raises for provider-only options."""
test_conf = create_fresh_airflow_config()
with test_conf.make_sure_configuration_loaded(with_providers=False):
with pytest.raises(
AirflowConfigException,
match="The provider apache-airflow-providers-celery is attempting to contribute "
"configuration section celery that has already been added before. "
"The source of it: Airflow's core package",
match=re.escape(f"section/key [{section}/{option}] not found in config"),
):
conf.load_providers_configuration()
assert conf.get("celery", "celery_app_name") == "test"
test_conf.get(section, option)

def test_provider_section_absent_when_providers_disabled(self):
"""Provider-contributed sections are excluded from configuration_description when providers disabled."""
test_conf = create_fresh_airflow_config()
with test_conf.make_sure_configuration_loaded(with_providers=False):
desc = test_conf.configuration_description
provider_only_sections = set(test_conf._provider_metadata_configuration_description.keys())
for section in provider_only_sections:
if section not in test_conf._configuration_description:
assert section not in desc

@pytest.mark.parametrize(
("section", "option", "expected"),
CFG_FALLBACK_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in CFG_FALLBACK_CONFIG_OPTIONS],
)
def test_has_option_returns_false_for_cfg_fallback_when_providers_disabled(
self, section, option, expected
):
"""With providers disabled, conf.has_option returns False for cfg-fallback-only options."""
test_conf = create_fresh_airflow_config()
with test_conf.make_sure_configuration_loaded(with_providers=False):
assert test_conf.has_option(section, option) is False

@pytest.mark.parametrize(
("section", "option", "expected"),
PROVIDER_METADATA_CONFIG_OPTIONS,
ids=[f"{s}.{o}" for s, o, _ in PROVIDER_METADATA_CONFIG_OPTIONS],
)
def test_has_option_returns_false_for_provider_metadata_when_providers_disabled(
self, section, option, expected
):
"""With providers disabled, conf.has_option returns False for provider-metadata-only options."""
test_conf = create_fresh_airflow_config()
with test_conf.make_sure_configuration_loaded(with_providers=False):
assert test_conf.has_option(section, option) is False

def test_env_var_overrides_provider_values(self):
"""Environment variables override both provider metadata and cfg fallback values."""
from airflow.settings import conf

with mock.patch.dict("os.environ", {"AIRFLOW__CELERY__CELERY_APP_NAME": "env_override"}):
assert conf.get("celery", "celery_app_name") == "env_override"

def test_user_config_overrides_provider_values(self):
"""User-set config values (airflow.cfg) override provider defaults."""
from airflow.settings import conf

custom_value = "my_custom.celery_executor"
with conf_vars({("celery", "celery_app_name"): custom_value}):
assert conf.get("celery", "celery_app_name") == custom_value


# Technically it's not a DB test, but we want to make sure it's not interfering with xdist non-db tests
Expand Down
Loading
Loading