Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions app/credentials.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
"""MolTrust Verifiable Credentials - W3C VC Data Model"""
"""MolTrust Verifiable Credentials - W3C VC Data Model

Supports dual signatures (Ed25519 + Dilithium3) for post-quantum safety.
If Dilithium keys are not configured, falls back to Ed25519-only signing.
Verification handles legacy (single Ed25519), new (dual), and future
(Dilithium-only) credentials transparently.
"""
import os, json, datetime, hashlib
from nacl.signing import SigningKey
from app.crypto.kms_signer import get_decrypted_signing_key_hex
from app.crypto.hybrid import dual_sign, verify_proof

ISSUER_DID = "did:web:api.moltrust.ch"

Expand All @@ -27,40 +34,42 @@ def issue_credential(subject_did: str, credential_type: str, claims: dict) -> di
}

signing_key = get_signing_key()
payload = json.dumps(credential, sort_keys=True).encode()
signed = signing_key.sign(payload)

credential["proof"] = {
"type": "Ed25519Signature2020",
"created": now.isoformat() + "Z",
"verificationMethod": f"{ISSUER_DID}#key-1",
"proofPurpose": "assertionMethod",
"proofValue": signed.signature.hex()
}
credential = dual_sign(credential, signing_key)
return credential

def verify_credential(credential: dict) -> dict:
proof = credential.get("proof")
if not proof:
return {"valid": False, "error": "No proof found"}
if proof.get("verificationMethod") != f"{ISSUER_DID}#key-1":
return {"valid": False, "error": "Unknown verification method"}

try:
cred_copy = {k: v for k, v in credential.items() if k != "proof"}
payload = json.dumps(cred_copy, sort_keys=True).encode()
signature = bytes.fromhex(proof["proofValue"])
# Check verification method(s) belong to our issuer
proofs = proof if isinstance(proof, list) else [proof]
for p in proofs:
vm = p.get("verificationMethod", "")
if not vm.startswith(ISSUER_DID):
return {"valid": False, "error": f"Unknown verification method: {vm}"}

try:
signing_key = get_signing_key()
verify_key = signing_key.verify_key
verify_key.verify(payload, signature)

result = verify_proof(credential, verify_key)
if not result["valid"]:
errors = [c.get("error", "check failed") for c in result.get("checks", []) if not c.get("valid")]
return {"valid": False, "error": "; ".join(errors), "checks": result["checks"]}

# Check expiration
exp = credential.get("expirationDate", "")
if exp:
exp_dt = datetime.datetime.fromisoformat(exp.replace("Z", ""))
if datetime.datetime.utcnow() > exp_dt:
return {"valid": False, "error": "Credential expired"}

return {"valid": True, "issuer": credential["issuer"], "subject": credential["credentialSubject"]["id"]}
return {
"valid": True,
"issuer": credential["issuer"],
"subject": credential["credentialSubject"]["id"],
"checks": result["checks"],
}
except Exception as e:
return {"valid": False, "error": str(e)}
129 changes: 129 additions & 0 deletions app/crypto/dilithium.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
ML-DSA-65 (Dilithium3) post-quantum signing for MolTrust.

Uses liboqs (Open Quantum Safe) — MIT licensed.
Install: pip install liboqs-python

Key storage follows the same pattern as Ed25519:
- Primary: encrypted via AWS KMS (env: DILITHIUM_PRIVATE_KEY_ENCRYPTED)
- Fallback: hex env var (env: DILITHIUM_PRIVATE_KEY_HEX) — dev only
- Public key: env: DILITHIUM_PUBLIC_KEY_HEX

