Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
# 'oidcrp',
# "oic<1; python_version < 3",
"oic",
"cryptojwt"
],
extras_require={
"test": [
Expand Down
7 changes: 7 additions & 0 deletions src/pas/plugins/oidc/browser/configure.zcml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,11 @@
permission="zope2.View"
/>

<browser:page
name="test-jwt"
for="*"
class=".test_jwt.TestJWTView"
permission="cmf.ManagePortal"
/>

</configure>
114 changes: 114 additions & 0 deletions src/pas/plugins/oidc/browser/test_jwt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from Products.Five.browser import BrowserView
import traceback


class TestJWTView(BrowserView):
"""Diagnostic view to test cryptojwt availability and EU Login key loading.

Access: @@test-jwt (requires Manager role)
Safe: read-only, no login flow changes, only fetches public keys.
Remove after testing is complete.
"""

def __call__(self):
self.request.response.setHeader("Content-Type", "text/plain")
output = []

# --- Test 1: Can we import cryptojwt? ---
output.append("=== Test 1: Import cryptojwt ===")
try:
from cryptojwt.jws.jws import JWS # noqa: F401
from cryptojwt.key_bundle import KeyBundle # noqa: F401

output.append("PASS - cryptojwt is installed and importable")
except Exception:
output.append("FAIL")
output.append(traceback.format_exc())
return "\n".join(output)

output.append("")

# --- Test 2: Can cryptojwt handle a secp256k1 (P-256K) key? ---
output.append("=== Test 2: Load a P-256K key into cryptojwt ===")
try:
# A hardcoded test key (not a real key, just valid EC points)
from cryptography.hazmat.primitives.asymmetric import ec
import base64

# Generate a throwaway secp256k1 key pair
private_key = ec.generate_private_key(ec.SECP256K1())
numbers = private_key.public_key().public_numbers()

def int_to_b64url(n, length):
return base64.urlsafe_b64encode(
n.to_bytes(length, "big")
).rstrip(b"=").decode()

test_jwk = {
"kty": "EC",
"crv": "P-256K",
"alg": "ES256K",
"x": int_to_b64url(numbers.x, 32),
"y": int_to_b64url(numbers.y, 32),
}

kb = KeyBundle([test_jwk])
keys = kb.keys()
output.append(
"PASS - loaded %d key(s) with P-256K curve" % len(keys)
)
except Exception:
output.append("FAIL")
output.append(traceback.format_exc())
return "\n".join(output)

output.append("")

# --- Test 3: Can cryptojwt load EU Login's real JWKS? ---
output.append("=== Test 3: Fetch and load EU Login JWKS ===")
try:
from Products.CMFCore.utils import getToolByName
from pas.plugins.oidc.jwt_verification import (
fetch_jwks,
normalize_jwks_curves,
)

pas = getToolByName(self.context, "acl_users")

# Find the OIDC plugin
plugin = None
for p_id, p in pas.objectItems():
if hasattr(p, "get_oauth2_client"):
plugin = p
break

if not plugin:
output.append("SKIP - no OIDC plugin found in acl_users")
return "\n".join(output)

client = plugin.get_oauth2_client()
jwks_uri = client.provider_info["jwks_uri"]
output.append("JWKS URI: %s" % jwks_uri)

# Fetch
raw_jwks = fetch_jwks(jwks_uri)
output.append("Keys found: %d" % len(raw_jwks.get("keys", [])))
for k in raw_jwks.get("keys", []):
output.append(
" - kid=%s crv=%s kty=%s"
% (k.get("kid", "?"), k.get("crv", "?"), k.get("kty", "?"))
)

# Normalize and load
normalized = normalize_jwks_curves(raw_jwks)
kb2 = KeyBundle(normalized["keys"])
loaded = kb2.keys()
output.append("")
output.append(
"PASS - cryptojwt loaded %d key(s) from EU Login" % len(loaded)
)
except Exception:
output.append("FAIL")
output.append(traceback.format_exc())

return "\n".join(output)
151 changes: 151 additions & 0 deletions src/pas/plugins/oidc/jwt_verification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""
JWT verification using cryptojwt with proper secp256k1 (P-256K) support.

This module provides an alternative to pyjwkest for verifying EU Login
ID tokens signed with the secp256k1 elliptic curve.
"""

import copy
import json
import logging
import time

import requests
from cryptojwt.jws.jws import JWS
from cryptojwt.key_bundle import KeyBundle

logger = logging.getLogger(__name__)

# EU Login JWKS uses "secp256k1" but cryptojwt expects "P-256K"
CURVE_ALIASES = {
"secp256k1": "P-256K",
}

# Algorithm to set on keys when normalizing secp256k1 curves
CURVE_ALG = {
"secp256k1": "ES256K",
}

# Simple JWKS cache: {uri: (jwks_dict, timestamp)}
_jwks_cache = {}
_CACHE_TTL = 300 # 5 minutes


def normalize_jwks_curves(jwks_dict):
"""Rewrite curve names in a JWKS dict so cryptojwt can parse them.

EU Login publishes JWKS with "crv": "secp256k1", but cryptojwt
expects "P-256K". Also sets the correct algorithm on the key.
"""
normalized = copy.deepcopy(jwks_dict)
for key in normalized.get("keys", []):
crv = key.get("crv", "")
if crv in CURVE_ALIASES:
key["crv"] = CURVE_ALIASES[crv]
key["alg"] = CURVE_ALG[crv]
return normalized


def fetch_jwks(jwks_uri):
"""Fetch JWKS from the provider, with simple TTL caching."""
now = time.time()
cached = _jwks_cache.get(jwks_uri)
if cached and (now - cached[1]) < _CACHE_TTL:
return cached[0]

resp = requests.get(jwks_uri, timeout=30)
resp.raise_for_status()
data = resp.json()
_jwks_cache[jwks_uri] = (data, now)
return data


def verify_id_token(id_token_jwt, jwks_uri, issuer=None, client_id=None):
"""Verify an ID token JWT using cryptojwt with secp256k1 support.

Args:
id_token_jwt: Raw JWT string (header.payload.signature)
jwks_uri: URL of the OIDC provider's JWKS endpoint
issuer: Expected issuer claim (optional validation)
client_id: Expected audience claim (optional validation)

Returns:
dict: Verified JWT claims

Raises:
Exception: If verification fails
"""
# Fetch and normalize JWKS
jwks = fetch_jwks(jwks_uri)
jwks_normalized = normalize_jwks_curves(jwks)

# Load keys
kb = KeyBundle(jwks_normalized["keys"])
keys = kb.keys()

if not keys:
raise ValueError("No keys loaded from JWKS")

# Verify JWT signature
verifier = JWS()
verifier["alg"] = "ES256K"
claims = verifier.verify_compact(id_token_jwt, keys, sigalg="ES256K")

# Validate claims
if issuer and claims.get("iss") != issuer:
raise ValueError(
"Invalid issuer: expected %s, got %s"
% (issuer, claims.get("iss"))
)

if client_id and claims.get("aud") != client_id:
aud = claims.get("aud")
# aud can be a string or a list
if isinstance(aud, list) and client_id not in aud:
raise ValueError(
"Invalid audience: %s not in %s" % (client_id, aud)
)
elif isinstance(aud, str) and aud != client_id:
raise ValueError(
"Invalid audience: expected %s, got %s" % (client_id, aud)
)

return claims


def do_token_exchange(
token_endpoint, code, redirect_uris, client_id, client_secret,
code_verifier=None,
):
"""Exchange an authorization code for tokens via HTTP POST.

Args:
token_endpoint: URL of the token endpoint
code: Authorization code from the callback
redirect_uris: List of redirect URIs (first one is used)
client_id: OAuth2 client ID
client_secret: OAuth2 client secret
code_verifier: PKCE code verifier (optional)

Returns:
dict: Raw token response containing access_token, id_token, etc.
"""
redirect_uri = redirect_uris[0] if isinstance(redirect_uris, list) else redirect_uris

data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
}

if code_verifier:
data["code_verifier"] = code_verifier

resp = requests.post(
token_endpoint,
data=data,
auth=(client_id, client_secret),
timeout=30,
)
resp.raise_for_status()
return resp.json()
Loading