Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,26 @@ def get_connection(self, conn_id: str, team_name: str | None = None) -> Connecti
# Convert ExecutionAPI response to SDK Connection
return _process_connection_result_conn(msg)
except RuntimeError as e:
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
# when called within an async event loop. In greenback portal contexts (triggerer),
# we catch this and use greenback to call the async version instead.
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
import asyncio
import asyncio
import greenback

import greenback
msg = str(e)

if (
"You cannot use AsyncToSync in the same thread as an async event loop" in msg
or "attached to a different loop" in msg
):
task = asyncio.current_task()
if greenback.has_portal(task):
if task and greenback.has_portal(task):
import warnings

warnings.warn(
"You should not use sync calls here -- use `await aget_connection` instead",
"Sync connection access failed due to event loop mismatch. "
"Falling back to async connection retrieval.",
stacklevel=2,
)
return greenback.await_(self.aget_connection(conn_id))
# Fall through to the general exception handler for other RuntimeErrors
return None
return None
except Exception:
# If SUPERVISOR_COMMS fails for any reason, return None
# to allow fallback to other backends
Expand Down
Loading