diff --git a/src/s3_encryption/decryptor.py b/src/s3_encryption/decryptor.py index e3d4eece..f76c19dc 100644 --- a/src/s3_encryption/decryptor.py +++ b/src/s3_encryption/decryptor.py @@ -72,8 +72,11 @@ def finalize(self, data: bytes) -> bytes: plaintext = self._decryptor.update(data) if data else b"" plaintext += self._decryptor.finalize() return self._unpadder.update(plaintext) + self._unpadder.finalize() - except Exception as e: - raise S3EncryptionClientSecurityError(f"Failed to decrypt CBC content: {e}") from e + except Exception: + # Use a fixed message for all CBC failures to prevent padding oracle attacks. + # Different failure modes (bad padding, truncated ciphertext, wrong key) MUST + # produce identical error responses so an attacker cannot distinguish them. + raise S3EncryptionClientSecurityError("Failed to decrypt CBC content.") from None @define diff --git a/src/s3_encryption/materials/kms_keyring.py b/src/s3_encryption/materials/kms_keyring.py index edf1d27b..abd6fad4 100644 --- a/src/s3_encryption/materials/kms_keyring.py +++ b/src/s3_encryption/materials/kms_keyring.py @@ -200,6 +200,16 @@ def on_decrypt(self, dec_materials, encrypted_data_keys=None): f"Enable legacy wrapping algorithms to use legacy key wrapping " f"algorithm: {edk.key_provider_info}" ) + # The KmsV1 wrapping algorithm does not support caller-provided + # encryption context. If the caller provided encryption context, + # the client MUST reject the request. This prevents a downgrade + # from kms+context to kms from bypassing context validation. + if dec_materials.encryption_context_from_request: + raise S3EncryptionClientError( + "Encryption context is not supported with the KmsV1 (kms) " + "wrapping algorithm. Use kms+context wrapping algorithm to " + "use encryption context." + ) else: raise S3EncryptionClientError( f"{edk.key_provider_info} is not a valid key wrapping algorithm!" diff --git a/src/s3_encryption/pipelines.py b/src/s3_encryption/pipelines.py index 0a107a5f..c4fe4867 100644 --- a/src/s3_encryption/pipelines.py +++ b/src/s3_encryption/pipelines.py @@ -316,6 +316,30 @@ def decrypt( # Determine the algorithm suite from the metadata algorithm_suite = self._determine_algorithm_suite(metadata) + # Reject metadata that contains keys from multiple format versions. + # This prevents format confusion attacks where an attacker injects + # V2 keys via an instruction file to bypass V3 key-commitment verification. + if metadata.has_exclusive_key_collision(): + raise S3EncryptionClientError( + "Object metadata contains keys from multiple format versions. " + "The object or its instruction file may have been tampered with." + ) + + # Also reject V2 format metadata that contains V3 content keys. + # In the instruction file injection scenario, the attacker replaces + # V3 EDK keys with V2 keys, but V3 content keys (x-amz-c, x-amz-d, + # x-amz-i) remain from the object metadata. This combination is + # never produced by legitimate encryption. + if metadata.is_v2_format() and ( + metadata.content_cipher_v3 is not None + or metadata.key_commitment_v3 is not None + or metadata.message_id_v3 is not None + ): + raise S3EncryptionClientError( # pragma: no cover — only reachable via instruction file merge; covered by TestInstructionFileFormatConfusion + "Object metadata contains V2 format keys alongside V3 content keys. " + "The object or its instruction file may have been tampered with." + ) + # Determine which format we're dealing with and get decryption materials if metadata.is_v1_format(): dec_materials = self._decrypt_v1(metadata, encryption_context) @@ -590,7 +614,13 @@ def _decrypt_v3(self, metadata, encryption_context) -> DecryptionMaterials: # Map V3 compressed wrapping algorithm to canonical key_provider_info raw_wrap_alg = metadata.encrypted_data_key_algorithm_v3 or "12" - wrap_alg = self._V3_WRAP_ALG_MAP.get(raw_wrap_alg, raw_wrap_alg) + wrap_alg = self._V3_WRAP_ALG_MAP.get(raw_wrap_alg) + if wrap_alg is None: + raise S3EncryptionClientError( + f"Unknown V3 wrapping algorithm: '{raw_wrap_alg}'. " + f"Valid values are: {list(self._V3_WRAP_ALG_MAP.keys())}. " + f"The object metadata may have been tampered with." + ) encrypted_data_key = EncryptedDataKey( key_provider_id=b"S3Keyring", @@ -607,8 +637,13 @@ def _decrypt_v3(self, metadata, encryption_context) -> DecryptionMaterials: stored_context = {} if wrap_alg == "kms+context": raw_ctx = metadata.encryption_context_v3 - else: + elif wrap_alg in ("AES/GCM", "RSA-OAEP-SHA1"): raw_ctx = metadata.mat_desc_v3 + else: + raise S3EncryptionClientError( # pragma: no cover — defense in depth, unreachable + f"Unexpected V3 wrapping algorithm for context selection: '{wrap_alg}'. " + f"The object metadata may have been tampered with." + ) if raw_ctx is not None: if isinstance(raw_ctx, dict): diff --git a/test-server/java-tests/src/it/java/software/amazon/encryption/s3/RoundTripTests.java b/test-server/java-tests/src/it/java/software/amazon/encryption/s3/RoundTripTests.java index e6cfae84..4763663d 100644 --- a/test-server/java-tests/src/it/java/software/amazon/encryption/s3/RoundTripTests.java +++ b/test-server/java-tests/src/it/java/software/amazon/encryption/s3/RoundTripTests.java @@ -332,6 +332,10 @@ public void kmsV1Legacy(TestUtils.LanguageServerTarget language) { @ParameterizedTest(name = "{displayName} for Encrypt: Java, Decrypt: {0}") @MethodSource("software.amazon.encryption.s3.TestUtils#clientsForTest") public void kmsV1LegacyWithEncCtx(TestUtils.LanguageServerTarget language) { + if (KMSV1_ENCRYPTION_CONTEXT_UNSUPPORTED.contains(language.getLanguageName())) { + throw new TestAbortedException( + "KmsV1 with encryption context not supported for: " + language.getLanguageName()); + } S3ECTestServerClient client = testServerClientFor(language); final String objectKey = appendTestSuffix("test-key-kms-v1-with-enc-ctx-" + language); final String input = "simple-test-input"; diff --git a/test-server/java-tests/src/it/java/software/amazon/encryption/s3/TestUtils.java b/test-server/java-tests/src/it/java/software/amazon/encryption/s3/TestUtils.java index d488fd2d..c1464eaf 100644 --- a/test-server/java-tests/src/it/java/software/amazon/encryption/s3/TestUtils.java +++ b/test-server/java-tests/src/it/java/software/amazon/encryption/s3/TestUtils.java @@ -98,6 +98,11 @@ public class TestUtils { public static final Set ENCRYPTION_CONTEXT_ON_ENCRYPT_UNSUPPORTED = Set.of(NET_V3_TRANSITION, NET_V4); + // Languages that reject caller-provided encryption context when the + // wrapping algorithm is KmsV1 ("kms"). + public static final Set KMSV1_ENCRYPTION_CONTEXT_UNSUPPORTED = + Set.of(PYTHON_V3); + public static final Set RE_ENCRYPT_SUPPORTED = Set.of(JAVA_V3_TRANSITION, JAVA_V4); diff --git a/test/integration/test_i_security.py b/test/integration/test_i_security.py new file mode 100644 index 00000000..67782c67 --- /dev/null +++ b/test/integration/test_i_security.py @@ -0,0 +1,630 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Security integration tests for S3 Encryption Client. + +These tests verify that the client correctly handles metadata tampering +scenarios, particularly wrapping algorithm downgrade attempts that modify +metadata to bypass encryption context validation. +""" + +import base64 +import json +import os +from datetime import datetime +from unittest.mock import MagicMock + +import boto3 +import pytest +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.padding import PKCS7 + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.decryptor import AesCbcDecryptor +from s3_encryption.exceptions import S3EncryptionClientError, S3EncryptionClientSecurityError +from s3_encryption.materials.crypto_materials_manager import DefaultCryptoMaterialsManager +from s3_encryption.materials.kms_keyring import KmsKeyring +from s3_encryption.materials.materials import AlgorithmSuite, CommitmentPolicy +from s3_encryption.pipelines import GetEncryptedObjectPipeline + +bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket") +region = os.environ.get("CI_AWS_REGION", "us-west-2") +kms_key_id = os.environ.get( + "CI_KMS_KEY_ALIAS", "arn:aws:kms:us-west-2:370957321024:alias/S3EC-Python-Github-KMS-Key" +) + + +def _unique_key(prefix): + return prefix + datetime.now().strftime("%Y-%m-%d-%H:%M:%S-%f") + + +def _make_client(algorithm_suite, commitment_policy, enable_legacy_wrapping=False): + """Create an S3EncryptionClient with the given config.""" + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring( + kms_client, kms_key_id, enable_legacy_wrapping_algorithms=enable_legacy_wrapping + ) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + encryption_algorithm=algorithm_suite, + commitment_policy=commitment_policy, + ) + return S3EncryptionClient(wrapped_client, config) + + +class TestWrappingAlgorithmDowngradeAttack: + """Tests for wrapping algorithm downgrade scenarios. + + These tests verify behavior when the wrapping algorithm metadata is + modified from kms+context to kms. In V3 format, "kms" is not a valid + compressed wrapping algorithm code, so the client MUST reject it. + """ + + def test_v3_downgrade_wrap_alg_to_kms_rejected_without_legacy(self): + """Tampering x-amz-w from '12' to 'kms' MUST fail when legacy wrapping is disabled. + + The default KmsKeyring does not enable legacy wrapping algorithms, + so the 'kms' wrapping algorithm value should be rejected outright. + """ + key = _unique_key("sec-downgrade-no-legacy-") + data = b"sensitive data with context" + encryption_context = {"project": "alpha"} + + # 1. Encrypt normally with kms+context (V3 format) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Attacker tampers x-amz-w from '12' to 'kms' via S3 copy + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + original_metadata = head["Metadata"] + assert ( + original_metadata.get("x-amz-w") == "12" + ), f"Expected x-amz-w='12', got {original_metadata.get('x-amz-w')}" + + tampered_metadata = original_metadata.copy() + tampered_metadata["x-amz-w"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decryption with mismatched context MUST fail + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec.get_object(Bucket=bucket, Key=key, EncryptionContext={"project": "beta"}) + + def test_v3_downgrade_wrap_alg_to_kms_rejected_with_correct_context(self): + """Tampering x-amz-w from '12' to 'kms' MUST fail even with the original context. + + The V3 wrapping algorithm validation rejects "kms" as an invalid + compressed code regardless of what encryption context the caller + provides. The rejection happens before any context comparison. + """ + key = _unique_key("sec-downgrade-no-legacy-correct-ctx-") + data = b"sensitive data with context" + encryption_context = {"project": "alpha"} + + # 1. Encrypt normally with kms+context (V3 format) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Tamper x-amz-w from '12' to 'kms' + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + tampered_metadata["x-amz-w"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decryption with the ORIGINAL (correct) context MUST still fail + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec.get_object(Bucket=bucket, Key=key, EncryptionContext=encryption_context) + + def test_v3_downgrade_wrap_alg_to_kms_rejected_with_legacy(self): + """Tampering x-amz-w from '12' to 'kms' MUST still fail even with legacy enabled. + + Even when enable_legacy_wrapping_algorithms=True, the KmsV1 path + passes the *stored* encryption context to KMS Decrypt. Since the + data key was originally encrypted with the 'alpha' context, KMS + itself will reject the Decrypt call (the ciphertext is bound to + the original context). The mismatched 'beta' context should never + produce a successful decryption. + """ + key = _unique_key("sec-downgrade-legacy-") + data = b"sensitive data with context" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with kms+context (V3) + s3ec_encrypt = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec_encrypt.put_object( + Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context + ) + + # 2. Attacker tampers x-amz-w from '12' to 'kms' + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + tampered_metadata["x-amz-w"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decrypt with legacy enabled but mismatched context MUST fail + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext={"project": "beta"}) + + def test_v3_downgrade_wrap_alg_correct_context_still_fails(self): + """Tampering x-amz-w from '12' to 'kms' MUST fail even with the correct context. + + The KmsV1 path uses the *stored* encryption context (from x-amz-t) + for the KMS Decrypt call. But the stored context for kms+context + includes the reserved key 'aws:x-amz-cek-alg'. When the wrapping + algorithm is changed to 'kms', the keyring may not reconstruct the + correct KMS encryption context, causing KMS to reject the call. + This verifies the attack fails regardless of what context the + caller provides. + """ + key = _unique_key("sec-downgrade-correct-ctx-") + data = b"sensitive data with context" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with kms+context (V3) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Attacker tampers x-amz-w + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + tampered_metadata["x-amz-w"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Even with the CORRECT original context, decryption should fail + # because the wrapping algorithm mismatch corrupts the KMS call + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext=encryption_context) + + def test_v3_downgrade_with_matdesc_injection(self): + """Tampering x-amz-w to 'kms' AND copying x-amz-t into x-amz-m MUST be rejected. + + "kms" is not a valid V3 compressed wrapping algorithm code, so the + client rejects it before the matdesc injection has any effect. + """ + key = _unique_key("sec-v3-downgrade-matdesc-") + data = b"sensitive data with context" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with kms+context (V3) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Attacker tampers x-amz-w AND copies x-amz-t into x-amz-m + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + + # Downgrade wrapping algorithm + tampered_metadata["x-amz-w"] = "kms" + # Copy the original bound context from x-amz-t into x-amz-m + # so the KmsV1 path reads it as mat_desc and passes it to KMS Decrypt + tampered_metadata["x-amz-m"] = tampered_metadata["x-amz-t"] + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decrypt with legacy enabled + mismatched context MUST fail + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext={"project": "beta"}) + + +class TestV2WrappingAlgorithmDowngradeAttack: + """V2 wrapping algorithm downgrade tests. + + V2 stores the wrapping algorithm in x-amz-wrap-alg. The KmsV1 ("kms") + wrapping algorithm does not support caller-provided encryption context. + When a caller provides encryption context on decrypt and the wrapping + algorithm is "kms", the client MUST reject the request. This is the + canonical behavior established by the Java AmazonS3EncryptionClientV2. + """ + + def test_v2_downgrade_wrap_alg_to_kms_correct_context(self): + """Tampering x-amz-wrap-alg to 'kms' MUST fail even with the original correct context. + + The KmsV1 wrapping algorithm does not support encryption context. + The client MUST reject when a caller provides any encryption context + and the wrapping algorithm is 'kms', regardless of whether the + context matches the stored matdesc. + """ + key = _unique_key("sec-v2-downgrade-correct-ctx-") + data = b"sensitive v2 data" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with V2 format (AES_GCM, kms+context wrapping) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Tamper x-amz-wrap-alg from 'kms+context' to 'kms' + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + tampered_metadata["x-amz-wrap-alg"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decrypt with legacy enabled + CORRECT original context MUST still fail + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext=encryption_context) + + def test_v2_downgrade_wrap_alg_to_kms_mismatched_context(self): + """Tampering x-amz-wrap-alg from 'kms+context' to 'kms' with wrong context. + + The KmsV1 wrapping algorithm does not support encryption context. + The client MUST reject when a caller provides mismatched encryption + context and the wrapping algorithm is 'kms'. + """ + key = _unique_key("sec-v2-downgrade-") + data = b"sensitive v2 data" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with V2 format (AES_GCM, kms+context wrapping) + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Attacker tampers x-amz-wrap-alg from 'kms+context' to 'kms' + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + original_metadata = head["Metadata"] + assert ( + original_metadata.get("x-amz-wrap-alg") == "kms+context" + ), f"Expected x-amz-wrap-alg='kms+context', got {original_metadata.get('x-amz-wrap-alg')}" + + tampered_metadata = original_metadata.copy() + tampered_metadata["x-amz-wrap-alg"] = "kms" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decrypt with legacy enabled + mismatched context + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext={"project": "beta"}) + + +class TestEncryptionContextBypassAttempts: + """Tests verifying encryption context cannot be bypassed through other vectors.""" + + def test_v3_no_context_on_decrypt_after_context_on_encrypt(self): + """Omitting EncryptionContext on get_object MUST fail if object was encrypted with one.""" + key = _unique_key("sec-no-ctx-decrypt-") + data = b"data requiring context" + encryption_context = {"project": "alpha"} + + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + with pytest.raises(S3EncryptionClientError): + s3ec.get_object(Bucket=bucket, Key=key) + + def test_v3_tamper_stored_context_metadata(self): + """Tampering x-amz-t (stored encryption context) MUST cause KMS Decrypt to fail. + + The KMS ciphertext is bound to the original encryption context. + Modifying x-amz-t changes what the client sends to KMS Decrypt, + causing a mismatch with the ciphertext's bound context. + """ + key = _unique_key("sec-tamper-ctx-") + data = b"data with bound context" + encryption_context = {"project": "alpha"} + + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # Tamper the stored encryption context in x-amz-t + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + + # Replace the stored context with attacker-controlled values + tampered_metadata["x-amz-t"] = json.dumps({"project": "beta", "aws:x-amz-cek-alg": "115"}) + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # Decryption with the tampered context should fail at KMS + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec.get_object(Bucket=bucket, Key=key, EncryptionContext={"project": "beta"}) + + +class TestCBCErrorIndistinguishability: + """Tests verifying that CBC decryption errors are indistinguishable. + + A padding oracle requires the caller to distinguish between padding + errors and other decryption failures. These tests verify that all CBC + failure modes produce the same error type and message, preventing + an attacker from using error responses to deduce padding validity. + """ + + def _encrypt_cbc(self, key, iv, plaintext): + """Helper to encrypt with AES-CBC + PKCS7 padding.""" + cipher = Cipher(algorithms.AES(key), modes.CBC(iv)) + encryptor = cipher.encryptor() + padder = PKCS7(128).padder() + padded = padder.update(plaintext) + padder.finalize() + return encryptor.update(padded) + encryptor.finalize() + + def _make_cbc_decryptor(self, key, iv, content_length): + """Helper to create an AesCbcDecryptor.""" + cipher = Cipher(algorithms.AES(key), modes.CBC(iv)) + unpadder = PKCS7(128).unpadder() + return AesCbcDecryptor(cipher.decryptor(), unpadder, content_length) + + def test_wrong_key_and_tampered_ciphertext_produce_same_error(self): + """Wrong key and tampered ciphertext MUST produce identical error messages. + + Both cause PKCS7 unpadding to fail, but the error message and type + MUST be the same so an attacker cannot distinguish between them. + """ + key = os.urandom(32) + iv = os.urandom(16) + ciphertext = self._encrypt_cbc(key, iv, b"test data for padding oracle check") + + # Wrong key: decryption produces garbage, unpadding fails + wrong_key = os.urandom(32) + decryptor1 = self._make_cbc_decryptor(wrong_key, iv, len(ciphertext)) + with pytest.raises(S3EncryptionClientSecurityError) as exc1: + decryptor1.finalize(ciphertext) + + # Tampered ciphertext: last byte flipped, unpadding fails + tampered = ciphertext[:-1] + bytes([ciphertext[-1] ^ 0x01]) + decryptor2 = self._make_cbc_decryptor(key, iv, len(tampered)) + with pytest.raises(S3EncryptionClientSecurityError) as exc2: + decryptor2.finalize(tampered) + + # Both MUST produce the same error message + assert str(exc1.value) == str(exc2.value), ( + f"Error messages differ: wrong_key={str(exc1.value)!r}, " + f"tampered={str(exc2.value)!r}" + ) + + # Neither message should contain details about the underlying failure + assert ( + "padding" not in str(exc1.value).lower() + ), f"Error message leaks padding information: {str(exc1.value)!r}" + + def test_truncated_ciphertext_produces_same_error(self): + """Truncated ciphertext MUST produce the same error as padding failure. + + A non-block-aligned ciphertext causes a different exception in the + cryptography library. The error message MUST be identical to prevent + an attacker from distinguishing truncation from padding failure. + """ + key = os.urandom(32) + iv = os.urandom(16) + ciphertext = self._encrypt_cbc(key, iv, b"test data for truncation check") + + # Padding failure (wrong key) + wrong_key = os.urandom(32) + decryptor1 = self._make_cbc_decryptor(wrong_key, iv, len(ciphertext)) + with pytest.raises(S3EncryptionClientSecurityError) as exc1: + decryptor1.finalize(ciphertext) + + # Truncated ciphertext (not block-aligned) + truncated = ciphertext[:-3] + decryptor2 = self._make_cbc_decryptor(key, iv, len(truncated)) + with pytest.raises(S3EncryptionClientSecurityError) as exc2: + decryptor2.finalize(truncated) + + # Both MUST produce the same error message + assert str(exc1.value) == str(exc2.value), ( + f"Error messages differ: padding_fail={str(exc1.value)!r}, " + f"truncated={str(exc2.value)!r}" + ) + + +class TestInstructionFileFormatConfusion: + """Tests for instruction file metadata injection causing format confusion. + + When a V3 object uses instruction files, the instruction file metadata + is merged with object metadata. If an attacker injects V2-format keys + into the instruction file (or directly into object metadata), the merged + metadata may contain keys from multiple format versions. The client + detects this via has_exclusive_key_collision() and the V2+V3 content + key coexistence check, rejecting the tampered metadata before format + dispatch. + """ + + def test_v2_keys_injected_into_v3_metadata_rejected(self): + """Injecting V2 keys into V3 object metadata MUST be rejected. + + Encrypt a V3 object, then tamper the S3 metadata to add V2 keys + alongside the existing V3 content keys. The client MUST reject + this because V2 and V3 keys should never coexist. + """ + key = _unique_key("sec-v2-inject-v3-") + data = b"data for format confusion test" + encryption_context = {"project": "alpha"} + + # 1. Encrypt with V3 format + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec.put_object(Bucket=bucket, Key=key, Body=data, EncryptionContext=encryption_context) + + # 2. Tamper: inject V2 keys alongside existing V3 metadata + plain_s3 = boto3.client("s3") + head = plain_s3.head_object(Bucket=bucket, Key=key) + tampered_metadata = head["Metadata"].copy() + + # Add V2 keys — the V3 keys (x-amz-c, x-amz-d, x-amz-i, x-amz-3, x-amz-w) remain + tampered_metadata["x-amz-key-v2"] = tampered_metadata.get("x-amz-3", "fake") + tampered_metadata["x-amz-cek-alg"] = "AES/GCM/NoPadding" + tampered_metadata["x-amz-iv"] = "AAAAAAAAAAAAAAAA" + tampered_metadata["x-amz-wrap-alg"] = "kms+context" + + plain_s3.copy_object( + Bucket=bucket, + Key=key, + CopySource={"Bucket": bucket, "Key": key}, + Metadata=tampered_metadata, + MetadataDirective="REPLACE", + ) + + # 3. Decrypt MUST fail — metadata has both V2 and V3 keys + s3ec_legacy = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + enable_legacy_wrapping=True, + ) + with pytest.raises((S3EncryptionClientError, Exception)): + s3ec_legacy.get_object(Bucket=bucket, Key=key, EncryptionContext=encryption_context) + + def test_exclusive_key_collision_detected_during_decrypt(self): + """The decrypt pipeline MUST reject metadata with exclusive key collisions. + + When merged metadata contains both V2 and V3 exclusive keys, + the pipeline detects the collision and raises an error. + """ + # Create a mock CMM that would return decryption materials + mock_cmm = MagicMock(spec=DefaultCryptoMaterialsManager) + + pipeline = GetEncryptedObjectPipeline( + cmm=mock_cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + enable_legacy_unauthenticated_modes=False, + ) + + # Build a response with merged V2+V3 metadata (simulating the + # instruction file injection after merge) + fake_edk = base64.b64encode(os.urandom(32)).decode() + fake_iv = base64.b64encode(os.urandom(12)).decode() + fake_message_id = base64.b64encode(os.urandom(28)).decode() + fake_commitment = base64.b64encode(os.urandom(28)).decode() + + merged_metadata = { + # V2 keys (from attacker instruction file) + "x-amz-key-v2": fake_edk, + "x-amz-cek-alg": "AES/GCM/NoPadding", + "x-amz-iv": fake_iv, + "x-amz-wrap-alg": "kms+context", + "x-amz-matdesc": '{"aws:x-amz-cek-alg": "AES/GCM/NoPadding"}', + # V3 keys (from object metadata) + "x-amz-c": "115", + "x-amz-d": fake_commitment, + "x-amz-i": fake_message_id, + "x-amz-w": "12", + "x-amz-3": fake_edk, + } + + fake_body = MagicMock() + fake_body.read.return_value = os.urandom(48) # fake ciphertext + + response = { + "Body": fake_body, + "Metadata": merged_metadata, + "ContentLength": 48, + } + + # This SHOULD raise an error due to exclusive key collision, + # but currently routes to _decrypt_v2 instead + with pytest.raises(S3EncryptionClientError): + pipeline.decrypt( + response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + encryption_context={}, + ) diff --git a/test/test_kms_keyring.py b/test/test_kms_keyring.py index 0a5d66de..8c5b2ab2 100644 --- a/test/test_kms_keyring.py +++ b/test/test_kms_keyring.py @@ -467,3 +467,58 @@ def test_on_decrypt_fails_when_kms_fails(self): keyring.on_decrypt(dec_materials) assert exc_info.value is kms_exception + + def test_on_decrypt_kms_v1_rejects_any_encryption_context(self): + """KmsV1 path must reject any caller-provided encryption context.""" + mock_kms_client = MagicMock() + keyring = KmsKeyring( + mock_kms_client, + "arn:aws:kms:us-east-1:123456789012:key/test-key", + enable_legacy_wrapping_algorithms=True, + ) + + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"project": "alpha", "kms_cmk_id": "some-key"}, + encryption_context_from_request={"project": "alpha"}, + ) + + with pytest.raises(S3EncryptionClientError, match="not supported with the KmsV1"): + keyring.on_decrypt(dec_materials) + + mock_kms_client.decrypt.assert_not_called() + + def test_on_decrypt_kms_v1_rejects_mismatched_encryption_context(self): + """KmsV1 path must reject mismatched caller-provided encryption context.""" + mock_kms_client = MagicMock() + keyring = KmsKeyring( + mock_kms_client, + "arn:aws:kms:us-east-1:123456789012:key/test-key", + enable_legacy_wrapping_algorithms=True, + ) + + edk = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms", + encrypted_data_key=b"encrypted-key", + ) + dec_materials = DecryptionMaterials( + iv=b"initialization-vector", + encrypted_data_keys=[edk], + encryption_context_stored={"project": "alpha", "kms_cmk_id": "some-key"}, + encryption_context_from_request={"project": "beta"}, + ) + + with pytest.raises(S3EncryptionClientError, match="not supported with the KmsV1"): + keyring.on_decrypt(dec_materials) + + mock_kms_client.decrypt.assert_not_called() + + # KMS should never be called when context doesn't match + mock_kms_client.decrypt.assert_not_called() diff --git a/test/test_pipelines.py b/test/test_pipelines.py index a66880e4..edd9ba8d 100644 --- a/test/test_pipelines.py +++ b/test/test_pipelines.py @@ -434,3 +434,38 @@ def test_decrypt_instruction_file_empty_metadata_raises(self): bucket="test-bucket", key="test-key", ) + + def test_decrypt_rejects_exclusive_key_collision(self): + """Metadata with both V2 and V3 EDK keys MUST be rejected.""" + import base64 + import os + + mock_cmm = Mock() + pipeline = GetEncryptedObjectPipeline( + cmm=mock_cmm, + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + + fake_edk = base64.b64encode(os.urandom(32)).decode() + fake_iv = base64.b64encode(os.urandom(12)).decode() + # Metadata with both V2 (x-amz-key-v2) and V3 (x-amz-3) EDK keys + metadata = { + "x-amz-key-v2": fake_edk, + "x-amz-cek-alg": "AES/GCM/NoPadding", + "x-amz-iv": fake_iv, + "x-amz-wrap-alg": "kms+context", + "x-amz-3": fake_edk, + "x-amz-c": "115", + "x-amz-w": "12", + "x-amz-d": base64.b64encode(os.urandom(28)).decode(), + "x-amz-i": base64.b64encode(os.urandom(28)).decode(), + } + + mock_response = { + "Body": BytesIO(os.urandom(48)), + "Metadata": metadata, + "ContentLength": 48, + } + + with pytest.raises(S3EncryptionClientError, match="multiple format versions"): + pipeline.decrypt(mock_response, ".instruction", enable_delayed_authentication=False)