Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0804ece
Fix ImportError when importing SUPERVISOR_COMMS outside task context
andreahlert Feb 8, 2026
a9397cd
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Feb 11, 2026
ba820cd
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Feb 16, 2026
5cfd010
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 2, 2026
ae59778
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 3, 2026
00160df
Fix CI failures: MyPy, hasattr routing, reinit_supervisor_comms, and …
andreahlert Mar 3, 2026
47bbf71
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 3, 2026
7ba8770
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 4, 2026
97a938c
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 4, 2026
e1ff664
Fix test_logging_propogated_by_default flakiness from OTLP trace logs
andreahlert Mar 5, 2026
c8ef55c
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 5, 2026
56115ee
Apply ruff format to test_serialized_objects.py
andreahlert Mar 6, 2026
cc136f1
Merge branch 'main' into fix/supervisor-comms-import-error
andreahlert Mar 6, 2026
8085f1e
Merge upstream/main into fix/supervisor-comms-import-error
andreahlert Mar 11, 2026
ab58fd3
Merge remote-tracking branch 'upstream/main' into fix/supervisor-comm…
andreahlert Mar 11, 2026
5cc69ba
Apply ruff format to task_runner.py
andreahlert Mar 11, 2026
a273646
Revert SUPERVISOR_COMMS=None, use hasattr checks instead
andreahlert Mar 12, 2026
70fdb01
Fix import ordering in context.py
andreahlert Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,11 @@ def test_logging_propogated_by_default(self, caplog):
BaseOperator(task_id="test").log.warning("test")
# This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing
# the other case (that when we have set_context it goes to the file is harder to achieve without
# leaking a lot of state)
assert caplog.messages == ["test"]
# leaking a lot of state). Only assert on the operator's logger so other loggers (e.g. OTLP trace
# export errors in CI) do not affect the test.
operator_logger_prefix = "airflow.task.operators"
operator_messages = [r.message for r in caplog.records if r.name.startswith(operator_logger_prefix)]
assert operator_messages == ["test"]

def test_resume_execution(self):
from airflow.models.trigger import TriggerFailureReason
Expand Down
59 changes: 52 additions & 7 deletions task-sdk/src/airflow/sdk/execution_time/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,26 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
)

# If no backend found the variable, raise a not found error (mirrors _get_connection)
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import ErrorResponse

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.VARIABLE_NOT_FOUND,
detail={
"message": (
f"Variable '{key}' not found. Note: SUPERVISOR_COMMS is not available, "
"which means this code is running outside a task execution context "
"(e.g., at the top level of a DAG file). "
"Consider using environment variables (AIRFLOW_VAR_<key>), "
"Jinja templates ({{ var.value.<key> }}), "
"or move the Variable.get() call inside a task function."
)
},
)
)

raise AirflowRuntimeError(
ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
)
Expand All @@ -282,11 +299,11 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ
# keep Task SDK as a separate package than execution time mods.
import json

from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import PutVariable
from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable
from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

# check for write conflicts on the worker
for secrets_backend in ensure_secrets_backend_loaded():
Expand Down Expand Up @@ -317,7 +334,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ
except Exception as e:
log.exception(e)

SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description))
if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.GENERIC_ERROR,
detail={
"message": (
"Variable.set() requires a task execution context (SUPERVISOR_COMMS is not available). "
"This typically happens when calling Variable.set() at the top level of a DAG file "
"or outside of a running task. Variable.set() can only be used inside a task."
)
},
)
)

task_runner.SUPERVISOR_COMMS.send(PutVariable(key=key, value=value, description=description))

# Invalidate cache after setting the variable
SecretCache.invalidate_variable(key)
Expand All @@ -329,11 +360,25 @@ def _delete_variable(key: str) -> None:
# A reason to not move it to `airflow.sdk.execution_time.comms` is that it
# will make that module depend on Task SDK, which is not ideal because we intend to
# keep Task SDK as a separate package than execution time mods.
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.cache import SecretCache
from airflow.sdk.execution_time.comms import DeleteVariable
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
from airflow.sdk.execution_time.comms import DeleteVariable, ErrorResponse

if not hasattr(task_runner, "SUPERVISOR_COMMS"):
raise AirflowRuntimeError(
ErrorResponse(
error=ErrorType.GENERIC_ERROR,
detail={
"message": (
"Variable.delete() requires a task execution context (SUPERVISOR_COMMS is not available). "
"This typically happens when calling Variable.delete() at the top level of a DAG file "
"or outside of a running task. Variable.delete() can only be used inside a task."
)
},
)
)

msg = SUPERVISOR_COMMS.send(DeleteVariable(key=key))
msg = task_runner.SUPERVISOR_COMMS.send(DeleteVariable(key=key))
if TYPE_CHECKING:
assert isinstance(msg, OKResponse)

Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,7 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
try:
from airflow.sdk.execution_time import task_runner

if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None:
if hasattr(task_runner, "SUPERVISOR_COMMS"):
# Client context: task runner with SUPERVISOR_COMMS
return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
except (ImportError, AttributeError):
Expand Down
48 changes: 48 additions & 0 deletions task-sdk/tests/task_sdk/definitions/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from airflow.sdk import Variable
from airflow.sdk.configuration import initialize_secrets_backends
from airflow.sdk.exceptions import AirflowRuntimeError
from airflow.sdk.execution_time.comms import PutVariable, VariableResult
from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

Expand Down Expand Up @@ -186,3 +187,50 @@ def test_backend_fallback_to_env_var(self, mock_get_variable, mock_env_get, mock
# mock_env is only called when LocalFilesystemBackend doesn't have it
mock_env_get.assert_called()
assert var == "fake_value"


class TestVariableOutsideTaskContext:
"""Tests for Variable operations when SUPERVISOR_COMMS is not set (outside task execution context)."""

@mock.patch("airflow.secrets.environment_variables.EnvironmentVariablesBackend.get_variable")
def test_get_with_env_var_works_without_supervisor_comms(self, mock_env_get, monkeypatch):
"""Variable.get() should still work via EnvironmentVariablesBackend when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)
mock_env_get.return_value = "env_value"

result = Variable.get(key="my_env_var")
assert result == "env_value"
mock_env_get.assert_called_once_with(key="my_env_var")

def test_get_not_found_without_supervisor_comms(self, monkeypatch):
"""Variable.get() should raise with a helpful message when variable not found and SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(AirflowRuntimeError, match="outside a task execution context"):
Variable.get(key="nonexistent_var")

def test_set_without_supervisor_comms(self, monkeypatch):
"""Variable.set() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.context import _set_variable

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(AirflowRuntimeError, match="Variable.set\\(\\) requires a task execution context"):
_set_variable(key="my_key", value="my_value")

def test_delete_without_supervisor_comms(self, monkeypatch):
"""Variable.delete() should raise AirflowRuntimeError when SUPERVISOR_COMMS is not set."""
from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.context import _delete_variable

monkeypatch.delattr(task_runner, "SUPERVISOR_COMMS", raising=False)

with pytest.raises(
AirflowRuntimeError, match="Variable.delete\\(\\) requires a task execution context"
):
_delete_variable(key="my_key")
Original file line number Diff line number Diff line change
Expand Up @@ -2703,7 +2703,6 @@ def test_set_supervisor_comms_sets_temporarily_when_not_set(self):
def test_set_supervisor_comms_unsets_temporarily_when_not_set(self):
assert not hasattr(task_runner, "SUPERVISOR_COMMS")

# This will delete an attribute that isn't set, and restore it likewise
with set_supervisor_comms(None):
assert not hasattr(task_runner, "SUPERVISOR_COMMS")

Expand Down
Loading