If no Dilithium key is configured, PQC signing is gracefully skipped
and credentials are issued with Ed25519 only (Phase 1 → Phase 2 transition).
"""
import os
import logging
import base64
import time

logger = logging.getLogger("moltrust.crypto.dilithium")

ALGORITHM = "Dilithium3"

_cached_keypair = None
_cache_expiry = 0
_CACHE_TTL = 300 # 5 minutes


def _load_keypair() -> tuple[bytes, bytes] | None:
"""Load Dilithium keypair. Returns (secret_key, public_key) or None."""
global _cached_keypair, _cache_expiry

now = time.time()
if _cached_keypair and now < _cache_expiry:
return _cached_keypair

# Try KMS-encrypted key first
encrypted = os.environ.get("DILITHIUM_PRIVATE_KEY_ENCRYPTED")
if encrypted:
try:
import boto3
kms = boto3.client("kms", region_name=os.environ.get("AWS_REGION", "eu-central-1"))
response = kms.decrypt(
KeyId=os.environ.get("KMS_KEY_ID"),
CiphertextBlob=base64.b64decode(encrypted),
)
sk_hex = response["Plaintext"].decode("utf-8").strip()
pk_hex = os.environ.get("DILITHIUM_PUBLIC_KEY_HEX", "")
if not pk_hex:
logger.error("DILITHIUM_PUBLIC_KEY_HEX required when using KMS")
return None
_cached_keypair = (bytes.fromhex(sk_hex), bytes.fromhex(pk_hex))
_cache_expiry = now + _CACHE_TTL
return _cached_keypair
except Exception as e:
logger.error(f"Dilithium KMS decryption failed: {e}")
return None

# Fallback: plaintext hex env vars (development only)
sk_hex = os.environ.get("DILITHIUM_PRIVATE_KEY_HEX", "")
pk_hex = os.environ.get("DILITHIUM_PUBLIC_KEY_HEX", "")
if sk_hex and pk_hex:
_cached_keypair = (bytes.fromhex(sk_hex), bytes.fromhex(pk_hex))
_cache_expiry = now + _CACHE_TTL
return _cached_keypair

return None


def is_available() -> bool:
"""Check if Dilithium signing is configured and liboqs is installed."""
try:
import oqs # noqa: F401
except ImportError:
return False
return _load_keypair() is not None


def sign(payload: bytes) -> bytes | None:
"""Sign payload with Dilithium3. Returns signature or None if not configured."""
keypair = _load_keypair()
if not keypair:
return None
try:
import oqs
sk, _ = keypair
signer = oqs.Signature(ALGORITHM, secret_key=sk)
return signer.sign(payload)
except Exception as e:
logger.error(f"Dilithium signing failed: {e}")
return None


def verify(payload: bytes, signature: bytes, public_key: bytes) -> bool:
"""Verify a Dilithium3 signature."""
try:
import oqs
verifier = oqs.Signature(ALGORITHM)
return verifier.verify(payload, signature, public_key)
except Exception as e:
logger.error(f"Dilithium verification failed: {e}")
return False


def get_public_key_hex() -> str | None:
"""Return the Dilithium public key as hex, or None if not configured."""
keypair = _load_keypair()
if not keypair:
return None
return keypair[1].hex()


def generate_keypair() -> tuple[str, str]:
"""Generate a new Dilithium3 keypair. Returns (secret_key_hex, public_key_hex).

Utility for initial key generation — run once, store the keys securely.
"""
import oqs
signer = oqs.Signature(ALGORITHM)
pk = signer.generate_keypair()
sk = signer.export_secret_key()
return sk.hex(), pk.hex()


def clear_cache():
"""Clear cached keypair (for rotation)."""
global _cached_keypair, _cache_expiry
_cached_keypair = None
_cache_expiry = 0
134 changes: 134 additions & 0 deletions app/crypto/hybrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""
Hybrid (dual) signature module for MolTrust.

Produces credentials with both Ed25519 and Dilithium3 proofs:
- Ed25519: legacy, for verifiers that don't support PQC yet
- Dilithium3: quantum-safe, the primary proof going forward

If Dilithium is not configured, falls back to Ed25519-only signing.
This allows a gradual rollout: deploy the code first, add Dilithium
keys when ready.
"""
import json
import logging
from app.crypto import dilithium

