From 0804ece825a51072948e5fe134c40abec576ea3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 07:36:35 -0300 Subject: [PATCH 1/7] Fix ImportError when importing SUPERVISOR_COMMS outside task context SUPERVISOR_COMMS was declared as a bare type annotation without assignment, which does not create an actual module attribute in Python. This caused ImportError when Variable.get() was called at the top level of a DAG file (outside task execution context), such as during dag.test(). Initialize SUPERVISOR_COMMS to None so the import always succeeds, add None guards before .send() calls in _set_variable, _delete_variable, and ExecutionAPISecretsBackend methods, and provide helpful error messages suggesting alternatives like environment variables or Jinja templates. Closes: #51816 --- .../src/airflow/sdk/execution_time/context.py | 51 +++++++++++++++++-- .../execution_time/secrets/execution_api.py | 12 +++++ .../airflow/sdk/execution_time/task_runner.py | 2 +- .../task_sdk/definitions/test_variables.py | 46 +++++++++++++++++ 4 files changed, 107 insertions(+), 4 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index db5a75e10c18d..1c02e2727028d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -266,8 +266,25 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) - from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType from airflow.sdk.execution_time.comms import ErrorResponse + from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.VARIABLE_NOT_FOUND, + detail={ + "message": ( + f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, " + "which means this code is running outside a task execution context " + "(e.g., at the top level of a DAG file). " + "Consider using environment variables (AIRFLOW_VAR_), " + "Jinja templates ({{ var.value. }}), " + "or move the Variable.get() call inside a task function." + ) + }, + ) + ) raise AirflowRuntimeError( ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"}) @@ -283,7 +300,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ import json from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import PutVariable + from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS @@ -317,6 +334,20 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.set() at the top level of a DAG file " + "or outside of a running task. Variable.set() can only be used inside a task." + ) + }, + ) + ) + SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable @@ -330,9 +361,23 @@ def _delete_variable(key: str) -> None: # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. from airflow.sdk.execution_time.cache import SecretCache - from airflow.sdk.execution_time.comms import DeleteVariable + from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + raise AirflowRuntimeError( + ErrorResponse( + error=ErrorType.GENERIC_ERROR, + detail={ + "message": ( + "Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). " + "This typically happens when calling Variable.delete() at the top level of a DAG file " + "or outside of a running task. Variable.delete() can only be used inside a task." + ) + }, + ) + ) + msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index a44b23d06dc6d..1a1c6cfdd8e5b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -56,6 +56,9 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id)) @@ -102,6 +105,9 @@ def get_variable(self, key: str, team_name: str | None = None) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = SUPERVISOR_COMMS.send(GetVariable(key=key)) @@ -129,6 +135,9 @@ async def aget_connection(self, conn_id: str) -> Connection | None: # type: ign from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id)) @@ -153,6 +162,9 @@ async def aget_variable(self, key: str) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: + return None + try: msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key)) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index ba52ca064f6b8..fc2e8aeb603b4 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -799,7 +799,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] | None = None # State machine! diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 3717f834735ff..88572dc7f6df8 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -25,6 +25,7 @@ from airflow.sdk import Variable from airflow.sdk.configuration import initialize_secrets_backends +from airflow.sdk.exceptions import AirflowRuntimeError from airflow.sdk.execution_time.comms import PutVariable, VariableResult from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS @@ -186,3 +187,48 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock # mock_env is only called when LocalFilesystemBackend doesn't have it mock_env_get.assert_called() assert var == "fake_value" + + +class TestVariableOutsideTaskContext: + """Tests for Variable operations when SUPERVISOR_COMMS is None (outside task execution context).""" + + @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") + def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + mock_env_get.return_value = "env_value" + + result = Variable.get(key="my_env_var") + assert result == "env_value" + mock_env_get.assert_called_once_with(key="my_env_var") + + def test_get_not_found_without_supervisor_comms(self, monkeypatch): + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): + Variable.get(key="nonexistent_var") + + def test_set_without_supervisor_comms(self, monkeypatch): + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _set_variable + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): + _set_variable(key="my_key", value="my_value") + + def test_delete_without_supervisor_comms(self, monkeypatch): + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.context import _delete_variable + + monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + + with pytest.raises(AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"): + _delete_variable(key="my_key") From 00160df174c1f28c4ac6940d0881a30296376089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Tue, 3 Mar 2026 16:22:30 -0300 Subject: [PATCH 2/7] Fix CI failures: MyPy, hasattr routing, reinit_supervisor_comms, and ruff format Use type: ignore[assignment] instead of | None for SUPERVISOR_COMMS to avoid 39 mypy union-attr errors. Replace hasattr with getattr is not None in variable.py and connection.py. Fix reinit_supervisor_comms to check is None. Simplify set_supervisor_comms context manager. Update test assertions. --- airflow-core/src/airflow/models/connection.py | 10 +++++++-- airflow-core/src/airflow/models/variable.py | 20 ++++++++++++++---- .../airflow/sdk/execution_time/supervisor.py | 21 ++++++------------- .../airflow/sdk/execution_time/task_runner.py | 6 +++--- .../task_sdk/definitions/test_variables.py | 4 +++- .../execution_time/test_supervisor.py | 18 +++++++--------- 6 files changed, 43 insertions(+), 36 deletions(-) diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 0e1505f4667dd..8831ff0fd26a5 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -505,7 +505,10 @@ def get_connection_from_secrets(cls, conn_id: str, team_name: str | None = None) # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): from airflow.sdk import Connection as TaskSDKConnection from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType @@ -589,7 +592,10 @@ def to_dict(self, *, prune_empty: bool = False, validate: bool = True) -> dict[s @classmethod def from_json(cls, value, conn_id=None) -> Connection: - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): from airflow.sdk import Connection as TaskSDKConnection warnings.warn( diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py index 5435326de5417..537793e715011 100644 --- a/airflow-core/src/airflow/models/variable.py +++ b/airflow-core/src/airflow/models/variable.py @@ -154,7 +154,10 @@ def get( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.get from `airflow.models` is deprecated." "Please use `get` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -214,7 +217,10 @@ def set( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.set from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -345,7 +351,10 @@ def update( # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.update from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead as it is an upsert.", @@ -411,7 +420,10 @@ def delete(key: str, team_name: str | None = None, session: Session | None = Non # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): + if ( + getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) + is not None + ): warnings.warn( "Using Variable.delete from `airflow.models` is deprecated." "Please use `delete` on Variable from sdk(`airflow.sdk.Variable`) instead", diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 9894d4fff3153..bab36488148c5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1747,28 +1747,19 @@ def set_supervisor_comms(temp_comms): by injecting a test Comms implementation (e.g. `InProcessSupervisorComms`) in place of the real inter-process communication layer. - Some parts of the code (e.g. models.Variable.get) check for the presence - of `task_runner.SUPERVISOR_COMMS` to determine if the code is running in a Task SDK execution context. - This override ensures those code paths behave correctly during in-process tests. + Some parts of the code (e.g. models.Variable.get) check that + `task_runner.SUPERVISOR_COMMS` is not None to determine if the code is running in a Task SDK + execution context. This override ensures those code paths behave correctly during in-process tests. """ from airflow.sdk.execution_time import task_runner - sentinel = object() - old = getattr(task_runner, "SUPERVISOR_COMMS", sentinel) - - if temp_comms is not None: - task_runner.SUPERVISOR_COMMS = temp_comms - elif old is not sentinel: - delattr(task_runner, "SUPERVISOR_COMMS") + old = task_runner.SUPERVISOR_COMMS + task_runner.SUPERVISOR_COMMS = temp_comms try: yield finally: - if old is sentinel: - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") - else: - task_runner.SUPERVISOR_COMMS = old + task_runner.SUPERVISOR_COMMS = old def run_task_in_process(ti: TaskInstance, task) -> TaskRunResult: diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index db8e2ecc04ccf..4a64f520755e3 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -798,7 +798,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] | None = None +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] = None # type: ignore[assignment] # State machine! @@ -1849,8 +1849,8 @@ def reinit_supervisor_comms() -> None: """ import socket - if "SUPERVISOR_COMMS" not in globals(): - global SUPERVISOR_COMMS + global SUPERVISOR_COMMS + if SUPERVISOR_COMMS is None: log = structlog.get_logger(logger_name="task") fd = int(os.environ.get("__AIRFLOW_SUPERVISOR_FD", "0")) diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 88572dc7f6df8..56725c3e02bae 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -230,5 +230,7 @@ def test_delete_without_supervisor_comms(self, monkeypatch): monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) - with pytest.raises(AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"): + with pytest.raises( + AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" + ): _delete_variable(key="my_key") diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 46b7f660bb006..99a1399f8dc0d 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2676,12 +2676,9 @@ class DummyComms: @pytest.fixture(autouse=True) def cleanup_supervisor_comms(self): - # Ensure clean state before/after test - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") + task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] yield - if hasattr(task_runner, "SUPERVISOR_COMMS"): - delattr(task_runner, "SUPERVISOR_COMMS") + task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] def test_set_supervisor_comms_overrides_and_restores(self): task_runner.SUPERVISOR_COMMS = self.DummyComms() @@ -2693,21 +2690,20 @@ def test_set_supervisor_comms_overrides_and_restores(self): assert task_runner.SUPERVISOR_COMMS is original def test_set_supervisor_comms_sets_temporarily_when_not_set(self): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None replacement = self.DummyComms() with set_supervisor_comms(replacement): assert task_runner.SUPERVISOR_COMMS is replacement - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None - # This will delete an attribute that isn't set, and restore it likewise with set_supervisor_comms(None): - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None - assert not hasattr(task_runner, "SUPERVISOR_COMMS") + assert task_runner.SUPERVISOR_COMMS is None class TestInProcessTestSupervisor: From e1ff6643b8b4c2c7503591c883d4f07e48a3a4ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 5 Mar 2026 17:29:11 -0300 Subject: [PATCH 3/7] Fix test_logging_propogated_by_default flakiness from OTLP trace logs Assert only on operator logger messages so CI logs (e.g. OTLP connection errors to localhost:4318) do not break the test. --- .../tests/unit/serialization/test_serialized_objects.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index d4f549f6df1e7..a5e55072e0c54 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -842,8 +842,13 @@ def test_logging_propogated_by_default(self, caplog): BaseOperator(task_id="test").log.warning("test") # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing # the other case (that when we have set_context it goes to the file is harder to achieve without - # leaking a lot of state) - assert caplog.messages == ["test"] + # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace + # export errors in CI) do not affect the test. + operator_logger_prefix = "airflow.task.operators" + operator_messages = [ + r.message for r in caplog.records if r.name.startswith(operator_logger_prefix) + ] + assert operator_messages == ["test"] def test_resume_execution(self): from airflow.models.trigger import TriggerFailureReason From 56115ee3f6d118408fc5378f058943375b65bf67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Fri, 6 Mar 2026 08:34:18 -0300 Subject: [PATCH 4/7] Apply ruff format to test_serialized_objects.py --- .../tests/unit/serialization/test_serialized_objects.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index b8b2ce025e6c7..ad46fe82244a1 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -888,9 +888,7 @@ def test_logging_propogated_by_default(self, caplog): # leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace # export errors in CI) do not affect the test. operator_logger_prefix = "airflow.task.operators" - operator_messages = [ - r.message for r in caplog.records if r.name.startswith(operator_logger_prefix) - ] + operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)] assert operator_messages == ["test"] def test_resume_execution(self): From 5cc69bac9acc3b53d311e7f30730091146b55142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Wed, 11 Mar 2026 15:57:15 -0300 Subject: [PATCH 5/7] Apply ruff format to task_runner.py --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index c41e4a9387018..7b3c738035be0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1889,7 +1889,6 @@ def reinit_supervisor_comms() -> None: global SUPERVISOR_COMMS if SUPERVISOR_COMMS is None: - fd = int(os.environ.get("__AIRFLOW_SUPERVISOR_FD", "0")) SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log, socket=socket.socket(fileno=fd)) From a273646bd6baa5b58deb479e40aecbfdab5c5ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Thu, 12 Mar 2026 09:43:55 -0300 Subject: [PATCH 6/7] Revert SUPERVISOR_COMMS=None, use hasattr checks instead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback: keep SUPERVISOR_COMMS as a bare type annotation (not initialized to None) and use hasattr checks in the few places that need to detect task execution context. Signed-off-by: André Ahlert --- airflow-core/src/airflow/models/connection.py | 10 ++------ airflow-core/src/airflow/models/variable.py | 20 ++++------------ .../src/airflow/sdk/execution_time/context.py | 16 ++++++------- .../execution_time/secrets/execution_api.py | 12 ---------- .../airflow/sdk/execution_time/supervisor.py | 23 +++++++++++++------ .../airflow/sdk/execution_time/task_runner.py | 7 +++--- .../task_sdk/definitions/test_variables.py | 18 +++++++-------- .../execution_time/test_supervisor.py | 17 ++++++++------ 8 files changed, 53 insertions(+), 70 deletions(-) diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 8831ff0fd26a5..0e1505f4667dd 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -505,10 +505,7 @@ def get_connection_from_secrets(cls, conn_id: str, team_name: str | None = None) # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): from airflow.sdk import Connection as TaskSDKConnection from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType @@ -592,10 +589,7 @@ def to_dict(self, *, prune_empty: bool = False, validate: bool = True) -> dict[s @classmethod def from_json(cls, value, conn_id=None) -> Connection: - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): from airflow.sdk import Connection as TaskSDKConnection warnings.warn( diff --git a/airflow-core/src/airflow/models/variable.py b/airflow-core/src/airflow/models/variable.py index 537793e715011..5435326de5417 100644 --- a/airflow-core/src/airflow/models/variable.py +++ b/airflow-core/src/airflow/models/variable.py @@ -154,10 +154,7 @@ def get( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.get from `airflow.models` is deprecated." "Please use `get` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -217,10 +214,7 @@ def set( # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.set from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead", @@ -351,10 +345,7 @@ def update( # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.update from `airflow.models` is deprecated." "Please use `set` on Variable from sdk(`airflow.sdk.Variable`) instead as it is an upsert.", @@ -420,10 +411,7 @@ def delete(key: str, team_name: str | None = None, session: Session | None = Non # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps) # and should use the Task SDK API server path - if ( - getattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS", None) - is not None - ): + if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISOR_COMMS"): warnings.warn( "Using Variable.delete from `airflow.models` is deprecated." "Please use `delete` on Variable from sdk(`airflow.sdk.Variable`) instead", diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 1c02e2727028d..f5639b9386929 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -266,10 +266,10 @@ def _get_variable(key: str, deserialize_json: bool) -> Any: ) # If no backend found the variable, raise a not found error (mirrors _get_connection) + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.VARIABLE_NOT_FOUND, @@ -300,10 +300,10 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ import json from airflow.sdk.execution_time.cache import SecretCache + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS # check for write conflicts on the worker for secrets_backend in ensure_secrets_backend_loaded(): @@ -334,7 +334,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ except Exception as e: log.exception(e) - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.GENERIC_ERROR, @@ -348,7 +348,7 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ ) ) - SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) + task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description)) # Invalidate cache after setting the variable SecretCache.invalidate_variable(key) @@ -361,10 +361,10 @@ def _delete_variable(key: str) -> None: # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. from airflow.sdk.execution_time.cache import SecretCache + from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse - from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: + if not hasattr(task_runner, "SUPERVISOR_COMMS"): raise AirflowRuntimeError( ErrorResponse( error=ErrorType.GENERIC_ERROR, @@ -378,7 +378,7 @@ def _delete_variable(key: str) -> None: ) ) - msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key)) + msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key)) if TYPE_CHECKING: assert isinstance(msg, OKResponse) diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index 1a1c6cfdd8e5b..a44b23d06dc6d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -56,9 +56,6 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id)) @@ -105,9 +102,6 @@ def get_variable(self, key: str, team_name: str | None = None) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = SUPERVISOR_COMMS.send(GetVariable(key=key)) @@ -135,9 +129,6 @@ async def aget_connection(self, conn_id: str) -> Connection | None: # type: ign from airflow.sdk.execution_time.context import _process_connection_result_conn from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id)) @@ -162,9 +153,6 @@ async def aget_variable(self, key: str) -> str | None: from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: - return None - try: msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key)) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index bab36488148c5..6e4c023f3c5b0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1747,19 +1747,28 @@ def set_supervisor_comms(temp_comms): by injecting a test Comms implementation (e.g. `InProcessSupervisorComms`) in place of the real inter-process communication layer. - Some parts of the code (e.g. models.Variable.get) check that - `task_runner.SUPERVISOR_COMMS` is not None to determine if the code is running in a Task SDK - execution context. This override ensures those code paths behave correctly during in-process tests. + Some parts of the code (e.g. models.Variable.get) check for the presence + of `task_runner.SUPERVISOR_COMMS` to determine if the code is running in a Task SDK execution context. + This override ensures those code paths behave correctly during in-process tests. """ from airflow.sdk.execution_time import task_runner - old = task_runner.SUPERVISOR_COMMS - task_runner.SUPERVISOR_COMMS = temp_comms + sentinel = object() + old = getattr(task_runner, "SUPERVISOR_COMMS", sentinel) + + if temp_comms is not None: + task_runner.SUPERVISOR_COMMS = temp_comms + elif old is not sentinel: + delattr(task_runner, "SUPERVISOR_COMMS") try: yield finally: - task_runner.SUPERVISOR_COMMS = old + if old is sentinel: + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") + else: + task_runner.SUPERVISOR_COMMS = old def run_task_in_process(ti: TaskInstance, task) -> TaskRunResult: @@ -1939,7 +1948,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: try: from airflow.sdk.execution_time import task_runner - if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None: + if hasattr(task_runner, "SUPERVISOR_COMMS"): # Client context: task runner with SUPERVISOR_COMMS return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS) except (ImportError, AttributeError): diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 7b3c738035be0..674935f5eaecb 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -820,7 +820,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # deeply nested execution stack. # - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily # accessible wherever needed during task execution without modifying every layer of the call stack. -SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] = None # type: ignore[assignment] +SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] # State machine! @@ -1887,8 +1887,9 @@ def reinit_supervisor_comms() -> None: """ import socket - global SUPERVISOR_COMMS - if SUPERVISOR_COMMS is None: + if "SUPERVISOR_COMMS" not in globals(): + global SUPERVISOR_COMMS + fd = int(os.environ.get("__AIRFLOW_SUPERVISOR_FD", "0")) SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log, socket=socket.socket(fileno=fd)) diff --git a/task-sdk/tests/task_sdk/definitions/test_variables.py b/task-sdk/tests/task_sdk/definitions/test_variables.py index 56725c3e02bae..33e6f0566f3c6 100644 --- a/task-sdk/tests/task_sdk/definitions/test_variables.py +++ b/task-sdk/tests/task_sdk/definitions/test_variables.py @@ -190,14 +190,14 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock class TestVariableOutsideTaskContext: - """Tests for Variable operations when SUPERVISOR_COMMS is None (outside task execution context).""" + """Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context).""" @mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable") def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch): - """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is None.""" + """Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) mock_env_get.return_value = "env_value" result = Variable.get(key="my_env_var") @@ -205,30 +205,30 @@ def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, mon mock_env_get.assert_called_once_with(key="my_env_var") def test_get_not_found_without_supervisor_comms(self, monkeypatch): - """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is None.""" + """Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises(AirflowRuntimeError, match="outside a task execution context"): Variable.get(key="nonexistent_var") def test_set_without_supervisor_comms(self, monkeypatch): - """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + """Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.context import _set_variable - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"): _set_variable(key="my_key", value="my_value") def test_delete_without_supervisor_comms(self, monkeypatch): - """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is None.""" + """Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set.""" from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.context import _delete_variable - monkeypatch.setattr(task_runner, "SUPERVISOR_COMMS", None) + monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False) with pytest.raises( AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context" diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 99a1399f8dc0d..683bffffcd3da 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2676,9 +2676,12 @@ class DummyComms: @pytest.fixture(autouse=True) def cleanup_supervisor_comms(self): - task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] + # Ensure clean state before/after test + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") yield - task_runner.SUPERVISOR_COMMS = None # type: ignore[assignment] + if hasattr(task_runner, "SUPERVISOR_COMMS"): + delattr(task_runner, "SUPERVISOR_COMMS") def test_set_supervisor_comms_overrides_and_restores(self): task_runner.SUPERVISOR_COMMS = self.DummyComms() @@ -2690,20 +2693,20 @@ def test_set_supervisor_comms_overrides_and_restores(self): assert task_runner.SUPERVISOR_COMMS is original def test_set_supervisor_comms_sets_temporarily_when_not_set(self): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") replacement = self.DummyComms() with set_supervisor_comms(replacement): assert task_runner.SUPERVISOR_COMMS is replacement - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") def test_set_supervisor_comms_unsets_temporarily_when_not_set(self): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") with set_supervisor_comms(None): - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") - assert task_runner.SUPERVISOR_COMMS is None + assert not hasattr(task_runner, "SUPERVISOR_COMMS") class TestInProcessTestSupervisor: From 70fdb01e9044249b1a7db1acb53bb91919272fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Mon, 16 Mar 2026 05:40:24 -0300 Subject: [PATCH 7/7] Fix import ordering in context.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: André Ahlert --- task-sdk/src/airflow/sdk/execution_time/context.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index f5639b9386929..df67ebd7f944b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -299,8 +299,8 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ # keep Task SDK as a separate package than execution time mods. import json - from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded @@ -360,8 +360,8 @@ def _delete_variable(key: str) -> None: # A reason to not move it to `airflow.sdk.execution_time.comms` is that it # will make that module depend on Task SDK, which is not ideal because we intend to # keep Task SDK as a separate package than execution time mods. - from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time import task_runner + from airflow.sdk.execution_time.cache import SecretCache from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse if not hasattr(task_runner, "SUPERVISOR_COMMS"):