Skip to content
Open
39 changes: 39 additions & 0 deletions helm/blueapi/config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,34 @@
"type": "object",
"$id": "OIDCConfig"
},
"OpaConfig": {
"additionalProperties": false,
"properties": {
"root": {
"default": "http://localhost:8181/",
"format": "uri",
"maxLength": 2083,
"minLength": 1,
"title": "Root",
"type": "string"
},
"tiled_service_account_check": {
"title": "Tiled Service Account Check",
"type": "string"
},
"submit_task_check": {
"title": "Submit Task Check",
"type": "string"
}
},
"required": [
"tiled_service_account_check",
"submit_task_check"
],
"title": "OpaConfig",
"type": "object",
"$id": "OpaConfig"
},
"PlanSource": {
"additionalProperties": false,
"properties": {
Expand Down Expand Up @@ -612,6 +640,17 @@
}
],
"default": null
},
"opa": {
"anyOf": [
{
"$ref": "OpaConfig"
},
{
"type": "null"
}
],
"default": null
}
},
"title": "ApplicationConfig",
Expand Down
38 changes: 38 additions & 0 deletions helm/blueapi/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,34 @@
},
"additionalProperties": false
},
"OpaConfig": {
"$id": "OpaConfig",
"title": "OpaConfig",
"type": "object",
"required": [
"tiled_service_account_check",
"submit_task_check"
],
"properties": {
"root": {
"title": "Root",
"default": "http://localhost:8181/",
"type": "string",
"format": "uri",
"maxLength": 2083,
"minLength": 1
},
"submit_task_check": {
"title": "Submit Task Check",
"type": "string"
},
"tiled_service_account_check": {
"title": "Tiled Service Account Check",
"type": "string"
}
},
"additionalProperties": false
},
"PlanSource": {
"$id": "PlanSource",
"title": "PlanSource",
Expand Down Expand Up @@ -1011,6 +1039,16 @@
}
]
},
"opa": {
"anyOf": [
{
"$ref": "OpaConfig"
},
{
"type": "null"
}
]
},
"scratch": {
"anyOf": [
{
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies = [
"tomlkit",
"graypy>=2.1.0",
"httpx>=0.28.1",
"aiohttp>=3.13.5",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
8 changes: 8 additions & 0 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ class Tag(StrEnum):
META = "Meta"


class OpaConfig(BlueapiBaseModel):
root: HttpUrl = HttpUrl("http://localhost:8181")
tiled_service_account_check: str
submit_task_check: str


class ApplicationConfig(BlueapiBaseModel):
"""
Config for the worker application as a whole. Root of
Expand Down Expand Up @@ -335,6 +341,7 @@ class ApplicationConfig(BlueapiBaseModel):
oidc: OIDCConfig | None = None
auth_token_path: Path | None = None
numtracker: NumtrackerConfig | None = None
opa: OpaConfig | None = None

def __eq__(self, other: object) -> bool:
if isinstance(other, ApplicationConfig):
Expand All @@ -343,6 +350,7 @@ def __eq__(self, other: object) -> bool:
& (self.env == other.env)
& (self.logging == other.logging)
& (self.api == other.api)
& (self.opa == other.opa)
)
return False

Expand Down
56 changes: 55 additions & 1 deletion src/blueapi/service/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
from functools import cached_property
from http import HTTPStatus
from pathlib import Path
from typing import Any, cast
from typing import Annotated, Any, cast

import httpx
import jwt
import requests
from fastapi import Depends, HTTPException, Request
from fastapi.security.utils import get_authorization_scheme_param
from pydantic import TypeAdapter
from requests.auth import AuthBase
from starlette.status import HTTP_401_UNAUTHORIZED

from blueapi.config import OIDCConfig, ServiceAccount
from blueapi.service.model import Cache
Expand Down Expand Up @@ -272,3 +275,54 @@ def get_access_token(self):
def sync_auth_flow(self, request):
request.headers["Authorization"] = f"Bearer {self.get_access_token()}"
yield request


def unchecked_bearer_token(req: Request) -> str | None:
"""Get bearer token value from authorization header"""
auth = req.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(auth)
if scheme.casefold() != "bearer":
return None
return param.strip()


UncheckedBearerToken = Annotated[str | None, Depends(unchecked_bearer_token)]


def build_access_token_check(config: OIDCConfig):
"""
Create a function to validate the bearer token of requests

The returned function should be used via fastAPI's 'Depends' mechanism to
ensure users are authenticated
"""
jwkclient = jwt.PyJWKClient(config.jwks_uri)

def validate_bearer_token(
request: Request, token: Annotated[str | None, Depends(unchecked_bearer_token)]
):
"""Check that a bearer token is valid and inject into request state"""
if not token:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)

signing_key = jwkclient.get_signing_key_from_jwt(token)
decoded: dict[str, Any] = jwt.decode(
token,
signing_key.key,
algorithms=config.id_token_signing_alg_values_supported,
verify=True,
audience=config.client_audience,
issuer=config.issuer,
)
request.state.decoded_access_token = decoded

return validate_bearer_token


def access_token(request: Request) -> str | None:
"""Get the decoded and verified access token of the user making the request"""
return getattr(request.state, "decoded_access_token", None)
123 changes: 123 additions & 0 deletions src/blueapi/service/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import logging
import re
from collections.abc import Mapping
from contextlib import AbstractAsyncContextManager, aclosing, nullcontext
from typing import Any, Self, cast

import aiohttp
from aiohttp import ClientSession
from fastapi import Depends, HTTPException, Request
from starlette import status

from blueapi.config import OIDCConfig, OpaConfig, ServiceAccount
from blueapi.service.authentication import TiledAuth, unchecked_bearer_token
from blueapi.service.model import TaskRequest

LOGGER = logging.getLogger(__name__)
INSTRUMENT_SESSION_RE = re.compile(r"^[a-z]{2}(?P<proposal>\d+)-(?P<visit>\d+)$")


class OpaClient:
client: aiohttp.ClientSession

def __init__(self, instrument: str, config: OpaConfig):
LOGGER.info("Creating OpaClient for %s with config %s", instrument, config)
self._instrument = instrument
self._conf = config
self._session = ClientSession(base_url=config.root.encoded_string())

async def aclose(self):
LOGGER.info("Closing OPA session")
await self._session.close()

async def _call_opa(self, endpoint, data: Mapping[str, Any]) -> bool:
try:
resp = await self._session.post(
endpoint,
json={
"input": {
"beamline": self._instrument,
"audience": "account",
**data,
}
},
)
return (await resp.json())["result"]
except Exception:
LOGGER.exception("Failed to run check", exc_info=True)
raise

@classmethod
def for_config(
cls, instrument: str | None, config: OpaConfig | None
) -> AbstractAsyncContextManager[Self | None]:
if config:
if not instrument:
raise ValueError("Instrument name is required for OPA client")
return aclosing(cls(instrument, config))
LOGGER.info("No OPA config provided - not creating OpaClient")
return nullcontext()

async def require_tiled_service_account(self, token: str):
if not await self._call_opa(
self._conf.tiled_service_account_check,
{"token": token, "beamline": self._instrument},
):
raise ValueError(
f"Tiled service account is not valid for '{self._instrument}'"
)

async def require_submit_task(self, instrument_session: str, token: str):
if not (match := INSTRUMENT_SESSION_RE.match(instrument_session)):
raise ValueError("Invalid instrument session")

if not await self._call_opa(
self._conf.submit_task_check,
{
"token": token,
"proposal": int(match["proposal"]),
"visit": int(match["visit"]),
},
):
raise HTTPException(status_code=status.HTTP_403_UNORTHORIZED)


class OpaUserClient:
client: OpaClient
token: str

def __init__(self, client: OpaClient, token: str):
self.client = client
self.token = token

async def can_submit_task(self, task: TaskRequest):
LOGGER.info("Checking permissions to run task")
await self.client.require_submit_task(task.instrument_session, self.token)


async def validate_tiled_config(
tiled: ServiceAccount | str | None, oidc: OIDCConfig | None, opa: OpaClient | None
):
if not isinstance(tiled, ServiceAccount):
# can't validate an API key
return

if not opa or not oidc:
LOGGER.info("Missing OPA or OIDC configuration required to validate tiled auth")
return

LOGGER.info("Validating tiled configuration")
tiled.token_url = oidc.token_endpoint
auth = TiledAuth(tiled)
await opa.require_tiled_service_account(auth.get_access_token())


async def opa(
request: Request, token: str | None = Depends(unchecked_bearer_token)
) -> OpaUserClient | None:

if opa := cast(OpaClient | None, getattr(request.app.state, "authz", None)):
if not token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return OpaUserClient(opa, token)
return None
Loading
Loading