logger = logging.getLogger("moltrust.crypto.hybrid")

ISSUER_DID = "did:web:api.moltrust.ch"


def dual_sign(credential: dict, ed25519_key) -> dict:
"""Sign a credential with Ed25519 and optionally Dilithium3.

Args:
credential: The VC dict (without proof field)
ed25519_key: nacl.signing.SigningKey instance

Returns:
The credential dict with proof field set (single proof or array of two)
"""
try:
import jcs
payload = jcs.canonicalize(credential)
except ImportError:
payload = json.dumps(credential, sort_keys=True).encode()

now_str = credential.get("issuanceDate", "")

# Ed25519 signature
ed_signed = ed25519_key.sign(payload)
ed_proof = {
"type": "Ed25519Signature2020",
"created": now_str,
"verificationMethod": f"{ISSUER_DID}#key-ed25519",
"proofPurpose": "assertionMethod",
"canonicalizationAlgorithm": "JCS",
"proofValue": ed_signed.signature.hex(),
}

# Dilithium signature (if available)
dil_sig = dilithium.sign(payload)
if dil_sig is not None:
dil_proof = {
"type": "DilithiumSignature2026",
"created": now_str,
"verificationMethod": f"{ISSUER_DID}#key-dilithium",
"proofPurpose": "assertionMethod",
"canonicalizationAlgorithm": "JCS",
"proofValue": dil_sig.hex(),
}
credential["proof"] = [ed_proof, dil_proof]
logger.info("Credential dual-signed (Ed25519 + Dilithium3)")
else:
credential["proof"] = ed_proof
logger.debug("Credential signed with Ed25519 only (Dilithium not configured)")

return credential


def verify_proof(credential: dict, ed25519_verify_key) -> dict:
"""Verify a credential's proof(s).

Supports:
- Single Ed25519 proof (legacy, key-1 or key-ed25519)
- Single Dilithium proof
- Dual proof array (verifies both)

Returns dict with valid, checks, and errors.
"""
proof = credential.get("proof")
if not proof:
return {"valid": False, "error": "No proof found"}

cred_copy = {k: v for k, v in credential.items() if k != "proof"}

proofs = proof if isinstance(proof, list) else [proof]
results = {"valid": True, "checks": []}

for p in proofs:
proof_type = p.get("type", "")
vm = p.get("verificationMethod", "")

# Determine canonicalization
if p.get("canonicalizationAlgorithm") == "JCS":
try:
import jcs
payload = jcs.canonicalize(cred_copy)
except ImportError:
results["checks"].append({"type": proof_type, "valid": False, "error": "JCS library not available"})
results["valid"] = False
continue
else:
payload = json.dumps(cred_copy, sort_keys=True).encode()

try:
signature = bytes.fromhex(p["proofValue"])
except (ValueError, KeyError) as e:
results["checks"].append({"type": proof_type, "valid": False, "error": str(e)})
results["valid"] = False
continue

if "Ed25519" in proof_type:
try:
ed25519_verify_key.verify(payload, signature)
results["checks"].append({"type": "Ed25519", "valid": True})
except Exception as e:
results["checks"].append({"type": "Ed25519", "valid": False, "error": str(e)})
results["valid"] = False

elif "Dilithium" in proof_type:
pk_hex = dilithium.get_public_key_hex()
if not pk_hex:
results["checks"].append({"type": "Dilithium", "valid": False, "error": "Dilithium public key not configured"})
results["valid"] = False
else:
ok = dilithium.verify(payload, signature, bytes.fromhex(pk_hex))
results["checks"].append({"type": "Dilithium", "valid": ok})
if not ok:
results["valid"] = False
else:
results["checks"].append({"type": proof_type, "valid": False, "error": f"Unknown proof type: {proof_type}"})
results["valid"] = False

return results
Loading