From 142954a4745a4581b1d9f991ce6e5bb2351011b2 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Wed, 3 Jun 2026 15:27:10 +0200 Subject: [PATCH] Add prioritization to auth selection --- .../src/pulp_glue/common/authentication.py | 59 +++++++++++-------- pulp-glue/src/pulp_glue/common/openapi.py | 14 +++-- pulp-glue/tests/test_auth_provider.py | 6 +- src/pulp_cli/generic.py | 24 +++++--- 4 files changed, 63 insertions(+), 40 deletions(-) diff --git a/pulp-glue/src/pulp_glue/common/authentication.py b/pulp-glue/src/pulp_glue/common/authentication.py index 46c41112d..aabf12e15 100644 --- a/pulp-glue/src/pulp_glue/common/authentication.py +++ b/pulp-glue/src/pulp_glue/common/authentication.py @@ -1,4 +1,5 @@ import typing as t +import warnings from pulp_glue.common import oas @@ -11,16 +12,18 @@ class AuthProviderBase: Different authentication schemes can be implemented in subclasses. """ - def can_complete_http_basic(self) -> bool: + def can_complete_http_basic(self) -> t.Literal[False] | int: return False - def can_complete_mutualTLS(self) -> bool: + def can_complete_mutualTLS(self) -> t.Literal[False] | int: return False - def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool: + def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int: return False - def can_complete_scheme(self, security_scheme: oas.SecurityScheme, scopes: list[str]) -> bool: + def can_complete_scheme( + self, security_scheme: oas.SecurityScheme, scopes: list[str] + ) -> t.Literal[False] | int: if isinstance(security_scheme, oas.SecuritySchemeHttp): if security_scheme.scheme == "basic": return self.can_complete_http_basic() @@ -28,27 +31,31 @@ def can_complete_scheme(self, security_scheme: oas.SecurityScheme, scopes: list[ return self.can_complete_mutualTLS() elif isinstance(security_scheme, oas.SecuritySchemeOAuth2): client_credentials_flow = security_scheme.flows.client_credentials - if client_credentials_flow is not None and self.can_complete_oauth2_client_credentials( - list(client_credentials_flow.scopes.keys()) - ): - return True + if client_credentials_flow is not None: + return self.can_complete_oauth2_client_credentials( + list(client_credentials_flow.scopes.keys()) + ) return False def can_complete( self, proposal: dict[str, list[str]], security_schemes: dict[str, oas.SecurityScheme | oas.Reference], - ) -> bool: + ) -> t.Literal[False] | int: + cost: int = 0 for name, scopes in proposal.items(): security_scheme = security_schemes.get(name) - if ( - security_scheme is None - or isinstance(security_scheme, oas.Reference) - or not self.can_complete_scheme(security_scheme, scopes) - ): + if security_scheme is None: + warnings.warn("OpenAPI references security scheme it does not define.") return False - # This covers the case where `[]` allows for no auth at all. - return True + if isinstance(security_scheme, oas.Reference): + # TODO implement dereferencing in the authenticating code first. + return False + if (extra_cost := self.can_complete_scheme(security_scheme, scopes)) is False: + return False + cost += extra_cost + # This covers the case where `[]` allows for no auth at all for zero cost. + return cost async def auth_success_hook(self, **kwargs: t.Any) -> None: pass @@ -76,8 +83,8 @@ def __init__(self, username: t.AnyStr, password: t.AnyStr): self.username: bytes = username.encode("latin1") if isinstance(username, str) else username self.password: bytes = password.encode("latin1") if isinstance(password, str) else password - def can_complete_http_basic(self) -> bool: - return True + def can_complete_http_basic(self) -> t.Literal[False] | int: + return 1 async def http_basic_credentials(self) -> tuple[bytes, bytes]: return self.username, self.password @@ -120,14 +127,18 @@ def __init__( if cert is None and key is not None: raise RuntimeError("Key can only be used together with a cert.") - def can_complete_http_basic(self) -> bool: - return self.username is not None + def can_complete_http_basic(self) -> t.Literal[False] | int: + # Basic auth is comparatively costly on the server side. + return self.username is not None and 15 - def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool: - return self.client_id is not None + def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int: + # There is an extra roundtrip for aquiring the token. + # Should be cheap afterwards. + return self.client_id is not None and 10 - def can_complete_mutualTLS(self) -> bool: - return self.cert is not None + def can_complete_mutualTLS(self) -> t.Literal[False] | int: + # No extra cost, the tls setup will be done anyway. + return self.cert is not None and 0 async def http_basic_credentials(self) -> tuple[bytes, bytes]: assert self.username is not None diff --git a/pulp-glue/src/pulp_glue/common/openapi.py b/pulp-glue/src/pulp_glue/common/openapi.py index 77cad6c80..794350a01 100644 --- a/pulp-glue/src/pulp_glue/common/openapi.py +++ b/pulp-glue/src/pulp_glue/common/openapi.py @@ -532,13 +532,17 @@ def _select_proposal( and self._auth_provider is not None ): security_schemes = self._api_spec.components.security_schemes - try: - proposal = next( - p + cost_proposal_iter = ( + (c, p) + for c, p in ( + (self._auth_provider.can_complete(p, security_schemes), p) for p in request.security - if self._auth_provider.can_complete(p, security_schemes) ) - except StopIteration: + if c is not False + ) + try: + _dummy, proposal = min(cost_proposal_iter) + except ValueError: raise OpenAPIError(_("No suitable auth scheme found.")) return proposal diff --git a/pulp-glue/tests/test_auth_provider.py b/pulp-glue/tests/test_auth_provider.py index 695389f76..e45d9ec70 100644 --- a/pulp-glue/tests/test_auth_provider.py +++ b/pulp-glue/tests/test_auth_provider.py @@ -101,7 +101,7 @@ def test_username_needs_password(self) -> None: def test_can_complete_basic_auth_and_provide_credentials(self) -> None: provider = GlueAuthProvider(username="user1", password="secret1") - assert provider.can_complete_http_basic() is True + assert provider.can_complete_http_basic() == 15 assert asyncio.run(provider.http_basic_credentials()) == (b"user1", b"secret1") def test_client_id_needs_client_secret(self) -> None: @@ -110,7 +110,7 @@ def test_client_id_needs_client_secret(self) -> None: def test_can_complete_oauth2_client_credentials_and_provide_them(self) -> None: provider = GlueAuthProvider(client_id="client1", client_secret="secret1") - assert provider.can_complete_oauth2_client_credentials([]) is True + assert provider.can_complete_oauth2_client_credentials([]) == 10 assert asyncio.run(provider.oauth2_client_credentials()) == ( b"client1", b"secret1", @@ -118,5 +118,5 @@ def test_can_complete_oauth2_client_credentials_and_provide_them(self) -> None: def test_can_complete_mutualTLS_and_provide_cert(self) -> None: provider = GlueAuthProvider(cert="FAKECERTIFICATE") - assert provider.can_complete_mutualTLS() is True + assert provider.can_complete_mutualTLS() == 0 assert provider.tls_credentials() == ("FAKECERTIFICATE", None) diff --git a/src/pulp_cli/generic.py b/src/pulp_cli/generic.py index 0374af0c0..9da982bbe 100644 --- a/src/pulp_cli/generic.py +++ b/src/pulp_cli/generic.py @@ -218,14 +218,22 @@ def __init__(self, pulp_ctx: PulpCLIContext): self._password_in_secretstorage: bool | None = None self._oauth2_client_credentials: tuple[bytes, bytes] | None = None - def can_complete_http_basic(self) -> bool: - return self.pulp_ctx.username is not None - - def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool: - return self.pulp_ctx.oauth2_client_id is not None - - def can_complete_mutualTLS(self) -> bool: - return self.pulp_ctx.cert is not None + def can_complete_http_basic(self) -> t.Literal[False] | int: + if self._http_basic is not None: + return 15 + if self.pulp_ctx.username is not None: + return 25 if self.pulp_ctx.password is None else 15 + return False + + def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int: + if self._oauth2_client_credentials is not None: + return 10 + if self.pulp_ctx.oauth2_client_id is not None: + return 20 if self.pulp_ctx.oauth2_client_secret is None else 10 + return False + + def can_complete_mutualTLS(self) -> t.Literal[False] | int: + return self.pulp_ctx.cert is not None and 0 def _fetch_password(self) -> bytes: if SECRET_STORAGE: