From 1cabf86f38e7fe6b4abe75706ae42ccbf48a2c24 Mon Sep 17 00:00:00 2001 From: ana-oprea <80201759+ana-oprea@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:35:40 +0200 Subject: [PATCH] first test with cryptojwt library --- setup.py | 1 + src/pas/plugins/oidc/browser/configure.zcml | 7 + src/pas/plugins/oidc/browser/test_jwt.py | 114 +++++++++++++++ src/pas/plugins/oidc/jwt_verification.py | 151 ++++++++++++++++++++ 4 files changed, 273 insertions(+) create mode 100644 src/pas/plugins/oidc/browser/test_jwt.py create mode 100644 src/pas/plugins/oidc/jwt_verification.py diff --git a/setup.py b/setup.py index 2d5e81e..2cd6ac0 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ # 'oidcrp', # "oic<1; python_version < 3", "oic", + "cryptojwt" ], extras_require={ "test": [ diff --git a/src/pas/plugins/oidc/browser/configure.zcml b/src/pas/plugins/oidc/browser/configure.zcml index 5f56254..bf78a59 100644 --- a/src/pas/plugins/oidc/browser/configure.zcml +++ b/src/pas/plugins/oidc/browser/configure.zcml @@ -36,4 +36,11 @@ permission="zope2.View" /> + + diff --git a/src/pas/plugins/oidc/browser/test_jwt.py b/src/pas/plugins/oidc/browser/test_jwt.py new file mode 100644 index 0000000..7fee92a --- /dev/null +++ b/src/pas/plugins/oidc/browser/test_jwt.py @@ -0,0 +1,114 @@ +from Products.Five.browser import BrowserView +import traceback + + +class TestJWTView(BrowserView): + """Diagnostic view to test cryptojwt availability and EU Login key loading. + + Access: @@test-jwt (requires Manager role) + Safe: read-only, no login flow changes, only fetches public keys. + Remove after testing is complete. + """ + + def __call__(self): + self.request.response.setHeader("Content-Type", "text/plain") + output = [] + + # --- Test 1: Can we import cryptojwt? --- + output.append("=== Test 1: Import cryptojwt ===") + try: + from cryptojwt.jws.jws import JWS # noqa: F401 + from cryptojwt.key_bundle import KeyBundle # noqa: F401 + + output.append("PASS - cryptojwt is installed and importable") + except Exception: + output.append("FAIL") + output.append(traceback.format_exc()) + return "\n".join(output) + + output.append("") + + # --- Test 2: Can cryptojwt handle a secp256k1 (P-256K) key? --- + output.append("=== Test 2: Load a P-256K key into cryptojwt ===") + try: + # A hardcoded test key (not a real key, just valid EC points) + from cryptography.hazmat.primitives.asymmetric import ec + import base64 + + # Generate a throwaway secp256k1 key pair + private_key = ec.generate_private_key(ec.SECP256K1()) + numbers = private_key.public_key().public_numbers() + + def int_to_b64url(n, length): + return base64.urlsafe_b64encode( + n.to_bytes(length, "big") + ).rstrip(b"=").decode() + + test_jwk = { + "kty": "EC", + "crv": "P-256K", + "alg": "ES256K", + "x": int_to_b64url(numbers.x, 32), + "y": int_to_b64url(numbers.y, 32), + } + + kb = KeyBundle([test_jwk]) + keys = kb.keys() + output.append( + "PASS - loaded %d key(s) with P-256K curve" % len(keys) + ) + except Exception: + output.append("FAIL") + output.append(traceback.format_exc()) + return "\n".join(output) + + output.append("") + + # --- Test 3: Can cryptojwt load EU Login's real JWKS? --- + output.append("=== Test 3: Fetch and load EU Login JWKS ===") + try: + from Products.CMFCore.utils import getToolByName + from pas.plugins.oidc.jwt_verification import ( + fetch_jwks, + normalize_jwks_curves, + ) + + pas = getToolByName(self.context, "acl_users") + + # Find the OIDC plugin + plugin = None + for p_id, p in pas.objectItems(): + if hasattr(p, "get_oauth2_client"): + plugin = p + break + + if not plugin: + output.append("SKIP - no OIDC plugin found in acl_users") + return "\n".join(output) + + client = plugin.get_oauth2_client() + jwks_uri = client.provider_info["jwks_uri"] + output.append("JWKS URI: %s" % jwks_uri) + + # Fetch + raw_jwks = fetch_jwks(jwks_uri) + output.append("Keys found: %d" % len(raw_jwks.get("keys", []))) + for k in raw_jwks.get("keys", []): + output.append( + " - kid=%s crv=%s kty=%s" + % (k.get("kid", "?"), k.get("crv", "?"), k.get("kty", "?")) + ) + + # Normalize and load + normalized = normalize_jwks_curves(raw_jwks) + kb2 = KeyBundle(normalized["keys"]) + loaded = kb2.keys() + output.append("") + output.append( + "PASS - cryptojwt loaded %d key(s) from EU Login" % len(loaded) + ) + except Exception: + output.append("FAIL") + output.append(traceback.format_exc()) + + return "\n".join(output) diff --git a/src/pas/plugins/oidc/jwt_verification.py b/src/pas/plugins/oidc/jwt_verification.py new file mode 100644 index 0000000..0806aa4 --- /dev/null +++ b/src/pas/plugins/oidc/jwt_verification.py @@ -0,0 +1,151 @@ +""" +JWT verification using cryptojwt with proper secp256k1 (P-256K) support. + +This module provides an alternative to pyjwkest for verifying EU Login +ID tokens signed with the secp256k1 elliptic curve. +""" + +import copy +import json +import logging +import time + +import requests +from cryptojwt.jws.jws import JWS +from cryptojwt.key_bundle import KeyBundle + +logger = logging.getLogger(__name__) + +# EU Login JWKS uses "secp256k1" but cryptojwt expects "P-256K" +CURVE_ALIASES = { + "secp256k1": "P-256K", +} + +# Algorithm to set on keys when normalizing secp256k1 curves +CURVE_ALG = { + "secp256k1": "ES256K", +} + +# Simple JWKS cache: {uri: (jwks_dict, timestamp)} +_jwks_cache = {} +_CACHE_TTL = 300 # 5 minutes + + +def normalize_jwks_curves(jwks_dict): + """Rewrite curve names in a JWKS dict so cryptojwt can parse them. + + EU Login publishes JWKS with "crv": "secp256k1", but cryptojwt + expects "P-256K". Also sets the correct algorithm on the key. + """ + normalized = copy.deepcopy(jwks_dict) + for key in normalized.get("keys", []): + crv = key.get("crv", "") + if crv in CURVE_ALIASES: + key["crv"] = CURVE_ALIASES[crv] + key["alg"] = CURVE_ALG[crv] + return normalized + + +def fetch_jwks(jwks_uri): + """Fetch JWKS from the provider, with simple TTL caching.""" + now = time.time() + cached = _jwks_cache.get(jwks_uri) + if cached and (now - cached[1]) < _CACHE_TTL: + return cached[0] + + resp = requests.get(jwks_uri, timeout=30) + resp.raise_for_status() + data = resp.json() + _jwks_cache[jwks_uri] = (data, now) + return data + + +def verify_id_token(id_token_jwt, jwks_uri, issuer=None, client_id=None): + """Verify an ID token JWT using cryptojwt with secp256k1 support. + + Args: + id_token_jwt: Raw JWT string (header.payload.signature) + jwks_uri: URL of the OIDC provider's JWKS endpoint + issuer: Expected issuer claim (optional validation) + client_id: Expected audience claim (optional validation) + + Returns: + dict: Verified JWT claims + + Raises: + Exception: If verification fails + """ + # Fetch and normalize JWKS + jwks = fetch_jwks(jwks_uri) + jwks_normalized = normalize_jwks_curves(jwks) + + # Load keys + kb = KeyBundle(jwks_normalized["keys"]) + keys = kb.keys() + + if not keys: + raise ValueError("No keys loaded from JWKS") + + # Verify JWT signature + verifier = JWS() + verifier["alg"] = "ES256K" + claims = verifier.verify_compact(id_token_jwt, keys, sigalg="ES256K") + + # Validate claims + if issuer and claims.get("iss") != issuer: + raise ValueError( + "Invalid issuer: expected %s, got %s" + % (issuer, claims.get("iss")) + ) + + if client_id and claims.get("aud") != client_id: + aud = claims.get("aud") + # aud can be a string or a list + if isinstance(aud, list) and client_id not in aud: + raise ValueError( + "Invalid audience: %s not in %s" % (client_id, aud) + ) + elif isinstance(aud, str) and aud != client_id: + raise ValueError( + "Invalid audience: expected %s, got %s" % (client_id, aud) + ) + + return claims + + +def do_token_exchange( + token_endpoint, code, redirect_uris, client_id, client_secret, + code_verifier=None, +): + """Exchange an authorization code for tokens via HTTP POST. + + Args: + token_endpoint: URL of the token endpoint + code: Authorization code from the callback + redirect_uris: List of redirect URIs (first one is used) + client_id: OAuth2 client ID + client_secret: OAuth2 client secret + code_verifier: PKCE code verifier (optional) + + Returns: + dict: Raw token response containing access_token, id_token, etc. + """ + redirect_uri = redirect_uris[0] if isinstance(redirect_uris, list) else redirect_uris + + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + } + + if code_verifier: + data["code_verifier"] = code_verifier + + resp = requests.post( + token_endpoint, + data=data, + auth=(client_id, client_secret), + timeout=30, + ) + resp.raise_for_status() + return resp.json() \ No newline at end of file