Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions pulp-glue/src/pulp_glue/common/authentication.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing as t
import warnings

from pulp_glue.common import oas

Expand All @@ -11,44 +12,50 @@ class AuthProviderBase:
Different authentication schemes can be implemented in subclasses.
"""

def can_complete_http_basic(self) -> bool:
def can_complete_http_basic(self) -> t.Literal[False] | int:
return False

def can_complete_mutualTLS(self) -> bool:
def can_complete_mutualTLS(self) -> t.Literal[False] | int:
return False

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool:
def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int:
return False

def can_complete_scheme(self, security_scheme: oas.SecurityScheme, scopes: list[str]) -> bool:
def can_complete_scheme(
self, security_scheme: oas.SecurityScheme, scopes: list[str]
) -> t.Literal[False] | int:
if isinstance(security_scheme, oas.SecuritySchemeHttp):
if security_scheme.scheme == "basic":
return self.can_complete_http_basic()
elif isinstance(security_scheme, oas.SecuritySchemeMutualTLS):
return self.can_complete_mutualTLS()
elif isinstance(security_scheme, oas.SecuritySchemeOAuth2):
client_credentials_flow = security_scheme.flows.client_credentials
if client_credentials_flow is not None and self.can_complete_oauth2_client_credentials(
list(client_credentials_flow.scopes.keys())
):
return True
if client_credentials_flow is not None:
return self.can_complete_oauth2_client_credentials(
list(client_credentials_flow.scopes.keys())
)
return False

def can_complete(
self,
proposal: dict[str, list[str]],
security_schemes: dict[str, oas.SecurityScheme | oas.Reference],
) -> bool:
) -> t.Literal[False] | int:
cost: int = 0
for name, scopes in proposal.items():
security_scheme = security_schemes.get(name)
if (
security_scheme is None
or isinstance(security_scheme, oas.Reference)
or not self.can_complete_scheme(security_scheme, scopes)
):
if security_scheme is None:
warnings.warn("OpenAPI references security scheme it does not define.")
return False
# This covers the case where `[]` allows for no auth at all.
return True
if isinstance(security_scheme, oas.Reference):
# TODO implement dereferencing in the authenticating code first.
return False
if (extra_cost := self.can_complete_scheme(security_scheme, scopes)) is False:
return False
cost += extra_cost
# This covers the case where `[]` allows for no auth at all for zero cost.
return cost

async def auth_success_hook(self, **kwargs: t.Any) -> None:
pass
Expand Down Expand Up @@ -76,8 +83,8 @@ def __init__(self, username: t.AnyStr, password: t.AnyStr):
self.username: bytes = username.encode("latin1") if isinstance(username, str) else username
self.password: bytes = password.encode("latin1") if isinstance(password, str) else password

def can_complete_http_basic(self) -> bool:
return True
def can_complete_http_basic(self) -> t.Literal[False] | int:
return 1

async def http_basic_credentials(self) -> tuple[bytes, bytes]:
return self.username, self.password
Expand Down Expand Up @@ -120,14 +127,18 @@ def __init__(
if cert is None and key is not None:
raise RuntimeError("Key can only be used together with a cert.")

def can_complete_http_basic(self) -> bool:
return self.username is not None
def can_complete_http_basic(self) -> t.Literal[False] | int:
# Basic auth is comparatively costly on the server side.
return self.username is not None and 15

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool:
return self.client_id is not None
def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int:
# There is an extra roundtrip for aquiring the token.
# Should be cheap afterwards.
return self.client_id is not None and 10

def can_complete_mutualTLS(self) -> bool:
return self.cert is not None
def can_complete_mutualTLS(self) -> t.Literal[False] | int:
# No extra cost, the tls setup will be done anyway.
return self.cert is not None and 0

async def http_basic_credentials(self) -> tuple[bytes, bytes]:
assert self.username is not None
Expand Down
14 changes: 9 additions & 5 deletions pulp-glue/src/pulp_glue/common/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,17 @@ def _select_proposal(
and self._auth_provider is not None
):
security_schemes = self._api_spec.components.security_schemes
try:
proposal = next(
p
cost_proposal_iter = (
(c, p)
for c, p in (
(self._auth_provider.can_complete(p, security_schemes), p)
for p in request.security
if self._auth_provider.can_complete(p, security_schemes)
)
except StopIteration:
if c is not False
)
try:
_dummy, proposal = min(cost_proposal_iter)
except ValueError:
raise OpenAPIError(_("No suitable auth scheme found."))
return proposal

Expand Down
6 changes: 3 additions & 3 deletions pulp-glue/tests/test_auth_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_username_needs_password(self) -> None:

def test_can_complete_basic_auth_and_provide_credentials(self) -> None:
provider = GlueAuthProvider(username="user1", password="secret1")
assert provider.can_complete_http_basic() is True
assert provider.can_complete_http_basic() == 15
assert asyncio.run(provider.http_basic_credentials()) == (b"user1", b"secret1")

def test_client_id_needs_client_secret(self) -> None:
Expand All @@ -110,13 +110,13 @@ def test_client_id_needs_client_secret(self) -> None:

def test_can_complete_oauth2_client_credentials_and_provide_them(self) -> None:
provider = GlueAuthProvider(client_id="client1", client_secret="secret1")
assert provider.can_complete_oauth2_client_credentials([]) is True
assert provider.can_complete_oauth2_client_credentials([]) == 10
assert asyncio.run(provider.oauth2_client_credentials()) == (
b"client1",
b"secret1",
)

def test_can_complete_mutualTLS_and_provide_cert(self) -> None:
provider = GlueAuthProvider(cert="FAKECERTIFICATE")
assert provider.can_complete_mutualTLS() is True
assert provider.can_complete_mutualTLS() == 0
assert provider.tls_credentials() == ("FAKECERTIFICATE", None)
24 changes: 16 additions & 8 deletions src/pulp_cli/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,22 @@ def __init__(self, pulp_ctx: PulpCLIContext):
self._password_in_secretstorage: bool | None = None
self._oauth2_client_credentials: tuple[bytes, bytes] | None = None

def can_complete_http_basic(self) -> bool:
return self.pulp_ctx.username is not None

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> bool:
return self.pulp_ctx.oauth2_client_id is not None

def can_complete_mutualTLS(self) -> bool:
return self.pulp_ctx.cert is not None
def can_complete_http_basic(self) -> t.Literal[False] | int:
if self._http_basic is not None:
return 15
if self.pulp_ctx.username is not None:
return 25 if self.pulp_ctx.password is None else 15
return False

def can_complete_oauth2_client_credentials(self, scopes: list[str]) -> t.Literal[False] | int:
if self._oauth2_client_credentials is not None:
return 10
if self.pulp_ctx.oauth2_client_id is not None:
return 20 if self.pulp_ctx.oauth2_client_secret is None else 10
return False

def can_complete_mutualTLS(self) -> t.Literal[False] | int:
return self.pulp_ctx.cert is not None and 0

def _fetch_password(self) -> bytes:
if SECRET_STORAGE:
Expand Down
Loading