From f8dff835e5642b1609277475f3fdae0900ff732f Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Wed, 18 Feb 2026 11:38:56 -0800 Subject: [PATCH 1/9] add materials to duvet --- .duvet/config.toml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.duvet/config.toml b/.duvet/config.toml index 41d29170..868a8f20 100644 --- a/.duvet/config.toml +++ b/.duvet/config.toml @@ -11,6 +11,12 @@ comment-style = { meta = "##=", content = "##%" } # Include required specifications here [[specification]] +source = "specification/s3-encryption/materials/keyrings.md" +[[specification]] +source = "specification/s3-encryption/materials/s3-keyring.md" +[[specification]] +source = "specification/s3-encryption/materials/s3-kms-keyring.md" +[[specification]] source = "specification/s3-encryption/client.md" [[specification]] source = "specification/s3-encryption/decryption.md" From e839d3a30a0ef4072dadbc654dd822dbc6ce3a04 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 15:24:57 -0800 Subject: [PATCH 2/9] duvet for keyring/kms keyring --- .duvet/config.toml | 4 + .../kms_keyring_exceptions.md | 214 ++++++++ src/s3_encryption/materials/keyring.py | 53 +- src/s3_encryption/materials/kms_keyring.py | 96 +++- test/test_kms_keyring.py | 457 ++++++++++++++++++ 5 files changed, 813 insertions(+), 11 deletions(-) create mode 100644 compliance_exceptions/kms_keyring_exceptions.md create mode 100644 test/test_kms_keyring.py diff --git a/.duvet/config.toml b/.duvet/config.toml index 868a8f20..cb7abf7f 100644 --- a/.duvet/config.toml +++ b/.duvet/config.toml @@ -8,6 +8,10 @@ comment-style = { meta = "##=", content = "##%" } pattern = "test/**/*.py" type = "test" comment-style = { meta = "##=", content = "##%" } +[[source]] +pattern = "compliance_exceptions/**/*.md" +type = "exception" +comment-style = { meta = "##=", content = "##%" } # Include required specifications here [[specification]] diff --git a/compliance_exceptions/kms_keyring_exceptions.md b/compliance_exceptions/kms_keyring_exceptions.md new file mode 100644 index 00000000..0da55fb3 --- /dev/null +++ b/compliance_exceptions/kms_keyring_exceptions.md @@ -0,0 +1,214 @@ +# Compliance Exceptions for KMS Keyring Implementation + +## Summary + +The Python S3 Encryption Client implementation takes a pragmatic approach that: +1. Simplifies the keyring architecture by not implementing the full abstract method pattern (GenerateDataKey, EncryptDataKey, DecryptDataKey) +2. Defers validation to the AWS SDK where appropriate (key identifier validation) +3. Uses more efficient KMS API patterns (GenerateDataKey instead of separate Generate + Encrypt) +4. Omits optional features like custom User Agent strings (planned for future enhancement) + +## TODOs + +##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context +##= type=TODO +##% - A custom API Name or User Agent string SHOULD be provided in order to provide metrics on KMS calls associated with the S3 Encryption Client. + +##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 +##= type=TODO +##% - A custom API Name or User Agent string SHOULD be provided in order to provide metrics on KMS calls associated with the S3 Encryption Client. + +## Initialization Validation + +##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization +##= type=exception +##% The KmsKeyring MAY validate that the AWS KMS key identifier is not null or empty. + +Justification: This validation is not implemented. The Python implementation relies on attrs field validation and KMS SDK to catch invalid key identifiers. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization +##= type=exception +##% If the KmsKeyring validates that the AWS KMS key identifier is not null or empty, then it MUST throw an exception. + +Justification: Not applicable since the MAY validation above is not implemented. If we don't validate, we don't need to throw an exception for validation failure. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization +##= type=exception +##% The KmsKeyring MAY validate that the AWS KMS key identifier is [a valid AWS KMS Key identifier](../../framework/aws-kms/aws-kms-key-arn.md#a-valid-aws-kms-identifier). + +Justification: This validation is not implemented. The Python implementation defers validation to the AWS KMS SDK, which will return an error if the key identifier is invalid. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization +##= type=exception +##% If the KmsKeyring validates that the AWS KMS key identifier is not a valid AWS KMS Key identifier, then it MUST throw an exception. + +Justification: Not applicable since the MAY validation above is not implemented. If we don't validate, we don't need to throw an exception for validation failure. + +--- + +## EncryptDataKey Method + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% The KmsKeyring MUST implement the EncryptDataKey method. + +Justification: The Python implementation does not define a separate EncryptDataKey method. Instead, the encryption logic is directly implemented in the on_encrypt method using KMS GenerateDataKey API, which both generates and encrypts the data key in a single call. This is more efficient than the spec's pattern of separate Generate + Encrypt calls. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% The keyring MUST call [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Encrypt.html) using the configured AWS KMS client. + +Justification: The Python implementation uses KMS GenerateDataKey instead of KMS Encrypt. GenerateDataKey is more efficient as it generates and encrypts the data key in a single API call, rather than requiring separate generation and encryption operations. This reduces latency and API call count. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% - `KeyId` MUST be the configured AWS KMS key identifier. + +Justification: This requirement is for the KMS Encrypt API call. Since the Python implementation uses GenerateDataKey instead of Encrypt, this specific requirement doesn't apply. However, the KeyId parameter is correctly passed to GenerateDataKey. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% - `PlaintextDataKey` MUST be the plaintext data key in the [encryption materials](../structures.md#encryption-materials). + +Justification: The Python implementation uses KMS GenerateDataKey instead of Encrypt. GenerateDataKey generates the plaintext key itself, so there is no pre-existing plaintext data key to pass in. The Plaintext parameter doesn't exist in the GenerateDataKey API - instead, the API returns both the plaintext and encrypted versions of the newly generated key. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [encryption materials](../structures.md#encryption-materials). + +Justification: This requirement is for the KMS Encrypt API call. Since the Python implementation uses GenerateDataKey instead of Encrypt, this specific requirement doesn't apply. However, the EncryptionContext parameter is correctly passed to GenerateDataKey. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% - A custom API Name or User Agent string SHOULD be provided in order to provide metrics on KMS calls associated with the S3 Encryption Client. + +Justification: Custom User Agent strings are not currently implemented. This is a future enhancement for better observability and metrics tracking. The functionality works correctly without it, but metrics attribution to the S3 Encryption Client would be improved with this addition. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% If the call to [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Encrypt.html) does not succeed, OnEncrypt MUST fail. + +Justification: This requirement is for the KMS Encrypt API call. Since the Python implementation uses GenerateDataKey instead of Encrypt, this specific requirement doesn't apply. However, the implementation does correctly fail when GenerateDataKey fails. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey +##= type=exception +##% If the call to AWS KMS Encrypt is successful, OnEncrypt MUST return the `CiphertextBlob` as a collection of bytes. + +Justification: This requirement is for the KMS Encrypt API call. Since the Python implementation uses GenerateDataKey instead of Encrypt, this specific requirement doesn't apply. However, the implementation does correctly return the CiphertextBlob from GenerateDataKey's response. + +--- + +## DecryptDataKey Method Structure + +##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 +##= type=exception +##% - A custom API Name or User Agent string SHOULD be provided in order to provide metrics on KMS calls associated with the S3 Encryption Client. + +Justification: Custom User Agent strings are not currently implemented for KMS Decrypt calls. This is a future enhancement for better observability and metrics tracking. + +--- + +##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context +##= type=exception +##% - A custom API Name or User Agent string SHOULD be provided in order to provide metrics on KMS calls associated with the S3 Encryption Client. + +Justification: Custom User Agent strings are not currently implemented for KMS Decrypt calls in Kms+Context mode. This is a future enhancement for better observability and metrics tracking. + +--- + +## S3 Keyring Abstract Methods + +##= specification/s3-encryption/materials/s3-keyring.md#abstract-methods +##= type=exception +##% - The S3 Keyring MUST define an abstract method GenerateDataKey. + +Justification: The S3Keyring base class does not define abstract methods for GenerateDataKey, EncryptDataKey, or DecryptDataKey. The Python implementation uses a simpler design where concrete keyrings (like KmsKeyring) directly implement the on_encrypt and on_decrypt methods without the intermediate abstraction layer. This reduces complexity for the initial implementation. + +--- + +##= specification/s3-encryption/materials/s3-keyring.md#abstract-methods +##= type=exception +##% - The S3 Keyring MUST define an abstract method EncryptDataKey. + +Justification: The S3Keyring base class does not define abstract methods for GenerateDataKey, EncryptDataKey, or DecryptDataKey. +The Python implementation uses a simpler design where concrete keyrings (like KmsKeyring) directly implement the on_encrypt and on_decrypt methods without the intermediate abstraction layer. + +--- + +##= specification/s3-encryption/materials/s3-keyring.md#abstract-methods +##= type=exception +##% - The S3 Keyring MUST define an abstract method DecryptDataKey. + +Justification: The S3Keyring base class does not define abstract methods for GenerateDataKey, EncryptDataKey, or DecryptDataKey. +The Python implementation uses a simpler design where concrete keyrings (like KmsKeyring) directly implement the on_encrypt and on_decrypt methods without the intermediate abstraction layer. + +--- + +## S3 Keyring OnEncrypt Logic + +##= specification/s3-encryption/materials/s3-keyring.md#onencrypt +##= type=exception +##% If the Plaintext Data Key in the input EncryptionMaterials is null, the S3 Keyring MUST call the GenerateDataKey method using the materials. + +Justification: The S3Keyring base class does not implement this logic. The concrete KmsKeyring implementation directly calls KMS Encrypt in its on_encrypt method. +The specification's pattern of checking for null plaintext and conditionally calling GenerateDataKey is not followed; instead, the implementation always generates a new key. + +--- + +##= specification/s3-encryption/materials/s3-keyring.md#onencrypt +##= type=exception +##% If the materials returned from GenerateDataKey contain an EncryptedDataKey, the S3 Keyring MUST return the materials. + +Justification: Not applicable since the GenerateDataKey method pattern is not implemented. The KmsKeyring directly handles key generation and encryption in on_encrypt. + +--- + +##= specification/s3-encryption/materials/s3-keyring.md#onencrypt +##= type=exception +##% If the materials returned from GenerateDataKey do not contain an EncryptedDataKey, the S3 Keyring MUST call the EncryptDataKey method using the materials. + +Justification: Not applicable since the GenerateDataKey and EncryptDataKey method pattern is not implemented. +The KmsKeyring uses KMS GenerateDataKey which returns both plaintext and encrypted key in a single call. + +--- + +## S3 Keyring OnDecrypt Validations + +##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt +##= type=exception +##% The S3 Keyring MAY validate that the Key Provider ID of the Encrypted Data Key matches the expected default Key Provider ID value. + +Justification: This optional validation is not implemented. +The Key Provider ID field is not used for anything in S3EC. + +--- + +##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt +##= type=exception +##% The S3 Keyring MUST call the DecryptDataKey method using the materials and add the resulting plaintext data key to the materials. + +Justification: The S3Keyring base class does not implement this logic. +The concrete KmsKeyring implementation directly calls KMS Decrypt in its on_decrypt method rather than calling a separate DecryptDataKey method. +This is consistent with the simplified design that doesn't use the abstract method pattern. + +--- diff --git a/src/s3_encryption/materials/keyring.py b/src/s3_encryption/materials/keyring.py index 1e08cb18..922b9a26 100644 --- a/src/s3_encryption/materials/keyring.py +++ b/src/s3_encryption/materials/keyring.py @@ -14,6 +14,13 @@ from .materials import DecryptionMaterials, EncryptionMaterials + +##= specification/s3-encryption/materials/keyrings.md#interface +##= type=implication +##% The Keyring interface and its operations SHOULD adhere to the naming conventions of the implementation language. +##= specification/s3-encryption/materials/keyrings.md#supported-keyrings +##= type=implication +##% Note: A user MAY create their own custom keyring(s). @define class AbstractKeyring(abc.ABC): """Abstract base class for keyrings. @@ -22,8 +29,13 @@ class AbstractKeyring(abc.ABC): Concrete implementations handle specific key providers like KMS. """ + ##= specification/s3-encryption/materials/keyrings.md#interface + ##= type=implication + ##% The Keyring interface MUST include the OnEncrypt operation. + ##% The OnEncrypt operation MUST accept an instance of EncryptionMaterials as input. + ##% The OnEncrypt operation MUST return an instance of EncryptionMaterials as output. @abc.abstractmethod - def on_encrypt(self, enc_materials): + def on_encrypt(self, enc_materials) -> 'EncryptionMaterials': """Process encryption materials. Args: @@ -34,8 +46,13 @@ def on_encrypt(self, enc_materials): """ pass + ##= specification/s3-encryption/materials/keyrings.md#interface + ##= type=implication + ##% The Keyring interface MUST include the OnDecrypt operation. + ##% The OnDecrypt operation MUST accept an instance of DecryptionMaterials and a collection of EncryptedDataKey instances as input. + ##% The OnDecrypt operation MUST return an instance of DecryptionMaterials as output. @abc.abstractmethod - def on_decrypt(self, dec_materials, encrypted_data_keys=None): + def on_decrypt(self, dec_materials, encrypted_data_keys=None) -> 'DecryptionMaterials': """Decrypt one of the encrypted data keys and update dec_materials. Args: @@ -50,10 +67,18 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): pass +##= specification/s3-encryption/materials/s3-keyring.md#overview +##= type=implication +##% The S3EC SHOULD implement an S3 Keyring to consolidate validation and other functionality common to all S3 Keyrings. +##% If implemented, the S3 Keyring MUST implement the Keyring interface. @define class S3Keyring(AbstractKeyring): - """Base class for S3 encryption keyrings that provides common validation logic.""" + """Abstract class for S3EC keyrings that provides common validation logic.""" + ##= specification/s3-encryption/materials/s3-keyring.md#overview + ##= type=implication + ##% If implemented, the S3 Keyring MUST NOT be able to be instantiated as a Keyring instance. + @abc.abstractmethod def on_encrypt(self, enc_materials): """Validate encryption materials before encryption. @@ -79,6 +104,10 @@ def on_encrypt(self, enc_materials): return enc_materials + ##= specification/s3-encryption/materials/s3-keyring.md#overview + ##= type=implication + ##% If implemented, the S3 Keyring MUST NOT be able to be instantiated as a Keyring instance. + @abc.abstractmethod def on_decrypt(self, dec_materials, encrypted_data_keys=None): """Validate decryption materials before decryption. @@ -98,15 +127,27 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): ) # Use encrypted_data_keys from parameters if provided, otherwise use from dec_materials + # TODO: This can probably be cleaned up, consult Java edks = ( encrypted_data_keys if encrypted_data_keys is not None else dec_materials.encrypted_data_keys ) - # Validate encrypted_data_keys - if edks is None or len(edks) == 0: - raise S3EncryptionClientError("No encrypted data keys provided") + if edks is None: + raise S3EncryptionClientError("No EncryptedDataKey provided on decrypt!") + + ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt + ##= type=implication + ##% If the input DecryptionMaterials contains a Plaintext Data Key, the S3 Keyring MUST throw an exception. + if dec_materials.plaintext_data_key is not None: + raise S3EncryptionClientError("Decryption materials already contains a plaintext data key.") + + ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt + ##= type=implication + ##% If the input collection of EncryptedDataKey instances contains any number of EDKs other than 1, the S3 Keyring MUST throw an exception. + if len(edks) != 1: + raise S3EncryptionClientError(f"Only one encrypted data key is supported, found: {len(edks)}") # Ensure encryption contexts are dictionaries if not isinstance(dec_materials.encryption_context_from_request, dict): diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index f8bc4997..67efcc1b 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -17,6 +17,9 @@ KMS_V1_DEFAULT_KEY = "kms_cmk_id" +##= specification/s3-encryption/materials/s3-kms-keyring.md#interface +##= type=implication +##% The KmsKeyring MUST implement the [Keyring interface](keyrings.md#interface) and include the behavior described in the [S3 Keyring](s3-keyring.md). @define class KmsKeyring(S3Keyring): """KMS implementation of the S3 keyring. @@ -29,10 +32,25 @@ class KmsKeyring(S3Keyring): enable_legacy_wrapping_algorithms (bool): Whether to enable legacy wrapping algorithms """ + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization + ##= type=implementation + ##% On initialization, the caller MAY provide an AWS KMS SDK client instance. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization + ##= type=implication + ##% If the caller does not provide an AWS KMS SDK client instance or provides a null value, the KmsKeyring MUST create a default KMS client instance. kms_client: client.BaseClient = field() + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization + ##= type=implementation + ##% On initialization, the caller MUST provide an AWS KMS key identifier. kms_key_id: str = field() + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsV1 mode MUST be only enabled when legacy wrapping algorithms are enabled. enable_legacy_wrapping_algorithms: bool = field(default=False) - + + def on_encrypt(self, enc_materials): """Process encryption materials using KMS. @@ -43,12 +61,27 @@ def on_encrypt(self, enc_materials): EncryptionMaterials: The processed encryption materials with KMS-generated keys """ try: - # Call parent class validation enc_materials = super().on_encrypt(enc_materials) encryption_context = enc_materials.encryption_context + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsKeyring MUST support encryption using Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implementation + ##% The Kms+Context mode MUST be enabled as a fully-supported (non-legacy) wrapping algorithm. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implication + ##% The KmsKeyring MUST NOT support encryption using KmsV1 mode. encryption_context["aws:x-amz-cek-alg"] = "AES/GCM/NoPadding" + ##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey + ##= type=exception + ##% The KmsKeyring MUST implement the EncryptDataKey method. + ##% The keyring MUST call [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Encrypt.html) using the configured AWS KMS client. + # Python implementation uses KMS GenerateDataKey instead of the spec's EncryptDataKey pattern + # The spec is wrong and needs to be updated. response = self.kms_client.generate_data_key( KeyId=self.kms_key_id, KeySpec="AES_256", EncryptionContext=encryption_context ) @@ -62,6 +95,7 @@ def on_encrypt(self, enc_materials): enc_materials.plaintext_data_key = response["Plaintext"] return enc_materials except Exception: + # If KMS call fails, propagate the exception raise def on_decrypt(self, dec_materials, encrypted_data_keys=None): @@ -77,6 +111,9 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): DecryptionMaterials: The updated dec_materials with the plaintext data key """ try: + ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt + ##= type=implication + ##% The OnDecrypt operation is responsible for ensuring that the DecryptionMaterials contain valid plaintext and encrypted data keys. # Call parent class validation dec_materials = super().on_decrypt(dec_materials, encrypted_data_keys) @@ -93,24 +130,40 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): for edk in edks: try: edk_bytes = edk.encrypted_data_key + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsKeyring MUST support decryption using Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implementation + ##% The KmsKeyring MUST determine whether to decrypt using KmsV1 mode or Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implementation + ##% If the Key Provider Info of the Encrypted Data Key is "kms+context", the KmsKeyring MUST attempt to decrypt using Kms+Context mode. if edk.key_provider_info == "kms+context": encryption_context_from_request = ( dec_materials.encryption_context_from_request ) encryption_context_stored = dec_materials.encryption_context_stored - # Default EC MUST NOT be passed in via request + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. if KMS_CONTEXT_DEFAULT_KEY in encryption_context_from_request: raise S3EncryptionClientError( f"{KMS_CONTEXT_DEFAULT_KEY} is a reserved key for the " f"S3 encryption client" ) - # The stored EC, minus default key/values, MUST match provided EC + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% The stored encryption context with the two reserved keys removed MUST match the provided encryption context. encryption_context_stored_copy = encryption_context_stored.copy() encryption_context_stored_copy.pop(KMS_V1_DEFAULT_KEY, None) encryption_context_stored_copy.pop(KMS_CONTEXT_DEFAULT_KEY, None) + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% If the stored encryption context with the two reserved keys removed does not match the provided encryption context, the KmsKeyring MUST throw an exception. if encryption_context_stored_copy != encryption_context_from_request: # TODO: modeled error raise S3EncryptionClientError( @@ -118,8 +171,13 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): "retrieved from S3" ) - # Update decMaterials with the modified encryption context + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implication + ##% If the Key Provider Info of the Encrypted Data Key is "kms", the KmsKeyring MUST attempt to decrypt using KmsV1 mode. elif edk.key_provider_info == "kms": + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsKeyring MUST support decryption using KmsV1 mode. if not self.enable_legacy_wrapping_algorithms: raise S3EncryptionClientError( f"Enable legacy wrapping algorithms to use legacy key wrapping " @@ -130,14 +188,42 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): f"{edk.key_provider_info} is not a valid key wrapping algorithm!" ) + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% - `KeyId` MUST be the configured AWS KMS key identifier. + ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext](../structures.md#ciphertext). + ##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [decryption materials](../structures.md#decryption-materials). + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implication + ##% - `KeyId` MUST be the configured AWS KMS key identifier. + ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext](../structures.md#ciphertext). + ##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [decryption materials](../structures.md#decryption-materials). response = self.kms_client.decrypt( KeyId=self.kms_key_id, CiphertextBlob=edk_bytes, EncryptionContext=dec_materials.encryption_context_stored, ) + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implication + ##% The KmsKeyring MUST immediately return the plaintext as a collection of bytes. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implication + ##% The KmsKeyring MUST immediately return the plaintext as a collection of bytes. dec_materials.plaintext_data_key = response["Plaintext"] return dec_materials except Exception as e: + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. last_exception = e continue diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py new file mode 100644 index 00000000..affdcb98 --- /dev/null +++ b/test/test_kms_keyring.py @@ -0,0 +1,457 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for KMS keyring implementation.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from src.s3_encryption.exceptions import S3EncryptionClientError +from src.s3_encryption.materials.encrypted_data_key import EncryptedDataKey +from src.s3_encryption.materials.kms_keyring import KmsKeyring +from src.s3_encryption.materials.materials import DecryptionMaterials, EncryptionMaterials + + +class TestKmsKeyringInitialization: + """Tests for KMS keyring initialization.""" + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization + ##= type=test + ##% On initialization, the caller MUST provide an AWS KMS key identifier. + def test_initialization_with_required_parameters(self): + """Test that KMS keyring can be initialized with required parameters.""" + mock_kms_client = MagicMock() + kms_key_id = "arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012" + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id=kms_key_id) + + assert keyring.kms_client == mock_kms_client + assert keyring.kms_key_id == kms_key_id + assert keyring.enable_legacy_wrapping_algorithms is False + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization + ##= type=test + ##% On initialization, the caller MAY provide an AWS KMS SDK client instance. + def test_initialization_with_kms_client(self): + """Test that KMS keyring accepts a KMS client instance.""" + mock_kms_client = MagicMock() + kms_key_id = "test-key-id" + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id=kms_key_id) + + assert keyring.kms_client == mock_kms_client + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=test + ##% The KmsV1 mode MUST be only enabled when legacy wrapping algorithms are enabled. + def test_initialization_with_legacy_wrapping_algorithms(self): + """Test that legacy wrapping algorithms can be enabled.""" + mock_kms_client = MagicMock() + kms_key_id = "test-key-id" + + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id=kms_key_id, + enable_legacy_wrapping_algorithms=True, + ) + + assert keyring.enable_legacy_wrapping_algorithms is True + + +class TestKmsKeyringOnEncrypt: + """Tests for KMS keyring encryption operations.""" + + def test_on_encrypt_returns_encryption_materials(self): + """Test that on_encrypt returns EncryptionMaterials.""" + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": b"encrypted-key", + "Plaintext": b"plaintext-key", + } + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + enc_materials = EncryptionMaterials(encryption_context={"key": "value"}) + + result = keyring.on_encrypt(enc_materials) + + assert isinstance(result, EncryptionMaterials) + + def test_on_encrypt_calls_kms_generate_data_key(self): + """Test that on_encrypt calls KMS generate_data_key.""" + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": b"encrypted-key", + "Plaintext": b"plaintext-key", + } + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + enc_materials = EncryptionMaterials(encryption_context={"key": "value"}) + + keyring.on_encrypt(enc_materials) + + mock_kms_client.generate_data_key.assert_called_once() + + def test_on_encrypt_uses_correct_kms_parameters(self): + """Test that on_encrypt uses correct KMS parameters.""" + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": b"encrypted-key", + "Plaintext": b"plaintext-key", + } + + kms_key_id = "test-key-id" + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id=kms_key_id) + encryption_context = {"key": "value"} + enc_materials = EncryptionMaterials(encryption_context=encryption_context) + + keyring.on_encrypt(enc_materials) + + call_args = mock_kms_client.generate_data_key.call_args + assert call_args.kwargs["KeyId"] == kms_key_id + assert "aws:x-amz-cek-alg" in call_args.kwargs["EncryptionContext"] + assert call_args.kwargs["EncryptionContext"]["key"] == "value" + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=test + ##% The KmsKeyring MUST support encryption using Kms+Context mode. + def test_on_encrypt_adds_kms_context_algorithm(self): + """Test that on_encrypt adds the Kms+Context algorithm to encryption context.""" + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": b"encrypted-key", + "Plaintext": b"plaintext-key", + } + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + enc_materials = EncryptionMaterials(encryption_context={}) + + result = keyring.on_encrypt(enc_materials) + + call_args = mock_kms_client.generate_data_key.call_args + assert call_args.kwargs["EncryptionContext"]["aws:x-amz-cek-alg"] == "AES/GCM/NoPadding" + + def test_on_encrypt_sets_encrypted_data_key(self): + """Test that on_encrypt sets the encrypted data key from KMS response.""" + mock_kms_client = MagicMock() + ciphertext_blob = b"encrypted-key-from-kms" + plaintext = b"plaintext-key-from-kms" + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": ciphertext_blob, + "Plaintext": plaintext, + } + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + enc_materials = EncryptionMaterials(encryption_context={}) + + result = keyring.on_encrypt(enc_materials) + + assert result.encrypted_data_key is not None + assert result.encrypted_data_key.encrypted_data_key == ciphertext_blob + assert result.encrypted_data_key.key_provider_info == "kms+context" + assert result.plaintext_data_key == plaintext + + def test_on_encrypt_fails_when_kms_fails(self): + """Test that on_encrypt fails when KMS call fails.""" + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.side_effect = Exception("KMS error") + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + enc_materials = EncryptionMaterials(encryption_context={}) + + with pytest.raises(Exception): + keyring.on_encrypt(enc_materials) + + +class TestKmsKeyringOnDecrypt: + """Tests for KMS keyring decryption operations.""" + + def test_on_decrypt_returns_decryption_materials(self): + """Test that on_decrypt returns DecryptionMaterials.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + encryption_context_from_request={}, + ) + + result = keyring.on_decrypt(dec_materials) + + assert isinstance(result, DecryptionMaterials) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=test + ##% The KmsKeyring MUST determine whether to decrypt using KmsV1 mode or Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=test + ##% The KmsKeyring MUST support decryption using Kms+Context mode. + ##% The Kms+Context mode MUST be enabled as a fully-supported (non-legacy) wrapping algorithm. + def test_on_decrypt_with_kms_context_mode(self): + """Test that on_decrypt handles kms+context mode.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + encryption_context_from_request={}, + ) + + result = keyring.on_decrypt(dec_materials) + + assert result.plaintext_data_key == b"plaintext-key" + mock_kms_client.decrypt.assert_called_once() + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test + ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=test + ##% If the Key Provider Info of the Encrypted Data Key is "kms+context", the KmsKeyring MUST attempt to decrypt using Kms+Context mode. + def test_on_decrypt_validates_encryption_context(self): + """Test that on_decrypt validates encryption context.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={ + "aws:x-amz-cek-alg": "AES/GCM/NoPadding", + "custom-key": "custom-value", + }, + encryption_context_from_request={"custom-key": "custom-value"}, + ) + + result = keyring.on_decrypt(dec_materials) + + assert result.plaintext_data_key == b"plaintext-key" + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test + ##% The stored encryption context with the two reserved keys removed MUST match the provided encryption context. + ##% If the stored encryption context with the two reserved keys removed does not match the provided encryption context, the KmsKeyring MUST throw an exception. + def test_on_decrypt_fails_with_mismatched_encryption_context(self): + """Test that on_decrypt fails when encryption contexts don't match.""" + mock_kms_client = MagicMock() + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={ + "aws:x-amz-cek-alg": "AES/GCM/NoPadding", + "custom-key": "stored-value", + }, + encryption_context_from_request={"custom-key": "different-value"}, + ) + + with pytest.raises(S3EncryptionClientError) as exc_info: + keyring.on_decrypt(dec_materials) + + assert "does not match" in str(exc_info.value) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test + ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. + def test_on_decrypt_rejects_reserved_key_in_request_context(self): + """Test that on_decrypt rejects reserved keys in request encryption context.""" + mock_kms_client = MagicMock() + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + encryption_context_from_request={"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + ) + + with pytest.raises(S3EncryptionClientError) as exc_info: + keyring.on_decrypt(dec_materials) + + assert "reserved key" in str(exc_info.value) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=test + ##% If the Key Provider Info of the Encrypted Data Key is "kms", the KmsKeyring MUST attempt to decrypt using KmsV1 mode. + def test_on_decrypt_with_kms_v1_mode(self): + """Test that on_decrypt handles KmsV1 mode when legacy algorithms are enabled.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + + kms_key_id = "test-key-id" + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id=kms_key_id, + enable_legacy_wrapping_algorithms=True, + ) + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={}, + encryption_context_from_request={}, + ) + + result = keyring.on_decrypt(dec_materials) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=test + ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. + call_args = mock_kms_client.decrypt.call_args + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=test + ##% - `KeyId` MUST be the configured AWS KMS key identifier. + assert call_args.kwargs["KeyId"] == kms_key_id + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=test + ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext](../structures.md#ciphertext). + assert call_args.kwargs["CiphertextBlob"] == encrypted_key + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=test + ##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [decryption materials](../structures.md#decryption-materials). + assert call_args.kwargs["EncryptionContext"] == encryption_context_stored + assert result.plaintext_data_key == b"plaintext-key" + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= type=test + ##% The KmsKeyring MUST support decryption using KmsV1 mode. + def test_on_decrypt_rejects_kms_v1_when_legacy_disabled(self): + """Test that on_decrypt rejects KmsV1 mode when legacy algorithms are disabled.""" + mock_kms_client = MagicMock() + + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id="test-key-id", + enable_legacy_wrapping_algorithms=False, + ) + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={}, + encryption_context_from_request={}, + ) + + with pytest.raises(S3EncryptionClientError) as exc_info: + keyring.on_decrypt(dec_materials) + + assert "legacy wrapping algorithms" in str(exc_info.value) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test + ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. + def test_on_decrypt_uses_correct_kms_parameters(self): + """Test that on_decrypt uses correct KMS parameters.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + + kms_key_id = "test-key-id" + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id=kms_key_id) + encrypted_key = b"encrypted-key-bytes" + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=encrypted_key, + ) + encryption_context_stored = {"aws:x-amz-cek-alg": "AES/GCM/NoPadding"} + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored=encryption_context_stored, + encryption_context_from_request={}, + ) + + keyring.on_decrypt(dec_materials) + + call_args = mock_kms_client.decrypt.call_args + assert call_args.kwargs["KeyId"] == kms_key_id + assert call_args.kwargs["CiphertextBlob"] == encrypted_key + assert call_args.kwargs["EncryptionContext"] == encryption_context_stored + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=test + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. + def test_on_decrypt_fails_when_kms_fails(self): + """Test that on_decrypt fails when KMS call fails.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.side_effect = Exception("KMS decrypt error") + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={}, + encryption_context_from_request={}, + ) + + with pytest.raises(Exception): + keyring.on_decrypt(dec_materials) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. + def test_on_decrypt_fails_when_kms_fails(self): + """Test that on_decrypt fails when KMS call fails.""" + mock_kms_client = MagicMock() + mock_kms_client.decrypt.side_effect = Exception("KMS decrypt error") + + keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + encryption_context_from_request={}, + ) + + with pytest.raises(Exception): + keyring.on_decrypt(dec_materials) From b8dbb7c2e1f5d97f97e9382dc4b908da7b3b3bbf Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 15:40:19 -0800 Subject: [PATCH 3/9] first pass format --- src/s3_encryption/materials/keyring.py | 13 ++++++++----- src/s3_encryption/materials/kms_keyring.py | 7 +++---- test/test_kms_keyring.py | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/s3_encryption/materials/keyring.py b/src/s3_encryption/materials/keyring.py index 922b9a26..239438b8 100644 --- a/src/s3_encryption/materials/keyring.py +++ b/src/s3_encryption/materials/keyring.py @@ -14,7 +14,6 @@ from .materials import DecryptionMaterials, EncryptionMaterials - ##= specification/s3-encryption/materials/keyrings.md#interface ##= type=implication ##% The Keyring interface and its operations SHOULD adhere to the naming conventions of the implementation language. @@ -35,7 +34,7 @@ class AbstractKeyring(abc.ABC): ##% The OnEncrypt operation MUST accept an instance of EncryptionMaterials as input. ##% The OnEncrypt operation MUST return an instance of EncryptionMaterials as output. @abc.abstractmethod - def on_encrypt(self, enc_materials) -> 'EncryptionMaterials': + def on_encrypt(self, enc_materials) -> "EncryptionMaterials": """Process encryption materials. Args: @@ -52,7 +51,7 @@ def on_encrypt(self, enc_materials) -> 'EncryptionMaterials': ##% The OnDecrypt operation MUST accept an instance of DecryptionMaterials and a collection of EncryptedDataKey instances as input. ##% The OnDecrypt operation MUST return an instance of DecryptionMaterials as output. @abc.abstractmethod - def on_decrypt(self, dec_materials, encrypted_data_keys=None) -> 'DecryptionMaterials': + def on_decrypt(self, dec_materials, encrypted_data_keys=None) -> "DecryptionMaterials": """Decrypt one of the encrypted data keys and update dec_materials. Args: @@ -141,13 +140,17 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): ##= type=implication ##% If the input DecryptionMaterials contains a Plaintext Data Key, the S3 Keyring MUST throw an exception. if dec_materials.plaintext_data_key is not None: - raise S3EncryptionClientError("Decryption materials already contains a plaintext data key.") + raise S3EncryptionClientError( + "Decryption materials already contains a plaintext data key." + ) ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt ##= type=implication ##% If the input collection of EncryptedDataKey instances contains any number of EDKs other than 1, the S3 Keyring MUST throw an exception. if len(edks) != 1: - raise S3EncryptionClientError(f"Only one encrypted data key is supported, found: {len(edks)}") + raise S3EncryptionClientError( + f"Only one encrypted data key is supported, found: {len(edks)}" + ) # Ensure encryption contexts are dictionaries if not isinstance(dec_materials.encryption_context_from_request, dict): diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index 67efcc1b..4a862f05 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -39,18 +39,17 @@ class KmsKeyring(S3Keyring): ##= type=implication ##% If the caller does not provide an AWS KMS SDK client instance or provides a null value, the KmsKeyring MUST create a default KMS client instance. kms_client: client.BaseClient = field() - + ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization ##= type=implementation ##% On initialization, the caller MUST provide an AWS KMS key identifier. kms_key_id: str = field() - + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implementation ##% The KmsV1 mode MUST be only enabled when legacy wrapping algorithms are enabled. enable_legacy_wrapping_algorithms: bool = field(default=False) - - + def on_encrypt(self, enc_materials): """Process encryption materials using KMS. diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index affdcb98..dac0b1a7 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Tests for KMS keyring implementation.""" -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest @@ -411,7 +411,7 @@ def test_on_decrypt_uses_correct_kms_parameters(self): ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 ##= type=test ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. - def test_on_decrypt_fails_when_kms_fails(self): + def test_on_decrypt_fails_when_kms_v1_fails(self): """Test that on_decrypt fails when KMS call fails.""" mock_kms_client = MagicMock() mock_kms_client.decrypt.side_effect = Exception("KMS decrypt error") From 5f94e2a7ddb3e258b406cc1c2565f196fb8662c5 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 15:57:30 -0800 Subject: [PATCH 4/9] wrap Duvet lines --- src/s3_encryption/materials/keyring.py | 15 +- src/s3_encryption/materials/kms_keyring.py | 252 +++++++++++---------- 2 files changed, 147 insertions(+), 120 deletions(-) diff --git a/src/s3_encryption/materials/keyring.py b/src/s3_encryption/materials/keyring.py index 239438b8..d0ecd96b 100644 --- a/src/s3_encryption/materials/keyring.py +++ b/src/s3_encryption/materials/keyring.py @@ -16,7 +16,8 @@ ##= specification/s3-encryption/materials/keyrings.md#interface ##= type=implication -##% The Keyring interface and its operations SHOULD adhere to the naming conventions of the implementation language. +##% The Keyring interface and its operations SHOULD adhere to the naming conventions of the +##% implementation language. ##= specification/s3-encryption/materials/keyrings.md#supported-keyrings ##= type=implication ##% Note: A user MAY create their own custom keyring(s). @@ -48,7 +49,8 @@ def on_encrypt(self, enc_materials) -> "EncryptionMaterials": ##= specification/s3-encryption/materials/keyrings.md#interface ##= type=implication ##% The Keyring interface MUST include the OnDecrypt operation. - ##% The OnDecrypt operation MUST accept an instance of DecryptionMaterials and a collection of EncryptedDataKey instances as input. + ##% The OnDecrypt operation MUST accept an instance of DecryptionMaterials and a collection + ##% of EncryptedDataKey instances as input. ##% The OnDecrypt operation MUST return an instance of DecryptionMaterials as output. @abc.abstractmethod def on_decrypt(self, dec_materials, encrypted_data_keys=None) -> "DecryptionMaterials": @@ -68,7 +70,8 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None) -> "DecryptionMate ##= specification/s3-encryption/materials/s3-keyring.md#overview ##= type=implication -##% The S3EC SHOULD implement an S3 Keyring to consolidate validation and other functionality common to all S3 Keyrings. +##% The S3EC SHOULD implement an S3 Keyring to consolidate validation and other functionality +##% common to all S3 Keyrings. ##% If implemented, the S3 Keyring MUST implement the Keyring interface. @define class S3Keyring(AbstractKeyring): @@ -138,7 +141,8 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt ##= type=implication - ##% If the input DecryptionMaterials contains a Plaintext Data Key, the S3 Keyring MUST throw an exception. + ##% If the input DecryptionMaterials contains a Plaintext Data Key, the S3 Keyring MUST + ##% throw an exception. if dec_materials.plaintext_data_key is not None: raise S3EncryptionClientError( "Decryption materials already contains a plaintext data key." @@ -146,7 +150,8 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt ##= type=implication - ##% If the input collection of EncryptedDataKey instances contains any number of EDKs other than 1, the S3 Keyring MUST throw an exception. + ##% If the input collection of EncryptedDataKey instances contains any number of EDKs + ##% other than 1, the S3 Keyring MUST throw an exception. if len(edks) != 1: raise S3EncryptionClientError( f"Only one encrypted data key is supported, found: {len(edks)}" diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index 4a862f05..acd1c8c4 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -19,7 +19,8 @@ ##= specification/s3-encryption/materials/s3-kms-keyring.md#interface ##= type=implication -##% The KmsKeyring MUST implement the [Keyring interface](keyrings.md#interface) and include the behavior described in the [S3 Keyring](s3-keyring.md). +##% The KmsKeyring MUST implement the [Keyring interface](keyrings.md#interface) and include +##% the behavior described in the [S3 Keyring](s3-keyring.md). @define class KmsKeyring(S3Keyring): """KMS implementation of the S3 keyring. @@ -37,7 +38,8 @@ class KmsKeyring(S3Keyring): ##% On initialization, the caller MAY provide an AWS KMS SDK client instance. ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization ##= type=implication - ##% If the caller does not provide an AWS KMS SDK client instance or provides a null value, the KmsKeyring MUST create a default KMS client instance. + ##% If the caller does not provide an AWS KMS SDK client instance or provides a null value, + ##% the KmsKeyring MUST create a default KMS client instance. kms_client: client.BaseClient = field() ##= specification/s3-encryption/materials/s3-kms-keyring.md#initialization @@ -64,13 +66,17 @@ def on_encrypt(self, enc_materials): encryption_context = enc_materials.encryption_context - ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md# + ##= supported-wrapping-algorithm-modes ##= type=implementation ##% The KmsKeyring MUST support encryption using Kms+Context mode. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md# + ##= supported-wrapping-algorithm-modes ##= type=implementation - ##% The Kms+Context mode MUST be enabled as a fully-supported (non-legacy) wrapping algorithm. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes + ##% The Kms+Context mode MUST be enabled as a fully-supported (non-legacy) wrapping + ##% algorithm. + ##= specification/s3-encryption/materials/s3-kms-keyring.md# + ##= supported-wrapping-algorithm-modes ##= type=implication ##% The KmsKeyring MUST NOT support encryption using KmsV1 mode. encryption_context["aws:x-amz-cek-alg"] = "AES/GCM/NoPadding" @@ -78,8 +84,10 @@ def on_encrypt(self, enc_materials): ##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey ##= type=exception ##% The KmsKeyring MUST implement the EncryptDataKey method. - ##% The keyring MUST call [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Encrypt.html) using the configured AWS KMS client. - # Python implementation uses KMS GenerateDataKey instead of the spec's EncryptDataKey pattern + ##% The keyring MUST call [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/ + ##% APIReference/API_Encrypt.html) using the configured AWS KMS client. + # Python implementation uses KMS GenerateDataKey instead of the spec's + # EncryptDataKey pattern # The spec is wrong and needs to be updated. response = self.kms_client.generate_data_key( KeyId=self.kms_key_id, KeySpec="AES_256", EncryptionContext=encryption_context @@ -112,7 +120,8 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): try: ##= specification/s3-encryption/materials/s3-keyring.md#ondecrypt ##= type=implication - ##% The OnDecrypt operation is responsible for ensuring that the DecryptionMaterials contain valid plaintext and encrypted data keys. + ##% The OnDecrypt operation is responsible for ensuring that the DecryptionMaterials + ##% contain valid plaintext and encrypted data keys. # Call parent class validation dec_materials = super().on_decrypt(dec_materials, encrypted_data_keys) @@ -123,112 +132,125 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): else dec_materials.encrypted_data_keys ) - # Try to decrypt each EDK until one succeeds - # TODO: probably just enforce |EDKs| == 1 and remove loop - last_exception = None - for edk in edks: - try: - edk_bytes = edk.encrypted_data_key - ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes - ##= type=implementation - ##% The KmsKeyring MUST support decryption using Kms+Context mode. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey - ##= type=implementation - ##% The KmsKeyring MUST determine whether to decrypt using KmsV1 mode or Kms+Context mode. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey - ##= type=implementation - ##% If the Key Provider Info of the Encrypted Data Key is "kms+context", the KmsKeyring MUST attempt to decrypt using Kms+Context mode. - if edk.key_provider_info == "kms+context": - encryption_context_from_request = ( - dec_materials.encryption_context_from_request - ) - encryption_context_stored = dec_materials.encryption_context_stored - - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implementation - ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. - if KMS_CONTEXT_DEFAULT_KEY in encryption_context_from_request: - raise S3EncryptionClientError( - f"{KMS_CONTEXT_DEFAULT_KEY} is a reserved key for the " - f"S3 encryption client" - ) - - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implementation - ##% The stored encryption context with the two reserved keys removed MUST match the provided encryption context. - encryption_context_stored_copy = encryption_context_stored.copy() - encryption_context_stored_copy.pop(KMS_V1_DEFAULT_KEY, None) - encryption_context_stored_copy.pop(KMS_CONTEXT_DEFAULT_KEY, None) - - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implementation - ##% If the stored encryption context with the two reserved keys removed does not match the provided encryption context, the KmsKeyring MUST throw an exception. - if encryption_context_stored_copy != encryption_context_from_request: - # TODO: modeled error - raise S3EncryptionClientError( - "Provided encryption context does not match information " - "retrieved from S3" - ) - - ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey - ##= type=implication - ##% If the Key Provider Info of the Encrypted Data Key is "kms", the KmsKeyring MUST attempt to decrypt using KmsV1 mode. - elif edk.key_provider_info == "kms": - ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes - ##= type=implementation - ##% The KmsKeyring MUST support decryption using KmsV1 mode. - if not self.enable_legacy_wrapping_algorithms: - raise S3EncryptionClientError( - f"Enable legacy wrapping algorithms to use legacy key wrapping " - f"algorithm: {edk.key_provider_info}" - ) - else: - raise S3EncryptionClientError( - f"{edk.key_provider_info} is not a valid key wrapping algorithm!" - ) - - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 - ##= type=implementation - ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 - ##= type=implementation - ##% - `KeyId` MUST be the configured AWS KMS key identifier. - ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext](../structures.md#ciphertext). - ##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [decryption materials](../structures.md#decryption-materials). - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implementation - ##% To attempt to decrypt a particular [encrypted data key](../structures.md#encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the configured AWS KMS client. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implication - ##% - `KeyId` MUST be the configured AWS KMS key identifier. - ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext](../structures.md#ciphertext). - ##% - `EncryptionContext` MUST be the [encryption context](../structures.md#encryption-context) included in the input [decryption materials](../structures.md#decryption-materials). - response = self.kms_client.decrypt( - KeyId=self.kms_key_id, - CiphertextBlob=edk_bytes, - EncryptionContext=dec_materials.encryption_context_stored, + # The parent class validation ensures there is exactly one EDK + edk = edks[0] + edk_bytes = edk.encrypted_data_key + + ##= specification/s3-encryption/materials/s3-kms-keyring.md# + ##= supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsKeyring MUST support decryption using Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implementation + ##% The KmsKeyring MUST determine whether to decrypt using KmsV1 mode or + ##% Kms+Context mode. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implementation + ##% If the Key Provider Info of the Encrypted Data Key is "kms+context", the + ##% KmsKeyring MUST attempt to decrypt using Kms+Context mode. + if edk.key_provider_info == "kms+context": + encryption_context_from_request = dec_materials.encryption_context_from_request + encryption_context_stored = dec_materials.encryption_context_stored + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the + ##% provided (request) encryption context with the stored (materials) encryption + ##% context. + if KMS_CONTEXT_DEFAULT_KEY in encryption_context_from_request: + raise S3EncryptionClientError( + f"{KMS_CONTEXT_DEFAULT_KEY} is a reserved key for the " + f"S3 encryption client" + ) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% The stored encryption context with the two reserved keys removed MUST match + ##% the provided encryption context. + encryption_context_stored_copy = encryption_context_stored.copy() + encryption_context_stored_copy.pop(KMS_V1_DEFAULT_KEY, None) + encryption_context_stored_copy.pop(KMS_CONTEXT_DEFAULT_KEY, None) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% If the stored encryption context with the two reserved keys removed does not + ##% match the provided encryption context, the KmsKeyring MUST throw an exception. + if encryption_context_stored_copy != encryption_context_from_request: + # TODO: modeled error + raise S3EncryptionClientError( + "Provided encryption context does not match information " + "retrieved from S3" + ) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey + ##= type=implication + ##% If the Key Provider Info of the Encrypted Data Key is "kms", the KmsKeyring + ##% MUST attempt to decrypt using KmsV1 mode. + elif edk.key_provider_info == "kms": + ##= specification/s3-encryption/materials/s3-kms-keyring.md# + ##= supported-wrapping-algorithm-modes + ##= type=implementation + ##% The KmsKeyring MUST support decryption using KmsV1 mode. + if not self.enable_legacy_wrapping_algorithms: + raise S3EncryptionClientError( + f"Enable legacy wrapping algorithms to use legacy key wrapping " + f"algorithm: {edk.key_provider_info}" ) - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implication - ##% The KmsKeyring MUST immediately return the plaintext as a collection of bytes. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 - ##= type=implication - ##% The KmsKeyring MUST immediately return the plaintext as a collection of bytes. - dec_materials.plaintext_data_key = response["Plaintext"] - return dec_materials - except Exception as e: - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 - ##= type=implementation - ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=implementation - ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. - last_exception = e - continue - - # If we get here, none of the EDKs could be decrypted - if last_exception: - raise last_exception - raise S3EncryptionClientError("Failed to decrypt any of the encrypted data keys") + else: + raise S3EncryptionClientError( + f"{edk.key_provider_info} is not a valid key wrapping algorithm!" + ) + + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% To attempt to decrypt a particular [encrypted data key](../structures.md# + ##% encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https:// + ##% docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the + ##% configured AWS KMS client. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% - `KeyId` MUST be the configured AWS KMS key identifier. + ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext]( + ##% ../structures.md#ciphertext). + ##% - `EncryptionContext` MUST be the [encryption context](../structures.md# + ##% encryption-context) included in the input [decryption materials]( + ##% ../structures.md#decryption-materials). + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% To attempt to decrypt a particular [encrypted data key](../structures.md# + ##% encrypted-data-key), the KmsKeyring MUST call [AWS KMS Decrypt](https:// + ##% docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) with the + ##% configured AWS KMS client. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implication + ##% - `KeyId` MUST be the configured AWS KMS key identifier. + ##% - `CiphertextBlob` MUST be the [encrypted data key ciphertext]( + ##% ../structures.md#ciphertext). + ##% - `EncryptionContext` MUST be the [encryption context](../structures.md# + ##% encryption-context) included in the input [decryption materials]( + ##% ../structures.md#decryption-materials). + response = self.kms_client.decrypt( + KeyId=self.kms_key_id, + CiphertextBlob=edk_bytes, + EncryptionContext=dec_materials.encryption_context_stored, + ) + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implication + ##% The KmsKeyring MUST immediately return the plaintext as a collection of + ##% bytes. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implication + ##% The KmsKeyring MUST immediately return the plaintext as a collection of + ##% bytes. + dec_materials.plaintext_data_key = response["Plaintext"] + return dec_materials except Exception: + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kmsv1 + ##= type=implementation + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key]( + ##% ../structures.md#encrypted-data-key), then it MUST throw an exception. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=implementation + ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key]( + ##% ../structures.md#encrypted-data-key), then it MUST throw an exception. raise From f69331c3b43aa1e8ecf335e0f077d60257130095 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 16:05:24 -0800 Subject: [PATCH 5/9] fix tests --- test/test_decryption_materials_integration.py | 39 ++++++++++++++----- test/test_encryption_materials_integration.py | 18 +++++++-- test/test_kms_keyring.py | 8 +++- 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/test/test_decryption_materials_integration.py b/test/test_decryption_materials_integration.py index 35b7d9e8..6f167285 100644 --- a/test/test_decryption_materials_integration.py +++ b/test/test_decryption_materials_integration.py @@ -5,15 +5,24 @@ from src.s3_encryption.materials.crypto_materials_manager import DefaultCryptoMaterialsManager from src.s3_encryption.materials.encrypted_data_key import EncryptedDataKey -from src.s3_encryption.materials.keyring import S3Keyring +from src.s3_encryption.materials.kms_keyring import KmsKeyring from src.s3_encryption.materials.materials import DecryptionMaterials class TestDecryptionMaterialsIntegration: def test_keyring_on_decrypt(self): - """Test that S3Keyring.on_decrypt properly handles DecryptionMaterials.""" + """Test that KmsKeyring.on_decrypt properly handles DecryptionMaterials.""" + # Create a mock KMS client + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = { + "Plaintext": b"plaintext-data-key", + } + # Create a keyring - keyring = S3Keyring() + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012", + ) # Create an encrypted data key edk = EncryptedDataKey( @@ -22,12 +31,13 @@ def test_keyring_on_decrypt(self): encrypted_data_key=b"encrypted-data-key", ) - # Create decryption materials + # Create decryption materials with matching encryption contexts + # The stored context includes the reserved key, the request context should match (minus reserved keys) materials = DecryptionMaterials( iv=b"initialization-vector", encrypted_data_keys=[edk], - encryption_context_stored={"key1": "value1"}, - encryption_context_from_request={"key2": "value2"}, + encryption_context_stored={"key1": "value1", "aws:x-amz-cek-alg": "AES/GCM/NoPadding"}, + encryption_context_from_request={"key1": "value1"}, ) # Call on_decrypt @@ -37,13 +47,22 @@ def test_keyring_on_decrypt(self): assert isinstance(result, DecryptionMaterials) assert result.iv == b"initialization-vector" assert result.encrypted_data_keys == [edk] - assert result.encryption_context_stored == {"key1": "value1"} - assert result.encryption_context_from_request == {"key2": "value2"} + assert result.encryption_context_stored == {"key1": "value1", "aws:x-amz-cek-alg": "AES/GCM/NoPadding"} + assert result.encryption_context_from_request == {"key1": "value1"} def test_keyring_on_decrypt_default_enc_ctx(self): - """Test that S3Keyring.on_decrypt properly handles DecryptionMaterials.""" + """Test that KmsKeyring.on_decrypt properly handles DecryptionMaterials.""" + # Create a mock KMS client + mock_kms_client = MagicMock() + mock_kms_client.decrypt.return_value = { + "Plaintext": b"plaintext-data-key", + } + # Create a keyring - keyring = S3Keyring() + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012", + ) # Create an encrypted data key edk = EncryptedDataKey( diff --git a/test/test_encryption_materials_integration.py b/test/test_encryption_materials_integration.py index 989d17d8..f3ca83f9 100644 --- a/test/test_encryption_materials_integration.py +++ b/test/test_encryption_materials_integration.py @@ -5,15 +5,25 @@ from src.s3_encryption.materials.crypto_materials_manager import DefaultCryptoMaterialsManager from src.s3_encryption.materials.encrypted_data_key import EncryptedDataKey -from src.s3_encryption.materials.keyring import S3Keyring +from src.s3_encryption.materials.kms_keyring import KmsKeyring from src.s3_encryption.materials.materials import EncryptionMaterials class TestEncryptionMaterialsIntegration: def test_keyring_on_encrypt(self): - """Test that S3Keyring.on_encrypt properly handles EncryptionMaterials.""" + """Test that KmsKeyring.on_encrypt properly handles EncryptionMaterials.""" + # Create a mock KMS client + mock_kms_client = MagicMock() + mock_kms_client.generate_data_key.return_value = { + "CiphertextBlob": b"encrypted-data-key", + "Plaintext": b"plaintext-data-key", + } + # Create a keyring - keyring = S3Keyring() + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012", + ) # Create encryption materials materials = EncryptionMaterials(encryption_context={"key1": "value1"}) @@ -23,7 +33,7 @@ def test_keyring_on_encrypt(self): # Verify the result is an EncryptionMaterials instance assert isinstance(result, EncryptionMaterials) - assert result.encryption_context == {"key1": "value1"} + assert result.encryption_context == {"key1": "value1", "aws:x-amz-cek-alg": "AES/GCM/NoPadding"} def test_cmm_get_encryption_materials_with_dict(self): """Test that DefaultCryptoMaterialsManager.get_encryption_materials properly handles dictionary input.""" diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index dac0b1a7..5b7e5de1 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -309,8 +309,12 @@ def test_on_decrypt_with_kms_v1_mode(self): """Test that on_decrypt handles KmsV1 mode when legacy algorithms are enabled.""" mock_kms_client = MagicMock() mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} + kms_key_id = "test-key-id" + encrypted_key = b"encrypted-key" + encryption_context_stored = {"foo": "bar"} + keyring = KmsKeyring( kms_client=mock_kms_client, kms_key_id=kms_key_id, @@ -319,12 +323,12 @@ def test_on_decrypt_with_kms_v1_mode(self): edk = EncryptedDataKey( key_provider_id=b"S3Keyring", key_provider_info="kms", - encrypted_data_key=b"encrypted-key", + encrypted_data_key=encrypted_key, ) dec_materials = DecryptionMaterials( iv=b"initialization-vector", encrypted_data_keys=[edk], - encryption_context_stored={}, + encryption_context_stored=encryption_context_stored, encryption_context_from_request={}, ) From d13a50d260a56a12a13f625d3df01b027c3ea1da Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 16:06:58 -0800 Subject: [PATCH 6/9] test formatting --- test/test_decryption_materials_integration.py | 5 ++++- test/test_encryption_materials_integration.py | 5 ++++- test/test_kms_keyring.py | 1 - 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/test/test_decryption_materials_integration.py b/test/test_decryption_materials_integration.py index 6f167285..a4e45b4e 100644 --- a/test/test_decryption_materials_integration.py +++ b/test/test_decryption_materials_integration.py @@ -47,7 +47,10 @@ def test_keyring_on_decrypt(self): assert isinstance(result, DecryptionMaterials) assert result.iv == b"initialization-vector" assert result.encrypted_data_keys == [edk] - assert result.encryption_context_stored == {"key1": "value1", "aws:x-amz-cek-alg": "AES/GCM/NoPadding"} + assert result.encryption_context_stored == { + "key1": "value1", + "aws:x-amz-cek-alg": "AES/GCM/NoPadding", + } assert result.encryption_context_from_request == {"key1": "value1"} def test_keyring_on_decrypt_default_enc_ctx(self): diff --git a/test/test_encryption_materials_integration.py b/test/test_encryption_materials_integration.py index f3ca83f9..4b6bfefd 100644 --- a/test/test_encryption_materials_integration.py +++ b/test/test_encryption_materials_integration.py @@ -33,7 +33,10 @@ def test_keyring_on_encrypt(self): # Verify the result is an EncryptionMaterials instance assert isinstance(result, EncryptionMaterials) - assert result.encryption_context == {"key1": "value1", "aws:x-amz-cek-alg": "AES/GCM/NoPadding"} + assert result.encryption_context == { + "key1": "value1", + "aws:x-amz-cek-alg": "AES/GCM/NoPadding", + } def test_cmm_get_encryption_materials_with_dict(self): """Test that DefaultCryptoMaterialsManager.get_encryption_materials properly handles dictionary input.""" diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index 5b7e5de1..fdf62064 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -309,7 +309,6 @@ def test_on_decrypt_with_kms_v1_mode(self): """Test that on_decrypt handles KmsV1 mode when legacy algorithms are enabled.""" mock_kms_client = MagicMock() mock_kms_client.decrypt.return_value = {"Plaintext": b"plaintext-key"} - kms_key_id = "test-key-id" encrypted_key = b"encrypted-key" From 5783ad5430d6096fa2ffb9c3b9908ee6f9f9067c Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Mon, 23 Feb 2026 16:39:16 -0800 Subject: [PATCH 7/9] fix duvet --- pyproject.toml | 5 ++++- src/s3_encryption/materials/kms_keyring.py | 15 +++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b7489a03..5100857a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,10 @@ exclude = [".git", "__pycache__", "build", "dist"] [tool.ruff.lint] # Enable all rules by default, then configure specific rule settings below select = ["E", "F", "W", "I", "N", "D", "UP", "B", "A", "C4", "PT", "RET", "SIM", "ARG", "ERA"] -ignore = ["ARG002"] # Allow unused method arguments (e.g., **kwargs for API compatibility) +ignore = [ + "ARG002", # Allow unused method arguments (e.g., **kwargs for API compatibility) + "E501", # Line too long - Duvet annotations require long specification paths +] [tool.ruff.lint.pydocstyle] convention = "google" diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index acd1c8c4..6c0734ac 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -66,17 +66,14 @@ def on_encrypt(self, enc_materials): encryption_context = enc_materials.encryption_context - ##= specification/s3-encryption/materials/s3-kms-keyring.md# - ##= supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implementation ##% The KmsKeyring MUST support encryption using Kms+Context mode. - ##= specification/s3-encryption/materials/s3-kms-keyring.md# - ##= supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implementation ##% The Kms+Context mode MUST be enabled as a fully-supported (non-legacy) wrapping ##% algorithm. - ##= specification/s3-encryption/materials/s3-kms-keyring.md# - ##= supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implication ##% The KmsKeyring MUST NOT support encryption using KmsV1 mode. encryption_context["aws:x-amz-cek-alg"] = "AES/GCM/NoPadding" @@ -136,8 +133,7 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): edk = edks[0] edk_bytes = edk.encrypted_data_key - ##= specification/s3-encryption/materials/s3-kms-keyring.md# - ##= supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implementation ##% The KmsKeyring MUST support decryption using Kms+Context mode. ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey @@ -187,8 +183,7 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): ##% If the Key Provider Info of the Encrypted Data Key is "kms", the KmsKeyring ##% MUST attempt to decrypt using KmsV1 mode. elif edk.key_provider_info == "kms": - ##= specification/s3-encryption/materials/s3-kms-keyring.md# - ##= supported-wrapping-algorithm-modes + ##= specification/s3-encryption/materials/s3-kms-keyring.md#supported-wrapping-algorithm-modes ##= type=implementation ##% The KmsKeyring MUST support decryption using KmsV1 mode. if not self.enable_legacy_wrapping_algorithms: From 7af1be6df968e943efddd9756b42a9af1ed2b0b8 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Tue, 3 Mar 2026 14:39:23 -0800 Subject: [PATCH 8/9] PR feedback --- src/s3_encryption/materials/kms_keyring.py | 5 ---- test/test_kms_keyring.py | 27 ++++++++++++++-------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index 6c0734ac..a32493fc 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -78,11 +78,6 @@ def on_encrypt(self, enc_materials): ##% The KmsKeyring MUST NOT support encryption using KmsV1 mode. encryption_context["aws:x-amz-cek-alg"] = "AES/GCM/NoPadding" - ##= specification/s3-encryption/materials/s3-kms-keyring.md#encryptdatakey - ##= type=exception - ##% The KmsKeyring MUST implement the EncryptDataKey method. - ##% The keyring MUST call [AWS KMS Encrypt](https://docs.aws.amazon.com/kms/latest/ - ##% APIReference/API_Encrypt.html) using the configured AWS KMS client. # Python implementation uses KMS GenerateDataKey instead of the spec's # EncryptDataKey pattern # The spec is wrong and needs to be updated. diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index fdf62064..04fbfc4b 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -217,9 +217,6 @@ def test_on_decrypt_with_kms_context_mode(self): assert result.plaintext_data_key == b"plaintext-key" mock_kms_client.decrypt.assert_called_once() - ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context - ##= type=test - ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. ##= specification/s3-encryption/materials/s3-kms-keyring.md#decryptdatakey ##= type=test ##% If the Key Provider Info of the Encrypted Data Key is "kms+context", the KmsKeyring MUST attempt to decrypt using Kms+Context mode. @@ -250,7 +247,7 @@ def test_on_decrypt_validates_encryption_context(self): ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context ##= type=test - ##% The stored encryption context with the two reserved keys removed MUST match the provided encryption context. + ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. ##% If the stored encryption context with the two reserved keys removed does not match the provided encryption context, the KmsKeyring MUST throw an exception. def test_on_decrypt_fails_with_mismatched_encryption_context(self): """Test that on_decrypt fails when encryption contexts don't match.""" @@ -279,7 +276,7 @@ def test_on_decrypt_fails_with_mismatched_encryption_context(self): ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context ##= type=test - ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. + ##% The stored encryption context with the two reserved keys removed MUST match the provided encryption context. def test_on_decrypt_rejects_reserved_key_in_request_context(self): """Test that on_decrypt rejects reserved keys in request encryption context.""" mock_kms_client = MagicMock() @@ -417,9 +414,14 @@ def test_on_decrypt_uses_correct_kms_parameters(self): def test_on_decrypt_fails_when_kms_v1_fails(self): """Test that on_decrypt fails when KMS call fails.""" mock_kms_client = MagicMock() - mock_kms_client.decrypt.side_effect = Exception("KMS decrypt error") + kms_exception = Exception("KMS decrypt error") + mock_kms_client.decrypt.side_effect = kms_exception - keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") + keyring = KmsKeyring( + kms_client=mock_kms_client, + kms_key_id="test-key-id", + enable_legacy_wrapping_algorithms=True, + ) edk = EncryptedDataKey( key_provider_id=b"S3Keyring", key_provider_info="kms", @@ -432,16 +434,19 @@ def test_on_decrypt_fails_when_kms_v1_fails(self): encryption_context_from_request={}, ) - with pytest.raises(Exception): + with pytest.raises(Exception, match="KMS decrypt error") as exc_info: keyring.on_decrypt(dec_materials) + assert exc_info.value is kms_exception + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context ##= type=test ##% If the KmsKeyring fails to successfully decrypt the [encrypted data key](../structures.md#encrypted-data-key), then it MUST throw an exception. def test_on_decrypt_fails_when_kms_fails(self): """Test that on_decrypt fails when KMS call fails.""" mock_kms_client = MagicMock() - mock_kms_client.decrypt.side_effect = Exception("KMS decrypt error") + kms_exception = Exception("KMS decrypt error") + mock_kms_client.decrypt.side_effect = kms_exception keyring = KmsKeyring(kms_client=mock_kms_client, kms_key_id="test-key-id") edk = EncryptedDataKey( @@ -456,5 +461,7 @@ def test_on_decrypt_fails_when_kms_fails(self): encryption_context_from_request={}, ) - with pytest.raises(Exception): + with pytest.raises(Exception, match="KMS decrypt error") as exc_info: keyring.on_decrypt(dec_materials) + + assert exc_info.value is kms_exception From 62638cc9d1a2988c1521709bd99047a039a25ff6 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Tue, 3 Mar 2026 14:53:31 -0800 Subject: [PATCH 9/9] fix duvet --- test/test_kms_keyring.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index 04fbfc4b..d613cbf9 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -248,6 +248,8 @@ def test_on_decrypt_validates_encryption_context(self): ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context ##= type=test ##% When decrypting using Kms+Context mode, the KmsKeyring MUST validate the provided (request) encryption context with the stored (materials) encryption context. + ##= specification/s3-encryption/materials/s3-kms-keyring.md#kms-context + ##= type=test ##% If the stored encryption context with the two reserved keys removed does not match the provided encryption context, the KmsKeyring MUST throw an exception. def test_on_decrypt_fails_with_mismatched_encryption_context(self): """Test that on_decrypt fails when encryption contexts don't match."""