diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index a44b23d06dc6d..87a8799c55c5e 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -66,25 +66,26 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti # Convert ExecutionAPI response to SDK Connection return _process_connection_result_conn(msg) except RuntimeError as e: - # TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError - # when called within an async event loop. In greenback portal contexts (triggerer), - # we catch this and use greenback to call the async version instead. - if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): - import asyncio + import asyncio + import greenback - import greenback + msg = str(e) + if ( + "You cannot use AsyncToSync in the same thread as an async event loop" in msg + or "attached to a different loop" in msg + ): task = asyncio.current_task() - if greenback.has_portal(task): + if task and greenback.has_portal(task): import warnings warnings.warn( - "You should not use sync calls here -- use `await aget_connection` instead", + "Sync connection access failed due to event loop mismatch. " + "Falling back to async connection retrieval.", stacklevel=2, ) return greenback.await_(self.aget_connection(conn_id)) - # Fall through to the general exception handler for other RuntimeErrors - return None + return None except Exception: # If SUPERVISOR_COMMS fails for any reason, return None # to allow fallback to other backends