-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Fix in-process Execution API secrets routing in client contexts #65587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -42,9 +42,12 @@ | |||||||||||||||||||||
| get_sig_validation_args, | ||||||||||||||||||||||
| get_signing_args, | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| from airflow.process_context import override_process_context | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if TYPE_CHECKING: | ||||||||||||||||||||||
| import httpx | ||||||||||||||||||||||
| from a2wsgi.asgi_typing import ASGIApp as A2WSGIApp | ||||||||||||||||||||||
| from starlette.types import ASGIApp, Receive, Scope, Send | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import structlog | ||||||||||||||||||||||
| from structlog.contextvars import bind_contextvars | ||||||||||||||||||||||
|
|
@@ -348,6 +351,17 @@ def get_extra_schemas() -> dict[str, dict]: | |||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| class _RequestScopedServerContextApp: | ||||||||||||||||||||||
| """Wrap an ASGI app so in-process requests behave like server-side API handling.""" | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| def __init__(self, app: FastAPI) -> None: | ||||||||||||||||||||||
| self.app = app | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: | ||||||||||||||||||||||
| with override_process_context("server"): | ||||||||||||||||||||||
| await self.app(scope, receive, send) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @attrs.define() | ||||||||||||||||||||||
| class InProcessExecutionAPI: | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
|
|
@@ -357,11 +371,12 @@ class InProcessExecutionAPI: | |||||||||||||||||||||
| needed so that we can use the sync httpx client | ||||||||||||||||||||||
| """ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| request_scoped_server_context: bool = attrs.field(default=False, kw_only=True) | ||||||||||||||||||||||
| _app: FastAPI | None = None | ||||||||||||||||||||||
| _cm: AsyncExitStack | None = None | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @cached_property | ||||||||||||||||||||||
| def app(self): | ||||||||||||||||||||||
| def app(self) -> FastAPI: | ||||||||||||||||||||||
| if not self._app: | ||||||||||||||||||||||
| from airflow.api_fastapi.common.dagbag import create_dag_bag | ||||||||||||||||||||||
| from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken | ||||||||||||||||||||||
|
|
@@ -391,14 +406,20 @@ async def always_allow(request: Request): | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| return self._app | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @cached_property | ||||||||||||||||||||||
| def asgi_app(self) -> ASGIApp: | ||||||||||||||||||||||
| if self.request_scoped_server_context: | ||||||||||||||||||||||
| return _RequestScopedServerContextApp(self.app) | ||||||||||||||||||||||
| return self.app | ||||||||||||||||||||||
|
Comment on lines
407
to
+413
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems no need to introduce another
Suggested change
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| @cached_property | ||||||||||||||||||||||
| def transport(self) -> httpx.WSGITransport: | ||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import httpx | ||||||||||||||||||||||
| from a2wsgi import ASGIMiddleware | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| middleware = ASGIMiddleware(self.app) | ||||||||||||||||||||||
| middleware = ASGIMiddleware(cast("A2WSGIApp", self.asgi_app)) | ||||||||||||||||||||||
|
Comment on lines
-401
to
+422
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we can revert this line change. |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| # https://github.com/abersheeran/a2wsgi/discussions/64 | ||||||||||||||||||||||
| async def start_lifespan(cm: AsyncExitStack, app: FastAPI): | ||||||||||||||||||||||
|
|
@@ -413,4 +434,4 @@ async def start_lifespan(cm: AsyncExitStack, app: FastAPI): | |||||||||||||||||||||
| def atransport(self) -> httpx.ASGITransport: | ||||||||||||||||||||||
| import httpx | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| return httpx.ASGITransport(app=self.app) | ||||||||||||||||||||||
| return httpx.ASGITransport(app=self.asgi_app) | ||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had similar consolidation in https://github.com/apache/airflow/pull/59876/changes#diff-7694d13e2f87c84d20b0b8b44797bf96d754ae270204217e518082decc74649bR102 (PR was closed because was not accepted by other maintainers and went stale) - I'd favor "hiding" this small utility in another module where fitting rather than adding a new module just for the context detection.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm on the other side to keep this as-is so the lifecycle of |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| import os | ||
| import sys | ||
| from collections.abc import Generator | ||
| from contextlib import contextmanager | ||
| from contextvars import ContextVar | ||
| from typing import Literal | ||
|
|
||
| __all__ = [ | ||
| "get_process_context", | ||
| "override_process_context", | ||
| "should_use_task_sdk_api_path", | ||
| ] | ||
|
|
||
| _PROCESS_CONTEXT_OVERRIDE: ContextVar[str | None] = ContextVar( | ||
| "_AIRFLOW_PROCESS_CONTEXT_OVERRIDE", | ||
| default=None, | ||
| ) | ||
|
henry3260 marked this conversation as resolved.
|
||
|
|
||
|
jason810496 marked this conversation as resolved.
|
||
|
|
||
| def get_process_context() -> str | None: | ||
| """Return the current process context, preferring request-scoped overrides.""" | ||
| return _PROCESS_CONTEXT_OVERRIDE.get() or os.environ.get("_AIRFLOW_PROCESS_CONTEXT") | ||
|
|
||
|
|
||
| @contextmanager | ||
| def override_process_context(context: Literal["server", "client"]) -> Generator[None, None, None]: | ||
| """Temporarily override the current process context for the active execution flow.""" | ||
| token = _PROCESS_CONTEXT_OVERRIDE.set(context) | ||
| try: | ||
| yield | ||
| finally: | ||
| _PROCESS_CONTEXT_OVERRIDE.reset(token) | ||
|
|
||
|
|
||
| def should_use_task_sdk_api_path() -> bool: | ||
| """Return True when execution-context helpers should route through Task SDK APIs.""" | ||
| if get_process_context() == "server": | ||
| return False | ||
|
|
||
| task_runner_module = sys.modules.get("airflow.sdk.execution_time.task_runner") | ||
| return bool(getattr(task_runner_module, "SUPERVISOR_COMMS", None)) | ||
Uh oh!
There was an error while loading. Please reload this page.