diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index a814de07d2e05..20993a6648bf4 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -910,8 +910,11 @@ def test_logging_propogated_by_default(self, caplog): BaseOperator(task_id="test").log.warning("test") # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing # the other case (that when we have set_context it goes to the file is harder to achieve without - # leaking a lot of state) - assert caplog.messages == ["test"] + # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace + # export errors in CI) do not affect the test. + operator_logger_prefix = "airflow.task.operators" + operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)] + assert operator_messages == ["test"] def test_resume_execution(self): from airflow.models.trigger import TriggerFailureReason diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index db5a75e10c18d..df67ebd7f944b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -266,9 +266,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) - from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.VARIABLE_NOT_FOUND, + detail={ + "message": ( + f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, " + "which means this code is running outside a task execution context " + "(e.g., at the top level of a DAG file). " + "Consider using environment variables (AIRFLOW_VAR_), " + "Jinja templates ({{ var.value. }}), " + "or move the Variable.get() call inside a task function." + ) + }, + ) + ) + raise AirflowRuntimeError( ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"}) ) @@ -282,11 +299,11 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ # keep Task SDK as a separate package than execution time mods. import json + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import PutVariable + from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS # check for write conflicts on the worker for secrets_backend in ensure_secrets_backend_loaded(): @@ -317,7 +334,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) - SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.set() at the top level of a DAG file " + "or outside of a running task. Variable.set() can only be used inside a task." + ) + }, + ) + ) + + task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable SecretCache.invalidate_variable(key) @@ -329,11 +360,25 @@ def _delete_variable(key: str) -> None: # A reason to not move it to `airflow.sdk.execution_time.comms` is that it # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import DeleteVariable - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse + + if not hasattr(task_runner, "SUPERVISOR_COMMS"): + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.delete() at the top level of a DAG file " + "or outside of a running task. Variable.delete() can only be used inside a task." + ) + }, + ) + ) - msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) + msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 9894d4fff3153..6e4c023f3c5b0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1948,7 +1948,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: try: from airflow.sdk.execution_time import task_runner - if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None: + if hasattr(task_runner, "SUPERVISOR_COMMS"): # Client context: task runner with SUPERVISOR_COMMS return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS) except (ImportError, AttributeError): diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 3717f834735ff..33e6f0566f3c6 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -25,6 +25,7 @@ from airflow.sdk import Variable from airflow.sdk.configuration import initialize_secrets_backends +from airflow.sdk.exceptions import AirflowRuntimeError from airflow.sdk.execution_time.comms import PutVariable, VariableResult from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS @@ -186,3 +187,50 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock # mock_env is only called when LocalFilesystemBackend doesn't have it mock_env_get.assert_called() assert var == "fake_value" + + +class TestVariableOutsideTaskContext: + """Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context).""" + + @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") + def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + mock_env_get.return_value = "env_value" + + result = Variable.get(key="my_env_var") + assert result == "env_value" + mock_env_get.assert_called_once_with(key="my_env_var") + + def test_get_not_found_without_supervisor_comms(self, monkeypatch): + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): + Variable.get(key="nonexistent_var") + + def test_set_without_supervisor_comms(self, monkeypatch): + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _set_variable + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): + _set_variable(key="my_key", value="my_value") + + def test_delete_without_supervisor_comms(self, monkeypatch): + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _delete_variable + + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) + + with pytest.raises( + AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" + ): + _delete_variable(key="my_key") diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 46b7f660bb006..683bffffcd3da 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2703,7 +2703,6 @@ def test_set_supervisor_comms_sets_temporarily_when_not_set(self): def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): assert not hasattr(task_runner, "SUPERVISOR_COMMS") - # This will delete an attribute that isn't set, and restore it likewise with set_supervisor_comms(None): assert not hasattr(task_runner, "SUPERVISOR_COMMS")