Improve error handling for SUPERVISOR_COMMS access outside task context#61630
Improve error handling for SUPERVISOR_COMMS access outside task context#61630andreahlert wants to merge 18 commits intoapache:mainfrom
Conversation
SUPERVISOR_COMMS was declared as a bare type annotation without assignment, which does not create an actual module attribute in Python. This caused ImportError when Variable.get() was called at the top level of a DAG file (outside task execution context), such as during dag.test(). Initialize SUPERVISOR_COMMS to None so the import always succeeds, add None guards before .send() calls in _set_variable, _delete_variable, and ExecutionAPISecretsBackend methods, and provide helpful error messages suggesting alternatives like environment variables or Jinja templates. Closes: apache#51816
There was a problem hiding this comment.
This fix looks reasonable in isolation, but several provider tests appear to have relied on the previous permissive behavior when SUPERVISOR_COMMS was absent or not initialized. By normalizing it to None, execution-time paths (particularly around connection resolution) are now enforced and fail loudly, which is causing widespread provider CI failures. It seems those tests did not anticipate SUPERVISOR_COMMS being present but None and therefore entering these stricter code paths. It appears that provider tests might have to be loosened (or maybe connection resolution migh have to check for SUPERVISOR_COMMS) to accomodate this change but I am not going to make such a sweeping suggestion unilaterally.
…ruff format Use type: ignore[assignment] instead of | None for SUPERVISOR_COMMS to avoid 39 mypy union-attr errors. Replace hasattr with getattr is not None in variable.py and connection.py. Fix reinit_supervisor_comms to check is None. Simplify set_supervisor_comms context manager. Update test assertions.
Assert only on operator logger messages so CI logs (e.g. OTLP connection errors to localhost:4318) do not break the test.
Thanks for the feedback. Already handled it with is_in_task_sdk_execution_context(), which checks that SUPERVISOR_COMMS is not None. Variable and Connection use it now, so provider tests keep the legacy path. |
There was a problem hiding this comment.
@andreahlert Thank you very much for working on this!
I believe this is a meaningful improvement to Airflow's error handling. The change provides a clearer error message and suggests a possible solution for end users, which is definitely helpful.
That said, my impression is that this PR does not solve the root cause of the issue discussed in #51816. Because of this, I think the PR description should probably be updated from "Fixes" to "Related", and maybe we should make the title more descriptive, this is an error handling improvement.
The core issue is that there are valid use cases where accessing Airflow Variables during DAG parsing is necessary. Examples include:
- The use case described by @opeida (the author of #51816)
- The one described by @DartVeDroid here:
#51816 (comment) - The implementation in Cosmos, which I mentioned in #51816 (comment), where it caches the dbt project graph representation as an Airflow Variable to optimise dbt project loading:
astronomer/astronomer-cosmos#1014
@kaxil confirmed that these are valid use cases here:
#51816 (comment)
Since we allow that in the actual dag parsing models (as it goes via DAG processor -> Supervisor comms -> Variable), we should do the same for dag.test.
@ashb gave a hint on a possible solution in #51816 (comment)
So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?
Because of this, my understanding is that the underlying problem remains unresolved, and this PR mainly improves the user-facing error message rather than fixing the behaviour itself.
|
Thanks for the detailed review, Tatiana. I'll update this PR to "Related to" instead of "Fixes" and adjust the title accordingly. For the actual fix, I'll open a separate PR implementing the lazy-init approach via |
Per review feedback: keep SUPERVISOR_COMMS as a bare type annotation (not initialized to None) and use hasattr checks in the few places that need to detect task execution context. Signed-off-by: André Ahlert <andre@aex.partners>
Signed-off-by: André Ahlert <andre@aex.partners>
Summary
Related to #51816
SUPERVISOR_COMMSwas declared as a bare type annotation (SUPERVISOR_COMMS: CommsDecoder[...]) intask_runner.py, which does not create an actual module attribute in Python. This causedImportErrorwhenVariable.get()was called at the top level of a DAG file (outside task execution context), e.g. duringdag.test().This PR improves error handling and messaging for this scenario but does not address the root cause. A separate PR will implement the actual fix using a lazy-init approach with
InProcessExecutionAPI.Changes
SUPERVISOR_COMMStoNoneso the import always succeeds. The existing check inensure_secrets_backend_loaded()already handlesis not None, so the fallback logic is unaffected.Noneguards before.send()calls in_set_variable(),_delete_variable(), and allExecutionAPISecretsBackendmethods to preventAttributeErroronNone.Variable.get()fails outside task context, suggesting alternatives (environment variables, Jinja templates, moving the call inside a task).Variable.get/set/deleteoutside task execution context.airflow.models.VariableandConnectionwere usinghasattr(..., "SUPERVISOR_COMMS")to decide whether to use the Task SDK path. Once we setSUPERVISOR_COMMS = None, that became true everywhere and provider tests (which run outside task context) started taking the SDK path and failing. Added a small helperis_in_task_sdk_execution_context()that returns true only whenSUPERVISOR_COMMSis present and not None, same as insupervisor.py. Variable and Connection now use it, so outside task context we keep using the legacy path and provider CI should be fine again.Files changed
task_runner.pySUPERVISOR_COMMS = None(was bare annotation)context.pyexecution_api.pyreturn Noneguardstest_variables.pyairflow-core/.../task_sdk_context.pyis_in_task_sdk_execution_context()variable.pyhasattr(..., "SUPERVISOR_COMMS")(get/set/update/delete)connection.pyTest plan
test_variables.pytests pass (no regression)TestVariableOutsideTaskContexttests passVariable.get()with env var works outside task contextVariable.get()without env var raises helpful error outside task contextVariable.set()raises clear error outside task contextVariable.delete()raises clear error outside task context