diff --git a/examples/test-keystore.py b/examples/test-keystore.py new file mode 100644 index 0000000..a4ffa41 --- /dev/null +++ b/examples/test-keystore.py @@ -0,0 +1,66 @@ +import binascii +import hashlib +import json +import logging +import random +import time +from uuid import UUID + +import ubirch +from ubirch.ubirch_protocol import UBIRCH_PROTOCOL_TYPE_BIN + +logging.basicConfig(format='%(asctime)s %(name)20.20s %(levelname)-8.8s %(message)s', level=logging.DEBUG) +logger = logging.getLogger() + + +######################################################################## +# Implement the ubirch-protocol with signing and saving the signatures +class Proto(ubirch.Protocol): + + def __init__(self, key_store: ubirch.KeyStore, uuid: UUID) -> None: + super().__init__() + self.__ks = key_store + + # check if the device already has keys or generate a new pair + if not keystore.exists_signing_key(uuid): + keystore.create_ecdsa_keypair(uuid) + + def _sign(self, uuid: UUID, message: bytes) -> bytes: + return self.__ks.find_signing_key(uuid).sign(message) + + def _verify(self, uuid: UUID, message: bytes, signature: bytes): + return self.__ks.find_verifying_key(uuid).verify(signature, message) + + +######################################################################## + +uuid = UUID(hex="c8e5026e-5aef-4ad1-a7f0-1111820bf060") + +# create a keystore for the device +keystore = ubirch.KeyStore("demo-device.jks", "keystore") + +# create an instance of the protocol with signature saving +protocol = Proto(keystore, uuid) + +# create a message like being sent to the customer backend +# include an ID and timestamp in the data message to ensure a unique hash +message = { + "id": str(uuid), + "ts": int(time.time()), + "data": "{:d}".format(random.randint(0, 100)) +} + +# create a compact rendering of the message to ensure determinism when creating the hash +serialized = json.dumps(message, separators=(',', ':'), sort_keys=True, ensure_ascii=False).encode() + +# hash the message +message_hash = hashlib.sha256(serialized).digest() +logger.info("message hash: {}".format(binascii.b2a_base64(message_hash).decode().rstrip("\n"))) + +# create a new chained protocol message with the message hash +upp = protocol.message_chained(uuid, UBIRCH_PROTOCOL_TYPE_BIN, message_hash) +logger.info("UPP: {}".format(binascii.hexlify(upp).decode())) + +# verify the upp +unpacked = protocol.message_verify(upp) +print("UPP verified") diff --git a/ubirch/ubirch_ks.py b/ubirch/ubirch_ks.py index 998ce6a..5a99bf2 100644 --- a/ubirch/ubirch_ks.py +++ b/ubirch/ubirch_ks.py @@ -15,30 +15,38 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import hashlib from datetime import datetime, timedelta from logging import getLogger from os import urandom from uuid import UUID +import ecdsa import ed25519 -from ed25519 import SigningKey, VerifyingKey from jks import jks, AlgorithmIdentifier, rfc5208, TrustedCertEntry from pyasn1.codec.ber import encoder logger = getLogger(__name__) -ECC_ENCRYPTION_OID = (1, 2, 1, 3, 101, 112) +EDDSA_OID = (1, 2, 1, 3, 101, 112) +ECDSA_OID = (1, 2, 840, 10045, 4, 3, 2) class ED25519Certificate(TrustedCertEntry): - def __init__(self, alias: str, verifying_key: VerifyingKey, **kwargs): + def __init__(self, alias: str, verifying_key: ed25519.VerifyingKey, **kwargs): super().__init__(**kwargs) self.alias = alias self.cert = verifying_key.to_bytes() self.timestamp = int(datetime.utcnow().timestamp()) +class ECDSACertificate(TrustedCertEntry): + + def __init__(self, alias: str, verifying_key: ecdsa.VerifyingKey, **kwargs): + super().__init__(**kwargs) + self.alias = alias + self.cert = verifying_key.to_string() + self.timestamp = int(datetime.utcnow().timestamp()) class KeyStore(object): """ @@ -60,25 +68,26 @@ def _load_keys(self) -> None: logger.warning("creating new key store: {}".format(self._ks_file)) self._ks = jks.KeyStore.new("jks", []) - def insert_ed25519_signing_key(self, uuid, sk: SigningKey): - """Insert an existing ED25519 signing key.""" + def insert_ed25519_signing_key(self, uuid: UUID, sk: ed25519.SigningKey): + """Store an existing ED25519 signing key in the key store.""" # encode the ED25519 private key as PKCS#8 private_key_info = rfc5208.PrivateKeyInfo() private_key_info.setComponentByName('version', 'v1') a = AlgorithmIdentifier() - a.setComponentByName('algorithm', ECC_ENCRYPTION_OID) + a.setComponentByName('algorithm', EDDSA_OID) private_key_info.setComponentByName('privateKeyAlgorithm', a) private_key_info.setComponentByName('privateKey', sk.to_bytes()) pkey_pkcs8 = encoder.encode(private_key_info) pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8) self._ks.entries['pke_' + uuid.hex] = pke - def insert_ed25519_verifying_key(self, uuid, vk: VerifyingKey): - # store verifying key in certificate store + def insert_ed25519_verifying_key(self, uuid: UUID, vk: ed25519.VerifyingKey): + """Store an existing ED25519 verifying key in the key store.""" self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk) - def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) -> (VerifyingKey, SigningKey): - """Insert an existing ED25519 key pair into the key store.""" + def insert_ed25519_keypair(self, uuid: UUID, vk: ed25519.VerifyingKey, sk: ed25519.SigningKey) -> ( + ed25519.VerifyingKey, ed25519.SigningKey): + """Store an existing ED25519 key pair in the key store.""" if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) @@ -88,11 +97,46 @@ def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) - logger.info("inserted new key pair for {}: {}".format(uuid.hex, bytes.decode(vk.to_ascii(encoding='hex')))) return (vk, sk) - def create_ed25519_keypair(self, uuid: UUID) -> (VerifyingKey, SigningKey): + def create_ed25519_keypair(self, uuid: UUID) -> (ed25519.VerifyingKey, ed25519.SigningKey): """Create a new ED25519 key pair and store in key store.""" sk, vk = ed25519.create_keypair(entropy=urandom) return self.insert_ed25519_keypair(uuid, vk, sk) + def insert_ecdsa_signing_key(self, uuid, sk: ecdsa.SigningKey): + """Insert an existing ECDSA signing key.""" + # encode the ECDSA private key as PKCS#8 + private_key_info = rfc5208.PrivateKeyInfo() + private_key_info.setComponentByName('version', 'v1') + a = AlgorithmIdentifier() + a.setComponentByName('algorithm', ECDSA_OID) + private_key_info.setComponentByName('privateKeyAlgorithm', a) + private_key_info.setComponentByName('privateKey', sk.to_string()) + pkey_pkcs8 = encoder.encode(private_key_info) + pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8) + self._ks.entries['pke_' + uuid.hex] = pke + + def insert_ecdsa_verifying_key(self, uuid, vk: ecdsa.VerifyingKey): + # store verifying key in certificate store + self._ks.entries[uuid.hex] = ECDSACertificate(uuid.hex, vk) + + def insert_ecdsa_keypair(self, uuid: UUID, vk: ecdsa.VerifyingKey, sk: ecdsa.SigningKey) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): + """Insert an existing ECDSA key pair into the key store.""" + if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: + raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) + + self.insert_ecdsa_verifying_key(uuid, vk) + self.insert_ecdsa_signing_key(uuid, sk) + self._ks.save(self._ks_file, self._ks_password) + logger.info("inserted new key pair for {}: {}".format(uuid.hex, vk.to_string().decode())) + return (vk, sk) + + def create_ecdsa_keypair(self, uuid: UUID, curve: ecdsa.curves.Curve = ecdsa.NIST256p, hashfunc=hashlib.sha256) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): + """Create new ECDSA key pair and store in key store""" + + sk = ecdsa.SigningKey.generate(curve, entropy=urandom, hashfunc=hashfunc) + vk = sk.get_verifying_key() + return self.insert_ecdsa_keypair(uuid, vk, sk) + def exists_signing_key(self, uuid: UUID): """Check whether this UUID has a signing key in the key store.""" return 'pke_' + uuid.hex in self._ks.private_keys @@ -101,17 +145,28 @@ def exists_verifying_key(self, uuid: UUID): """Check whether this UUID has a verifying key in the key store.""" return uuid.hex in self._ks.certs - def find_signing_key(self, uuid: UUID) -> SigningKey: + def find_signing_key(self, uuid: UUID) -> ed25519.SigningKey or ecdsa.SigningKey: """Find the signing key for this UUID.""" sk = self._ks.private_keys['pke_' + uuid.hex] - return SigningKey(sk.pkey) - - def find_verifying_key(self, uuid: UUID) -> VerifyingKey: + if sk._algorithm_oid == EDDSA_OID: # FIXME sk._algorithm_oid is None + return ed25519.SigningKey(sk.pkey) + elif sk._algorithm_oid == ECDSA_OID: + return ecdsa.SigningKey(sk.pkey) + else: + raise Exception("stored key with unknown algorithm OID: '{}'".format(sk._algorithm_oid)) + + def find_verifying_key(self, uuid: UUID) -> ed25519.VerifyingKey or ecdsa.VerifyingKey: """Find the verifying key for this UUID.""" cert = self._ks.certs[uuid.hex] - return VerifyingKey(cert.cert) + + # try: + # vk = ed25519.VerifyingKey(cert.cert) + # except + + return ed25519.VerifyingKey(cert.cert) def get_certificate(self, uuid: UUID) -> dict or None: + """Get the public key info for key registration""" if not uuid.hex in self._ks.certs: return None @@ -122,7 +177,7 @@ def get_certificate(self, uuid: UUID) -> dict or None: # TODO fix handling of key validity not_after = created + timedelta(days=365) return { - "algorithm": 'ECC_ED25519', + "algorithm": 'ECC_ED25519', # TODO if ECDSA 'ecdsa-p256v1' "created": int(created.timestamp()), "hwDeviceId": uuid.bytes, "pubKey": vk.to_bytes(),