From c1e07f3bd5fc22d5c6c834b1a3e3864bb51323be Mon Sep 17 00:00:00 2001 From: Harald Roessler Date: Wed, 1 Apr 2026 15:39:28 +0700 Subject: [PATCH] feat: post-quantum dual-signature (Ed25519 + Dilithium3) Add quantum-safe signing to all Verifiable Credentials using ML-DSA-65 (Dilithium3) from liboqs (Open Quantum Safe, MIT licensed). Architecture: - app/crypto/dilithium.py: Dilithium3 key management, sign, verify with KMS-encrypted key support and plaintext fallback for dev - app/crypto/hybrid.py: dual_sign() produces both Ed25519 and Dilithium proofs; verify_proof() validates single or dual proofs - Graceful degradation: if Dilithium keys are not configured, falls back to Ed25519-only (no breaking change on deploy) Changes: - credentials.py: issue_credential() now uses dual_sign(), verify uses verify_proof() supporting legacy single-proof and new dual-proof - endorsement.py: endorsement VCs use dual_sign() - main.py: DID document dynamically includes Dilithium verification key when configured, with legacy key-1 alias for backward compatibility - scripts/generate_dilithium_keys.py: utility to generate keypair Migration path (from earlier discussion): Phase 1 (this PR): dual-signature, Ed25519 + Dilithium Phase 2 (future): Dilithium-first, Ed25519 deprecated Phase 3 (future): Ed25519 sunset Dependency: liboqs-python (pip install liboqs-python) Co-Authored-By: Claude Opus 4.6 (1M context) --- app/credentials.py | 47 ++++++---- app/crypto/dilithium.py | 129 +++++++++++++++++++++++++++ app/crypto/hybrid.py | 134 +++++++++++++++++++++++++++++ app/main.py | 96 ++++++++++++++------- app/swarm/endorsement.py | 14 +-- scripts/generate_dilithium_keys.py | 35 ++++++++ 6 files changed, 395 insertions(+), 60 deletions(-) create mode 100644 app/crypto/dilithium.py create mode 100644 app/crypto/hybrid.py create mode 100644 scripts/generate_dilithium_keys.py diff --git a/app/credentials.py b/app/credentials.py index b59818e..a03bb7a 100644 --- a/app/credentials.py +++ b/app/credentials.py @@ -1,7 +1,14 @@ -"""MolTrust Verifiable Credentials - W3C VC Data Model""" +"""MolTrust Verifiable Credentials - W3C VC Data Model + +Supports dual signatures (Ed25519 + Dilithium3) for post-quantum safety. +If Dilithium keys are not configured, falls back to Ed25519-only signing. +Verification handles legacy (single Ed25519), new (dual), and future +(Dilithium-only) credentials transparently. +""" import os, json, datetime, hashlib from nacl.signing import SigningKey from app.crypto.kms_signer import get_decrypted_signing_key_hex +from app.crypto.hybrid import dual_sign, verify_proof ISSUER_DID = "did:web:api.moltrust.ch" @@ -27,40 +34,42 @@ def issue_credential(subject_did: str, credential_type: str, claims: dict) -> di } signing_key = get_signing_key() - payload = json.dumps(credential, sort_keys=True).encode() - signed = signing_key.sign(payload) - - credential["proof"] = { - "type": "Ed25519Signature2020", - "created": now.isoformat() + "Z", - "verificationMethod": f"{ISSUER_DID}#key-1", - "proofPurpose": "assertionMethod", - "proofValue": signed.signature.hex() - } + credential = dual_sign(credential, signing_key) return credential def verify_credential(credential: dict) -> dict: proof = credential.get("proof") if not proof: return {"valid": False, "error": "No proof found"} - if proof.get("verificationMethod") != f"{ISSUER_DID}#key-1": - return {"valid": False, "error": "Unknown verification method"} - try: - cred_copy = {k: v for k, v in credential.items() if k != "proof"} - payload = json.dumps(cred_copy, sort_keys=True).encode() - signature = bytes.fromhex(proof["proofValue"]) + # Check verification method(s) belong to our issuer + proofs = proof if isinstance(proof, list) else [proof] + for p in proofs: + vm = p.get("verificationMethod", "") + if not vm.startswith(ISSUER_DID): + return {"valid": False, "error": f"Unknown verification method: {vm}"} + try: signing_key = get_signing_key() verify_key = signing_key.verify_key - verify_key.verify(payload, signature) + result = verify_proof(credential, verify_key) + if not result["valid"]: + errors = [c.get("error", "check failed") for c in result.get("checks", []) if not c.get("valid")] + return {"valid": False, "error": "; ".join(errors), "checks": result["checks"]} + + # Check expiration exp = credential.get("expirationDate", "") if exp: exp_dt = datetime.datetime.fromisoformat(exp.replace("Z", "")) if datetime.datetime.utcnow() > exp_dt: return {"valid": False, "error": "Credential expired"} - return {"valid": True, "issuer": credential["issuer"], "subject": credential["credentialSubject"]["id"]} + return { + "valid": True, + "issuer": credential["issuer"], + "subject": credential["credentialSubject"]["id"], + "checks": result["checks"], + } except Exception as e: return {"valid": False, "error": str(e)} diff --git a/app/crypto/dilithium.py b/app/crypto/dilithium.py new file mode 100644 index 0000000..6506932 --- /dev/null +++ b/app/crypto/dilithium.py @@ -0,0 +1,129 @@ +""" +ML-DSA-65 (Dilithium3) post-quantum signing for MolTrust. + +Uses liboqs (Open Quantum Safe) — MIT licensed. +Install: pip install liboqs-python + +Key storage follows the same pattern as Ed25519: +- Primary: encrypted via AWS KMS (env: DILITHIUM_PRIVATE_KEY_ENCRYPTED) +- Fallback: hex env var (env: DILITHIUM_PRIVATE_KEY_HEX) — dev only +- Public key: env: DILITHIUM_PUBLIC_KEY_HEX + +If no Dilithium key is configured, PQC signing is gracefully skipped +and credentials are issued with Ed25519 only (Phase 1 → Phase 2 transition). +""" +import os +import logging +import base64 +import time + +logger = logging.getLogger("moltrust.crypto.dilithium") + +ALGORITHM = "Dilithium3" + +_cached_keypair = None +_cache_expiry = 0 +_CACHE_TTL = 300 # 5 minutes + + +def _load_keypair() -> tuple[bytes, bytes] | None: + """Load Dilithium keypair. Returns (secret_key, public_key) or None.""" + global _cached_keypair, _cache_expiry + + now = time.time() + if _cached_keypair and now < _cache_expiry: + return _cached_keypair + + # Try KMS-encrypted key first + encrypted = os.environ.get("DILITHIUM_PRIVATE_KEY_ENCRYPTED") + if encrypted: + try: + import boto3 + kms = boto3.client("kms", region_name=os.environ.get("AWS_REGION", "eu-central-1")) + response = kms.decrypt( + KeyId=os.environ.get("KMS_KEY_ID"), + CiphertextBlob=base64.b64decode(encrypted), + ) + sk_hex = response["Plaintext"].decode("utf-8").strip() + pk_hex = os.environ.get("DILITHIUM_PUBLIC_KEY_HEX", "") + if not pk_hex: + logger.error("DILITHIUM_PUBLIC_KEY_HEX required when using KMS") + return None + _cached_keypair = (bytes.fromhex(sk_hex), bytes.fromhex(pk_hex)) + _cache_expiry = now + _CACHE_TTL + return _cached_keypair + except Exception as e: + logger.error(f"Dilithium KMS decryption failed: {e}") + return None + + # Fallback: plaintext hex env vars (development only) + sk_hex = os.environ.get("DILITHIUM_PRIVATE_KEY_HEX", "") + pk_hex = os.environ.get("DILITHIUM_PUBLIC_KEY_HEX", "") + if sk_hex and pk_hex: + _cached_keypair = (bytes.fromhex(sk_hex), bytes.fromhex(pk_hex)) + _cache_expiry = now + _CACHE_TTL + return _cached_keypair + + return None + + +def is_available() -> bool: + """Check if Dilithium signing is configured and liboqs is installed.""" + try: + import oqs # noqa: F401 + except ImportError: + return False + return _load_keypair() is not None + + +def sign(payload: bytes) -> bytes | None: + """Sign payload with Dilithium3. Returns signature or None if not configured.""" + keypair = _load_keypair() + if not keypair: + return None + try: + import oqs + sk, _ = keypair + signer = oqs.Signature(ALGORITHM, secret_key=sk) + return signer.sign(payload) + except Exception as e: + logger.error(f"Dilithium signing failed: {e}") + return None + + +def verify(payload: bytes, signature: bytes, public_key: bytes) -> bool: + """Verify a Dilithium3 signature.""" + try: + import oqs + verifier = oqs.Signature(ALGORITHM) + return verifier.verify(payload, signature, public_key) + except Exception as e: + logger.error(f"Dilithium verification failed: {e}") + return False + + +def get_public_key_hex() -> str | None: + """Return the Dilithium public key as hex, or None if not configured.""" + keypair = _load_keypair() + if not keypair: + return None + return keypair[1].hex() + + +def generate_keypair() -> tuple[str, str]: + """Generate a new Dilithium3 keypair. Returns (secret_key_hex, public_key_hex). + + Utility for initial key generation — run once, store the keys securely. + """ + import oqs + signer = oqs.Signature(ALGORITHM) + pk = signer.generate_keypair() + sk = signer.export_secret_key() + return sk.hex(), pk.hex() + + +def clear_cache(): + """Clear cached keypair (for rotation).""" + global _cached_keypair, _cache_expiry + _cached_keypair = None + _cache_expiry = 0 diff --git a/app/crypto/hybrid.py b/app/crypto/hybrid.py new file mode 100644 index 0000000..19e1044 --- /dev/null +++ b/app/crypto/hybrid.py @@ -0,0 +1,134 @@ +""" +Hybrid (dual) signature module for MolTrust. + +Produces credentials with both Ed25519 and Dilithium3 proofs: +- Ed25519: legacy, for verifiers that don't support PQC yet +- Dilithium3: quantum-safe, the primary proof going forward + +If Dilithium is not configured, falls back to Ed25519-only signing. +This allows a gradual rollout: deploy the code first, add Dilithium +keys when ready. +""" +import json +import logging +from app.crypto import dilithium + +logger = logging.getLogger("moltrust.crypto.hybrid") + +ISSUER_DID = "did:web:api.moltrust.ch" + + +def dual_sign(credential: dict, ed25519_key) -> dict: + """Sign a credential with Ed25519 and optionally Dilithium3. + + Args: + credential: The VC dict (without proof field) + ed25519_key: nacl.signing.SigningKey instance + + Returns: + The credential dict with proof field set (single proof or array of two) + """ + try: + import jcs + payload = jcs.canonicalize(credential) + except ImportError: + payload = json.dumps(credential, sort_keys=True).encode() + + now_str = credential.get("issuanceDate", "") + + # Ed25519 signature + ed_signed = ed25519_key.sign(payload) + ed_proof = { + "type": "Ed25519Signature2020", + "created": now_str, + "verificationMethod": f"{ISSUER_DID}#key-ed25519", + "proofPurpose": "assertionMethod", + "canonicalizationAlgorithm": "JCS", + "proofValue": ed_signed.signature.hex(), + } + + # Dilithium signature (if available) + dil_sig = dilithium.sign(payload) + if dil_sig is not None: + dil_proof = { + "type": "DilithiumSignature2026", + "created": now_str, + "verificationMethod": f"{ISSUER_DID}#key-dilithium", + "proofPurpose": "assertionMethod", + "canonicalizationAlgorithm": "JCS", + "proofValue": dil_sig.hex(), + } + credential["proof"] = [ed_proof, dil_proof] + logger.info("Credential dual-signed (Ed25519 + Dilithium3)") + else: + credential["proof"] = ed_proof + logger.debug("Credential signed with Ed25519 only (Dilithium not configured)") + + return credential + + +def verify_proof(credential: dict, ed25519_verify_key) -> dict: + """Verify a credential's proof(s). + + Supports: + - Single Ed25519 proof (legacy, key-1 or key-ed25519) + - Single Dilithium proof + - Dual proof array (verifies both) + + Returns dict with valid, checks, and errors. + """ + proof = credential.get("proof") + if not proof: + return {"valid": False, "error": "No proof found"} + + cred_copy = {k: v for k, v in credential.items() if k != "proof"} + + proofs = proof if isinstance(proof, list) else [proof] + results = {"valid": True, "checks": []} + + for p in proofs: + proof_type = p.get("type", "") + vm = p.get("verificationMethod", "") + + # Determine canonicalization + if p.get("canonicalizationAlgorithm") == "JCS": + try: + import jcs + payload = jcs.canonicalize(cred_copy) + except ImportError: + results["checks"].append({"type": proof_type, "valid": False, "error": "JCS library not available"}) + results["valid"] = False + continue + else: + payload = json.dumps(cred_copy, sort_keys=True).encode() + + try: + signature = bytes.fromhex(p["proofValue"]) + except (ValueError, KeyError) as e: + results["checks"].append({"type": proof_type, "valid": False, "error": str(e)}) + results["valid"] = False + continue + + if "Ed25519" in proof_type: + try: + ed25519_verify_key.verify(payload, signature) + results["checks"].append({"type": "Ed25519", "valid": True}) + except Exception as e: + results["checks"].append({"type": "Ed25519", "valid": False, "error": str(e)}) + results["valid"] = False + + elif "Dilithium" in proof_type: + pk_hex = dilithium.get_public_key_hex() + if not pk_hex: + results["checks"].append({"type": "Dilithium", "valid": False, "error": "Dilithium public key not configured"}) + results["valid"] = False + else: + ok = dilithium.verify(payload, signature, bytes.fromhex(pk_hex)) + results["checks"].append({"type": "Dilithium", "valid": ok}) + if not ok: + results["valid"] = False + else: + results["checks"].append({"type": proof_type, "valid": False, "error": f"Unknown proof type: {proof_type}"}) + results["valid"] = False + + return results diff --git a/app/main.py b/app/main.py index 4e6c1b9..9c159b3 100644 --- a/app/main.py +++ b/app/main.py @@ -1236,39 +1236,75 @@ async def health_check(request: Request): } # --- W3C DID:web Support --- -DID_WEB_DOCUMENT = { - "@context": [ +_DID_BASE = "did:web:api.moltrust.ch" + +def _build_did_document() -> dict: + """Build DID document dynamically, including Dilithium key if configured.""" + from app.crypto import dilithium + + contexts = [ "https://www.w3.org/ns/did/v1", - "https://w3id.org/security/suites/ed25519-2020/v1" - ], - "id": "did:web:api.moltrust.ch", - "controller": "did:web:api.moltrust.ch", - "verificationMethod": [{ - "id": "did:web:api.moltrust.ch#key-1", + "https://w3id.org/security/suites/ed25519-2020/v1", + ] + verification_methods = [{ + "id": f"{_DID_BASE}#key-ed25519", "type": "Ed25519VerificationKey2020", - "controller": "did:web:api.moltrust.ch", + "controller": _DID_BASE, "publicKeyMultibase": "z6MktwcfvxeKmXstWpyEr9wJkJE2xzzkpBkdCSghdvCzrqDC" - }], - "authentication": ["did:web:api.moltrust.ch#key-1"], - "assertionMethod": ["did:web:api.moltrust.ch#key-1"], - "service": [ - { - "id": "did:web:api.moltrust.ch#trust-api", - "type": "TrustLayer", - "serviceEndpoint": "https://api.moltrust.ch" - }, - { - "id": "did:web:api.moltrust.ch#identity", - "type": "AgentIdentity", - "serviceEndpoint": "https://api.moltrust.ch/identity" - }, - { - "id": "did:web:api.moltrust.ch#reputation", - "type": "ReputationService", - "serviceEndpoint": "https://api.moltrust.ch/reputation" - } - ] -} + }] + auth_methods = [f"{_DID_BASE}#key-ed25519"] + assertion_methods = [f"{_DID_BASE}#key-ed25519"] + + # Legacy alias for backward compatibility + verification_methods.append({ + "id": f"{_DID_BASE}#key-1", + "type": "Ed25519VerificationKey2020", + "controller": _DID_BASE, + "publicKeyMultibase": "z6MktwcfvxeKmXstWpyEr9wJkJE2xzzkpBkdCSghdvCzrqDC" + }) + + # Add Dilithium key if configured + dil_pk = dilithium.get_public_key_hex() + if dil_pk: + contexts.append("https://w3id.org/security/suites/dilithium-2026/v1") + verification_methods.append({ + "id": f"{_DID_BASE}#key-dilithium", + "type": "DilithiumVerificationKey2026", + "controller": _DID_BASE, + "publicKeyMultibase": "z" + dil_pk[:64] + "...", # truncated for readability + "publicKeyHex": dil_pk, + }) + auth_methods.append(f"{_DID_BASE}#key-dilithium") + assertion_methods.append(f"{_DID_BASE}#key-dilithium") + + return { + "@context": contexts, + "id": _DID_BASE, + "controller": _DID_BASE, + "verificationMethod": verification_methods, + "authentication": auth_methods, + "assertionMethod": assertion_methods, + "service": [ + { + "id": f"{_DID_BASE}#trust-api", + "type": "TrustLayer", + "serviceEndpoint": "https://api.moltrust.ch" + }, + { + "id": f"{_DID_BASE}#identity", + "type": "AgentIdentity", + "serviceEndpoint": "https://api.moltrust.ch/identity" + }, + { + "id": f"{_DID_BASE}#reputation", + "type": "ReputationService", + "serviceEndpoint": "https://api.moltrust.ch/reputation" + } + ] + } + +# Cache the document (rebuilt on restart or when keys change) +DID_WEB_DOCUMENT = _build_did_document() @app.get("/.well-known/did.json") @limiter.limit("60/minute") diff --git a/app/swarm/endorsement.py b/app/swarm/endorsement.py index 5407b79..b3998b3 100644 --- a/app/swarm/endorsement.py +++ b/app/swarm/endorsement.py @@ -151,19 +151,11 @@ async def issue_endorsement( } } - # 11. Ed25519 Signatur (HIGH-5: real signing, no more sandbox_unsigned) + # 11. Dual signature (Ed25519 + Dilithium if configured) from app.credentials import get_signing_key - import json as _json + from app.crypto.hybrid import dual_sign signing_key = get_signing_key() - payload = _json.dumps(vc, sort_keys=True).encode() - signed = signing_key.sign(payload) - vc["proof"] = { - "type": "Ed25519Signature2020", - "created": now.isoformat(), - "verificationMethod": "did:web:api.moltrust.ch#key-1", - "proofPurpose": "assertionMethod", - "proofValue": signed.signature.hex() - } + vc = dual_sign(vc, signing_key) # VC JWT in DB speichern await conn.execute( diff --git a/scripts/generate_dilithium_keys.py b/scripts/generate_dilithium_keys.py new file mode 100644 index 0000000..15310fe --- /dev/null +++ b/scripts/generate_dilithium_keys.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +"""Generate a Dilithium3 (ML-DSA-65) keypair for MolTrust. + +Usage: + pip install liboqs-python + python scripts/generate_dilithium_keys.py + +Output: + Prints the secret key and public key as hex strings. + Store the secret key securely (KMS or env var). + Set DILITHIUM_PUBLIC_KEY_HEX in the environment. +""" + +try: + import oqs +except ImportError: + print("Error: liboqs-python not installed.") + print("Install with: pip install liboqs-python") + print("See: https://github.com/open-quantum-safe/liboqs-python") + raise SystemExit(1) + +signer = oqs.Signature("Dilithium3") +public_key = signer.generate_keypair() +secret_key = signer.export_secret_key() + +print(f"Algorithm: Dilithium3 (ML-DSA-65)") +print(f"Secret key length: {len(secret_key)} bytes") +print(f"Public key length: {len(public_key)} bytes") +print() +print(f"DILITHIUM_PRIVATE_KEY_HEX={secret_key.hex()}") +print() +print(f"DILITHIUM_PUBLIC_KEY_HEX={public_key.hex()}") +print() +print("IMPORTANT: Store the secret key in AWS KMS or a secrets manager.") +print("Never commit it to version control.")