Skip to content

Improve error handling for SUPERVISOR_COMMS access outside task context#61630

Open
andreahlert wants to merge 18 commits intoapache:mainfrom
andreahlert:fix/supervisor-comms-import-error
Open

Improve error handling for SUPERVISOR_COMMS access outside task context#61630
andreahlert wants to merge 18 commits intoapache:mainfrom
andreahlert:fix/supervisor-comms-import-error

Conversation

@andreahlert
Copy link
Contributor

@andreahlert andreahlert commented Feb 8, 2026

Summary

Related to #51816

SUPERVISOR_COMMS was declared as a bare type annotation (SUPERVISOR_COMMS: CommsDecoder[...]) in task_runner.py, which does not create an actual module attribute in Python. This caused ImportError when Variable.get() was called at the top level of a DAG file (outside task execution context), e.g. during dag.test().

This PR improves error handling and messaging for this scenario but does not address the root cause. A separate PR will implement the actual fix using a lazy-init approach with InProcessExecutionAPI.

Changes

  • Initialize SUPERVISOR_COMMS to None so the import always succeeds. The existing check in ensure_secrets_backend_loaded() already handles is not None, so the fallback logic is unaffected.
  • Add None guards before .send() calls in _set_variable(), _delete_variable(), and all ExecutionAPISecretsBackend methods to prevent AttributeError on None.
  • Improve error messages when Variable.get() fails outside task context, suggesting alternatives (environment variables, Jinja templates, moving the call inside a task).
  • Add tests for Variable.get/set/delete outside task execution context.
  • Align Variable/Connection with supervisor logic: airflow.models.Variable and Connection were using hasattr(..., "SUPERVISOR_COMMS") to decide whether to use the Task SDK path. Once we set SUPERVISOR_COMMS = None, that became true everywhere and provider tests (which run outside task context) started taking the SDK path and failing. Added a small helper is_in_task_sdk_execution_context() that returns true only when SUPERVISOR_COMMS is present and not None, same as in supervisor.py. Variable and Connection now use it, so outside task context we keep using the legacy path and provider CI should be fine again.

Files changed

File Change
task_runner.py SUPERVISOR_COMMS = None (was bare annotation)
context.py None guards + enhanced error messages
execution_api.py Defensive return None guards
test_variables.py 4 new tests for outside-task-context behavior
airflow-core/.../task_sdk_context.py New helper is_in_task_sdk_execution_context()
variable.py Use helper instead of hasattr(..., "SUPERVISOR_COMMS") (get/set/update/delete)
connection.py Use helper for get_connection_from_secrets and from_json

Test plan

  • Existing test_variables.py tests pass (no regression)
  • New TestVariableOutsideTaskContext tests pass
  • Variable.get() with env var works outside task context
  • Variable.get() without env var raises helpful error outside task context
  • Variable.set() raises clear error outside task context
  • Variable.delete() raises clear error outside task context

SUPERVISOR_COMMS was declared as a bare type annotation without
assignment, which does not create an actual module attribute in Python.
This caused ImportError when Variable.get() was called at the top level
of a DAG file (outside task execution context), such as during
dag.test().

Initialize SUPERVISOR_COMMS to None so the import always succeeds, add
None guards before .send() calls in _set_variable, _delete_variable,
and ExecutionAPISecretsBackend methods, and provide helpful error
messages suggesting alternatives like environment variables or Jinja
templates.

Closes: apache#51816
Copy link
Contributor

@SameerMesiah97 SameerMesiah97 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fix looks reasonable in isolation, but several provider tests appear to have relied on the previous permissive behavior when SUPERVISOR_COMMS was absent or not initialized. By normalizing it to None, execution-time paths (particularly around connection resolution) are now enforced and fail loudly, which is causing widespread provider CI failures. It seems those tests did not anticipate SUPERVISOR_COMMS being present but None and therefore entering these stricter code paths. It appears that provider tests might have to be loosened (or maybe connection resolution migh have to check for SUPERVISOR_COMMS) to accomodate this change but I am not going to make such a sweeping suggestion unilaterally.

…ruff format

Use type: ignore[assignment] instead of | None for SUPERVISOR_COMMS to avoid 39 mypy union-attr errors. Replace hasattr with getattr is not None in variable.py and connection.py. Fix reinit_supervisor_comms to check is None. Simplify set_supervisor_comms context manager. Update test assertions.
@andreahlert andreahlert requested a review from XD-DENG as a code owner March 3, 2026 19:22
@andreahlert
Copy link
Contributor Author

This fix looks reasonable in isolation, but several provider tests appear to have relied on the previous permissive behavior when SUPERVISOR_COMMS was absent or not initialized. By normalizing it to None, execution-time paths (particularly around connection resolution) are now enforced and fail loudly, which is causing widespread provider CI failures. It seems those tests did not anticipate SUPERVISOR_COMMS being present but None and therefore entering these stricter code paths. It appears that provider tests might have to be loosened (or maybe connection resolution migh have to check for SUPERVISOR_COMMS) to accomodate this change but I am not going to make such a sweeping suggestion unilaterally.

Thanks for the feedback. Already handled it with is_in_task_sdk_execution_context(), which checks that SUPERVISOR_COMMS is not None. Variable and Connection use it now, so provider tests keep the legacy path.

Copy link
Contributor

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andreahlert Thank you very much for working on this!

I believe this is a meaningful improvement to Airflow's error handling. The change provides a clearer error message and suggests a possible solution for end users, which is definitely helpful.

That said, my impression is that this PR does not solve the root cause of the issue discussed in #51816. Because of this, I think the PR description should probably be updated from "Fixes" to "Related", and maybe we should make the title more descriptive, this is an error handling improvement.

The core issue is that there are valid use cases where accessing Airflow Variables during DAG parsing is necessary. Examples include:

@kaxil confirmed that these are valid use cases here:
#51816 (comment)

Since we allow that in the actual dag parsing models (as it goes via DAG processor -> Supervisor comms -> Variable), we should do the same for dag.test.

@ashb gave a hint on a possible solution in #51816 (comment)

So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

Because of this, my understanding is that the underlying problem remains unresolved, and this PR mainly improves the user-facing error message rather than fixing the behaviour itself.

@eladkal eladkal added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Mar 12, 2026
@eladkal eladkal added this to the Airflow 3.1.9 milestone Mar 12, 2026
@andreahlert
Copy link
Contributor Author

Thanks for the detailed review, Tatiana. I'll update this PR to "Related to" instead of "Fixes" and adjust the title accordingly. For the actual fix, I'll open a separate PR implementing the lazy-init approach via __getattr__ on task_runner.py, reusing the existing InProcessExecutionAPI to spin up an in-process server at parse time when running as __main__. The building blocks are already there with InProcessTestSupervisor. The open question is scoping the security model, since the Execution API relies on task identity and there's no task context during parsing. I'll need input from @ashb and @kaxil on that before moving forward.

@andreahlert andreahlert changed the title Fix ImportError when importing SUPERVISOR_COMMS outside task context Improve error handling for SUPERVISOR_COMMS access outside task context Mar 12, 2026
Per review feedback: keep SUPERVISOR_COMMS as a bare type annotation
(not initialized to None) and use hasattr checks in the few places
that need to detect task execution context.

Signed-off-by: André Ahlert <andre@aex.partners>
Signed-off-by: André Ahlert <andre@aex.partners>
@andreahlert andreahlert requested a review from ashb March 19, 2026 14:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants