diff --git a/.github/workflows/python-integ.yml b/.github/workflows/python-integ.yml index d2761518..0f50c8e8 100644 --- a/.github/workflows/python-integ.yml +++ b/.github/workflows/python-integ.yml @@ -48,34 +48,35 @@ jobs: aws-region: us-west-2 - name: Run unit tests - run: make test-unit + run: | + uv run pytest test/ --ignore=test/integration/ --verbose \ + --cov=src/s3_encryption --cov-report=term-missing --cov-report=html:coverage-unit \ + --cov-fail-under=89 - name: Run integration tests - run: make test-integration + run: | + uv run pytest test/integration/ --verbose \ + --cov=src/s3_encryption --cov-report=term-missing --cov-report=html:coverage-integ \ + --cov-fail-under=83 env: CI_S3_BUCKET: ${{ vars.CI_S3_BUCKET }} CI_KMS_KEY_ALIAS: ${{ vars.CI_KMS_KEY_ALIAS }} + CI_MRK_KEY_ID_PRIMARY: ${{ vars.CI_MRK_KEY_ID_PRIMARY }} + CI_MRK_KEY_ID_REPLICA: ${{ vars.CI_MRK_KEY_ID_REPLICA }} - name: Run examples run: make test-examples - name: Generate coverage HTML report if: always() - run: uv run coverage html -d coverage-report + uses: actions/upload-artifact@v7 + with: + name: coverage-unit + path: coverage-unit/ - - name: Upload coverage report + - name: Upload integration test coverage report if: always() uses: actions/upload-artifact@v7 with: - name: coverage-report - path: coverage-report/ - - - name: Check coverage threshold - run: | - THRESHOLD=93 - ACTUAL=$(uv run coverage report --format=total) - echo "Coverage: ${ACTUAL}% (threshold: ${THRESHOLD}%)" - if [ "$ACTUAL" -gt "$THRESHOLD" ]; then - echo "::warning::Coverage is ${ACTUAL}%, consider updating --fail-under to ${ACTUAL} in python-integ.yml" - fi - uv run coverage report --fail-under=$THRESHOLD + name: coverage-integ + path: coverage-integ/ diff --git a/Makefile b/Makefile index 5960117a..caf05737 100644 --- a/Makefile +++ b/Makefile @@ -23,13 +23,13 @@ format: # Run all tests with combined coverage test: test-unit test-integration test-examples -# Run unit tests (creates .coverage report) +# Run unit tests with coverage test-unit: - uv run pytest test/ --ignore=test/integration/ --verbose --cov=src/s3_encryption --cov-report=term-missing + uv run pytest test/ --ignore=test/integration/ --verbose --cov=src/s3_encryption --cov-report=term-missing --cov-fail-under=89 -# Run integration tests (appends to .coverage report from test-unit) +# Run integration tests with separate coverage test-integration: - uv run pytest test/integration/ --verbose --cov=src/s3_encryption --cov-append --cov-report=term-missing + uv run pytest test/integration/ --verbose --cov=src/s3_encryption --cov-report=term-missing --cov-fail-under=83 test-examples: uv run pytest examples/test/ -v diff --git a/cdk/lib/cdk-stack.ts b/cdk/lib/cdk-stack.ts index b5a28084..329e16ca 100644 --- a/cdk/lib/cdk-stack.ts +++ b/cdk/lib/cdk-stack.ts @@ -65,6 +65,33 @@ export class S3ECPythonGithub extends cdk.Stack { } ) + // Multi-Region Key (MRK) for cross-region testing. + // The primary key is created here in the stack's region (us-west-2). + // A replica MUST be created manually in us-east-1 via the AWS Console + // or a separate CDK stack, since CDK cannot create cross-region replicas + // within a single stack. + const S3ECMRKPrimaryKey = new Key( + this, + "S3ECMRKPrimaryKey", + { + enableKeyRotation: true, + description: "Multi-Region primary key for S3EC cross-region testing", + // multiRegion is not a direct CDK L2 prop; use cfnOptions override + } + ); + // Override to enable multi-region on the underlying CloudFormation resource + const cfnMrkKey = S3ECMRKPrimaryKey.node.defaultChild as cdk.aws_kms.CfnKey; + cfnMrkKey.addPropertyOverride("MultiRegion", true); + + const S3ECMRKPrimaryKeyAlias = new Alias( + this, + "S3ECMRKPrimaryKeyAlias", + { + aliasName: "alias/S3EC-Python-MRK-Primary", + targetKey: S3ECMRKPrimaryKey, + } + ); + // S3 buckets const AccessConfiguration: BlockPublicAccessOptions = { blockPublicAcls: false, @@ -162,6 +189,10 @@ export class S3ECPythonGithub extends cdk.Stack { resources: [ S3ECGithubKMSKey.keyArn, S3ECTestServerKMSKey.keyArn, // Add access to the test-server KMS key + S3ECMRKPrimaryKey.keyArn, // MRK primary key + // MRK replica in us-east-1 — ARN must use wildcard account + // since the replica shares the same key ID but different region + `arn:aws:kms:us-east-1:${this.account}:key/${S3ECMRKPrimaryKey.keyId}`, ] }) ] diff --git a/src/s3_encryption/__init__.py b/src/s3_encryption/__init__.py index dd0441d8..a9d5a3a1 100644 --- a/src/s3_encryption/__init__.py +++ b/src/s3_encryption/__init__.py @@ -294,6 +294,32 @@ def process_instruction_file(self, parsed): parsed["Body"] = streaming_body +def _validate_encryption_context(encryption_context): + """Validate that all encryption context keys and values are US-ASCII. + + S3 applies double-encoding to non-ASCII metadata values that SDKs do not + automatically decode, which causes decryption to fail because the stored + encryption context won't match the original. + + Raises: + S3EncryptionClientError: If any key or value contains non-ASCII characters. + """ + if encryption_context is None: + return + if not isinstance(encryption_context, dict): + raise S3EncryptionClientError("EncryptionContext must be a dictionary") + for k, v in encryption_context.items(): + if not isinstance(k, str) or not isinstance(v, str): + raise S3EncryptionClientError("EncryptionContext keys and values must be strings") + if not k.isascii() or not v.isascii(): + raise S3EncryptionClientError( + f"EncryptionContext keys and values must contain only US-ASCII characters. " + f"Non-ASCII characters in S3 metadata are encoded by the server " + f"and will cause decryption to fail. " + f"First offending entry: {repr(k)}: {repr(v)}" + ) + + @define class S3EncryptionClient: """Client for encrypting and decrypting S3 objects. @@ -322,6 +348,15 @@ def __attrs_post_init__(self): event_system.register("before-call.s3.PutObject", self._plugin.on_put_object_before_call) event_system.register("after-call.s3.GetObject", self._plugin.on_get_object_after_call) + def __getattr__(self, name): + """Proxy unrecognized attributes to the wrapped S3 client. + + This allows the S3EncryptionClient to be used like a regular boto3 S3 + client for operations it doesn't intercept (e.g. copy_object, + list_objects_v2, etc.). + """ + return getattr(self.wrapped_s3_client, name) + def put_object(self, **kwargs): """Encrypt and upload an object to S3. @@ -343,6 +378,7 @@ def put_object(self, **kwargs): """ # Extract EncryptionContext if provided (not a standard S3 parameter) encryption_context = kwargs.pop("EncryptionContext", None) + _validate_encryption_context(encryption_context) # Store encryption context in thread-local storage for the event handler self._plugin._context.encryption_context = encryption_context @@ -380,6 +416,7 @@ def get_object(self, **kwargs): """ # Extract EncryptionContext if provided (not a standard S3 parameter) encryption_context = kwargs.pop("EncryptionContext", None) + _validate_encryption_context(encryption_context) ##= specification/s3-encryption/data-format/metadata-strategy.md#instruction-file ##= type=implementation ##% The S3EC SHOULD support providing a custom Instruction File suffix diff --git a/test/integration/test_i_custom_keyring_cmm.py b/test/integration/test_i_custom_keyring_cmm.py new file mode 100644 index 00000000..45cd3441 --- /dev/null +++ b/test/integration/test_i_custom_keyring_cmm.py @@ -0,0 +1,239 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for custom keyring and custom CMM. + +These tests verify that user-implemented AbstractKeyring and +AbstractCryptoMaterialsManager subclasses work end-to-end through +S3EncryptionClient.put_object / get_object. + +WARNING: The custom classes below are test-only stubs that duplicate the +built-in KmsKeyring and DefaultCryptoMaterialsManager logic. They exist +solely to prove the extension points work. Do NOT use them in production. +""" + +import os +from datetime import datetime + +import boto3 + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials.crypto_materials_manager import AbstractCryptoMaterialsManager +from s3_encryption.materials.encrypted_data_key import EncryptedDataKey +from s3_encryption.materials.keyring import S3Keyring +from s3_encryption.materials.materials import ( + AlgorithmSuite, + CommitmentPolicy, + DecryptionMaterials, + EncryptionMaterials, +) + +bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket") +region = os.environ.get("CI_AWS_REGION", "us-west-2") +kms_key_id = os.environ.get( + "CI_KMS_KEY_ALIAS", "arn:aws:kms:us-west-2:370957321024:alias/S3EC-Python-Github-KMS-Key" +) + +KMS_CONTEXT_DEFAULT_KEY = "aws:x-amz-cek-alg" + + +# --------------------------------------------------------------------------- +# Custom keyring — test-only, do NOT use in production code. +# Duplicates KmsKeyring logic to prove the AbstractKeyring extension point. +# --------------------------------------------------------------------------- + + +class CustomTestKmsKeyring(S3Keyring): + """Test-only KMS keyring. Do NOT use in production.""" + + def __init__(self, kms_client, kms_key_id): + self.kms_client = kms_client + self.kms_key_id = kms_key_id + + def on_encrypt(self, enc_materials): + enc_materials = super().on_encrypt(enc_materials) + encryption_context = enc_materials.encryption_context + + if ( + enc_materials.encryption_algorithm + == AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY + ): + encryption_context[KMS_CONTEXT_DEFAULT_KEY] = str( + enc_materials.encryption_algorithm.suite_id + ) + else: + encryption_context[KMS_CONTEXT_DEFAULT_KEY] = ( + enc_materials.encryption_algorithm.cipher_name + ) + + response = self.kms_client.generate_data_key( + KeyId=self.kms_key_id, KeySpec="AES_256", EncryptionContext=encryption_context + ) + enc_materials.encrypted_data_key = EncryptedDataKey( + key_provider_id=b"S3Keyring", + key_provider_info="kms+context", + encrypted_data_key=response["CiphertextBlob"], + ) + enc_materials.plaintext_data_key = response["Plaintext"] + return enc_materials + + def on_decrypt(self, dec_materials, encrypted_data_keys=None): + dec_materials = super().on_decrypt(dec_materials, encrypted_data_keys) + edks = ( + encrypted_data_keys + if encrypted_data_keys is not None + else dec_materials.encrypted_data_keys + ) + edk = edks[0] + + if edk.key_provider_info == "kms+context": + ec_from_request = dec_materials.encryption_context_from_request + ec_stored = dec_materials.encryption_context_stored + + if KMS_CONTEXT_DEFAULT_KEY in ec_from_request: + raise S3EncryptionClientError(f"{KMS_CONTEXT_DEFAULT_KEY} is a reserved key") + + ec_stored_copy = ec_stored.copy() + ec_stored_copy.pop("kms_cmk_id", None) + ec_stored_copy.pop(KMS_CONTEXT_DEFAULT_KEY, None) + + if ec_stored_copy != ec_from_request: + raise S3EncryptionClientError("Provided encryption context does not match") + elif edk.key_provider_info != "kms": + raise S3EncryptionClientError( + f"{edk.key_provider_info} is not a valid key wrapping algorithm!" + ) + + response = self.kms_client.decrypt( + KeyId=self.kms_key_id, + CiphertextBlob=edk.encrypted_data_key, + EncryptionContext=dec_materials.encryption_context_stored, + ) + dec_materials.plaintext_data_key = response["Plaintext"] + return dec_materials + + +# --------------------------------------------------------------------------- +# Custom CMM — test-only, do NOT use in production code. +# Duplicates DefaultCryptoMaterialsManager logic to prove the CMM extension point. +# --------------------------------------------------------------------------- + + +class CustomTestCMM(AbstractCryptoMaterialsManager): + """Test-only CMM. Do NOT use in production.""" + + def __init__(self, keyring): + self.keyring = keyring + + def get_encryption_materials(self, enc_mats_request): + if isinstance(enc_mats_request, dict): + materials = EncryptionMaterials( + encryption_context=enc_mats_request.get("encryption_context", {}) + ) + else: + materials = enc_mats_request + return self.keyring.on_encrypt(materials) + + def decrypt_materials(self, dec_mats_request): + if isinstance(dec_mats_request, dict): + materials = DecryptionMaterials.from_dict(dec_mats_request) + else: + materials = dec_mats_request + return self.keyring.on_decrypt(materials, materials.encrypted_data_keys) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _unique_key(prefix): + return prefix + datetime.now().strftime("%Y-%m-%d-%H:%M:%S-%f") + + +# --------------------------------------------------------------------------- +# Integration tests +# --------------------------------------------------------------------------- + + +class TestCustomKeyring: + """Verify a user-implemented AbstractKeyring subclass works end-to-end.""" + + def test_roundtrip_with_custom_keyring(self): + """Custom keyring MUST encrypt and decrypt successfully.""" + kms_client = boto3.client("kms", region_name=region) + keyring = CustomTestKmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig(keyring=keyring) + s3ec = S3EncryptionClient(wrapped_client, config) + + key = _unique_key("custom-keyring-rt-") + data = b"custom keyring round trip test" + + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + def test_roundtrip_with_custom_keyring_aes_gcm(self): + """Custom keyring MUST work with non-committing AES-GCM suite.""" + kms_client = boto3.client("kms", region_name=region) + keyring = CustomTestKmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring=keyring, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + key = _unique_key("custom-keyring-gcm-rt-") + data = b"custom keyring AES-GCM round trip" + + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + +class TestCustomCMM: + """Verify a user-implemented AbstractCryptoMaterialsManager subclass works end-to-end.""" + + def test_roundtrip_with_custom_cmm(self): + """Custom CMM MUST encrypt and decrypt successfully.""" + from s3_encryption.materials.kms_keyring import KmsKeyring + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + custom_cmm = CustomTestCMM(keyring) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig(keyring=keyring, cmm=custom_cmm) + s3ec = S3EncryptionClient(wrapped_client, config) + + key = _unique_key("custom-cmm-rt-") + data = b"custom CMM round trip test" + + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + def test_roundtrip_with_custom_cmm_aes_gcm(self): + """Custom CMM MUST work with non-committing AES-GCM suite.""" + from s3_encryption.materials.kms_keyring import KmsKeyring + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + custom_cmm = CustomTestCMM(keyring) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring=keyring, + cmm=custom_cmm, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + key = _unique_key("custom-cmm-gcm-rt-") + data = b"custom CMM AES-GCM round trip" + + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data diff --git a/test/integration/test_i_key_commitment_policy.py b/test/integration/test_i_key_commitment_policy.py new file mode 100644 index 00000000..ba334a87 --- /dev/null +++ b/test/integration/test_i_key_commitment_policy.py @@ -0,0 +1,200 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for key commitment policy enforcement through the front door. + +These tests verify that commitment policy behavior works end-to-end through +S3EncryptionClient.put_object / get_object, not just at the pipeline level. + +Objects are encrypted with one policy and decrypted with another to verify +cross-policy compatibility. +""" + +import os +from datetime import datetime + +import boto3 +import pytest + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials.kms_keyring import KmsKeyring +from s3_encryption.materials.materials import AlgorithmSuite, CommitmentPolicy + +bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket") +region = os.environ.get("CI_AWS_REGION", "us-west-2") +kms_key_id = os.environ.get( + "CI_KMS_KEY_ALIAS", "arn:aws:kms:us-west-2:370957321024:alias/S3EC-Python-Github-KMS-Key" +) + + +def _make_client(algorithm_suite, commitment_policy): + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + encryption_algorithm=algorithm_suite, + commitment_policy=commitment_policy, + ) + return S3EncryptionClient(wrapped_client, config) + + +def _unique_key(prefix): + return prefix + datetime.now().strftime("%Y-%m-%d-%H:%M:%S-%f") + + +# --------------------------------------------------------------------------- +# Non-committing (V2 GCM) objects decrypted under various policies +# --------------------------------------------------------------------------- + + +class TestNonCommittingObjectDecryptPolicies: + """Verify V2 (non-committing) objects can be decrypted under ALLOW policies + and rejected under REQUIRE_REQUIRE. + """ + + PLAINTEXT = b"non-committing policy integration test" + + @pytest.fixture(autouse=True, scope="class") + def _encrypt_v2_object(self, request): + """Encrypt a single V2 object to be shared across all tests in this class.""" + key = _unique_key("kc-v2-policy-") + writer = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + writer.put_object(Bucket=bucket, Key=key, Body=self.PLAINTEXT) + request.cls.s3_key = key + + def test_forbid_encrypt_allow_decrypt_decrypts_non_committing(self): + """FORBID_ENCRYPT_ALLOW_DECRYPT MUST decrypt non-committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + response = reader.get_object(Bucket=bucket, Key=self.s3_key) + assert response["Body"].read() == self.PLAINTEXT + + def test_require_encrypt_allow_decrypt_decrypts_non_committing(self): + """REQUIRE_ENCRYPT_ALLOW_DECRYPT MUST decrypt non-committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + ) + response = reader.get_object(Bucket=bucket, Key=self.s3_key) + assert response["Body"].read() == self.PLAINTEXT + + def test_require_require_rejects_non_committing(self): + """REQUIRE_ENCRYPT_REQUIRE_DECRYPT MUST reject non-committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + with pytest.raises(S3EncryptionClientError, match="cannot decrypt non-key-committing"): + reader.get_object(Bucket=bucket, Key=self.s3_key) + + +# --------------------------------------------------------------------------- +# Committing (V3 KC-GCM) objects decrypted under various policies +# --------------------------------------------------------------------------- + +# Writer policies that produce committing (V3) objects +COMMITTING_WRITER_POLICIES = [ + pytest.param( + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + id="writer=REQUIRE_REQUIRE", + ), + pytest.param( + CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + id="writer=REQUIRE_ALLOW", + ), +] + + +@pytest.mark.parametrize("writer_policy", COMMITTING_WRITER_POLICIES) +class TestCommittingObjectDecryptPolicies: + """Verify V3 (committing) objects can be decrypted under all three policies, + regardless of which REQUIRE_ENCRYPT_* policy was used to write them. + """ + + PLAINTEXT = b"committing policy integration test" + + @pytest.fixture(autouse=True) + def _encrypt_v3_object(self, writer_policy): + """Encrypt a V3 object with the parametrized writer policy.""" + key = _unique_key("kc-v3-policy-") + writer = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + writer_policy, + ) + writer.put_object(Bucket=bucket, Key=key, Body=self.PLAINTEXT) + self.s3_key = key + + def test_require_require_decrypts_committing(self): + """REQUIRE_ENCRYPT_REQUIRE_DECRYPT MUST decrypt committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + response = reader.get_object(Bucket=bucket, Key=self.s3_key) + assert response["Body"].read() == self.PLAINTEXT + + def test_require_encrypt_allow_decrypt_decrypts_committing(self): + """REQUIRE_ENCRYPT_ALLOW_DECRYPT MUST decrypt committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + ) + response = reader.get_object(Bucket=bucket, Key=self.s3_key) + assert response["Body"].read() == self.PLAINTEXT + + def test_forbid_encrypt_allow_decrypt_decrypts_committing(self): + """FORBID_ENCRYPT_ALLOW_DECRYPT MUST decrypt committing objects.""" + reader = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + response = reader.get_object(Bucket=bucket, Key=self.s3_key) + assert response["Body"].read() == self.PLAINTEXT + + +# --------------------------------------------------------------------------- +# Encrypt-side config rejection (no S3 needed, but verifies front-door behavior) +# --------------------------------------------------------------------------- + + +class TestEncryptPolicyRejection: + """Verify that incompatible algorithm + policy combos are rejected at config time.""" + + def test_require_encrypt_allow_decrypt_rejects_non_committing(self): + """REQUIRE_ENCRYPT_ALLOW_DECRYPT MUST reject non-committing algorithm at config time.""" + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + with pytest.raises(S3EncryptionClientError): + S3EncryptionClientConfig( + keyring, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + ) + + def test_require_encrypt_require_decrypt_rejects_non_committing(self): + """REQUIRE_ENCRYPT_REQUIRE_DECRYPT MUST reject non-committing algorithm at config time.""" + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + with pytest.raises(S3EncryptionClientError): + S3EncryptionClientConfig( + keyring, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + + def test_forbid_encrypt_allow_decrypt_rejects_committing(self): + """FORBID_ENCRYPT_ALLOW_DECRYPT MUST reject committing algorithm at config time.""" + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + with pytest.raises(S3EncryptionClientError): + S3EncryptionClientConfig( + keyring, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) diff --git a/test/integration/test_i_mrk_cross_region.py b/test/integration/test_i_mrk_cross_region.py new file mode 100644 index 00000000..bb9bd538 --- /dev/null +++ b/test/integration/test_i_mrk_cross_region.py @@ -0,0 +1,123 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for Multi-Region Key (MRK) cross-region encrypt/decrypt. + +These tests verify that data encrypted with a KMS MRK primary key in one region +can be decrypted using the MRK replica in another region, and vice versa. + +Prerequisites: + - A KMS MRK primary key in us-west-2 (created by CDK stack) + - A KMS MRK replica of the same key in us-east-1 (created manually after CDK deploy) + - Both keys share the same key ID (mrk-...) but have different region ARNs + +Environment variables: + CI_MRK_KEY_ID_PRIMARY: ARN or alias of the MRK primary in us-west-2 + CI_MRK_KEY_ID_REPLICA: ARN of the MRK replica in us-east-1 + CI_S3_BUCKET: S3 bucket for test objects (us-west-2) +""" + +import os +from datetime import datetime + +import boto3 +import pytest + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials.kms_keyring import KmsKeyring + +bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket") +primary_region = os.environ.get("CI_AWS_REGION", "us-west-2") +replica_region = "us-east-1" + +mrk_primary = os.environ.get( + "CI_MRK_KEY_ID_PRIMARY", + "arn:aws:kms:us-west-2:370957321024:key/mrk-cea4cf67c6a046ba829f61f69db5c191", +) +mrk_replica = os.environ.get( + "CI_MRK_KEY_ID_REPLICA", + "arn:aws:kms:us-east-1:370957321024:key/mrk-cea4cf67c6a046ba829f61f69db5c191", +) + + +def _make_client(kms_region, kms_key_id): + """Create an S3EncryptionClient using a KMS client in the given region.""" + kms_client = boto3.client("kms", region_name=kms_region) + keyring = KmsKeyring(kms_client, kms_key_id) + # Always use a primary region S3 client + wrapped_client = boto3.client("s3", region_name=primary_region) + config = S3EncryptionClientConfig(keyring=keyring) + return S3EncryptionClient(wrapped_client, config) + + +def _unique_key(prefix): + return prefix + datetime.now().strftime("%Y-%m-%d-%H:%M:%S-%f") + + +class TestMRKCrossRegion: + """Verify MRK encrypt/decrypt works across regions.""" + + def test_encrypt_primary_decrypt_replica(self): + """Data encrypted with MRK primary MUST decrypt with MRK replica.""" + key = _unique_key("mrk-primary-to-replica-") + data = b"MRK cross-region: primary -> replica" + + writer = _make_client(primary_region, mrk_primary) + writer.put_object(Bucket=bucket, Key=key, Body=data) + + reader = _make_client(replica_region, mrk_replica) + response = reader.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + def test_encrypt_replica_decrypt_primary(self): + """Data encrypted with MRK replica MUST decrypt with MRK primary.""" + key = _unique_key("mrk-replica-to-primary-") + data = b"MRK cross-region: replica -> primary" + + writer = _make_client(replica_region, mrk_replica) + writer.put_object(Bucket=bucket, Key=key, Body=data) + + reader = _make_client(primary_region, mrk_primary) + response = reader.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + def test_encrypt_and_decrypt_same_region_primary(self): + """MRK primary round-trip in the same region MUST work.""" + key = _unique_key("mrk-same-region-primary-") + data = b"MRK same-region primary round trip" + + s3ec = _make_client(primary_region, mrk_primary) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + def test_encrypt_and_decrypt_same_region_replica(self): + """MRK replica round-trip in the same region MUST work.""" + key = _unique_key("mrk-same-region-replica-") + data = b"MRK same-region replica round trip" + + s3ec = _make_client(replica_region, mrk_replica) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + +class TestMRKNonReplicatedRegionFails: + """Verify that using an MRK in a region where it hasn't been replicated fails.""" + + def test_decrypt_with_wrong_region_kms_client_fails(self): + """Decrypting with a KMS client pointed at a non-replicated region MUST fail.""" + key = _unique_key("mrk-wrong-region-") + data = b"MRK wrong region test" + + # Encrypt with primary + writer = _make_client(primary_region, mrk_primary) + writer.put_object(Bucket=bucket, Key=key, Body=data) + + # Try to decrypt using a KMS client in a region where the MRK doesn't exist. + # Use eu-west-1 as a region that almost certainly has no replica. + non_replicated_region = "eu-west-1" + reader = _make_client(non_replicated_region, mrk_primary) + + with pytest.raises(S3EncryptionClientError): + reader.get_object(Bucket=bucket, Key=key) diff --git a/test/integration/test_i_s3_encryption.py b/test/integration/test_i_s3_encryption.py index 36f826bd..4074c6ed 100644 --- a/test/integration/test_i_s3_encryption.py +++ b/test/integration/test_i_s3_encryption.py @@ -138,6 +138,21 @@ def test_binary_data_roundtrip(algorithm_suite, commitment_policy): assert output == data +@pytest.mark.parametrize("algorithm_suite,commitment_policy", ALGORITHM_CONFIGS) +def test_bytesio_body_roundtrip(algorithm_suite, commitment_policy): + """Test that a BytesIO body is encrypted and decrypted correctly.""" + from io import BytesIO + + key = _unique_key("bytesio-body-rt-") + data = b"BytesIO round trip test data" + + s3ec = _make_client(algorithm_suite, commitment_policy) + s3ec.put_object(Bucket=bucket, Key=key, Body=BytesIO(data)) + response = s3ec.get_object(Bucket=bucket, Key=key) + output = response["Body"].read() + assert output == data + + @pytest.mark.parametrize("algorithm_suite,commitment_policy", ALGORITHM_CONFIGS) def test_invalid_body_types(algorithm_suite, commitment_policy): """Test that put_object raises an exception when given invalid body types.""" @@ -275,3 +290,117 @@ def test_delayed_authentication_mode(enable_delayed_auth): s3ec.put_object(Bucket=bucket, Key=key, Body=data) response = s3ec.get_object(Bucket=bucket, Key=key) assert response["Body"].read() == data + + +def test_inaccessible_kms_key_raises_access_denied(): + """put_object with a KMS key we lack permission for MUST surface AccessDeniedException.""" + from botocore.exceptions import ClientError + + fake_key_arn = "arn:aws:kms:us-west-2:123456789012:key/00000000-0000-0000-0000-000000000000" + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, fake_key_arn) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig(keyring=keyring) + s3ec = S3EncryptionClient(wrapped_client, config) + + key = _unique_key("access-denied-") + + with pytest.raises(S3EncryptionClientError, match="Failed to encrypt object") as exc_info: + s3ec.put_object(Bucket=bucket, Key=key, Body=b"should fail") + + # Unwrap and verify the root cause is AccessDeniedException + cause = exc_info.value.__cause__ + assert isinstance(cause, ClientError) + assert cause.response["Error"]["Code"] == "AccessDeniedException" + + +def test_get_nonexistent_object_raises_no_such_key(): + """get_object for a key that doesn't exist MUST surface NoSuchKey.""" + from botocore.exceptions import ClientError + + s3ec = _make_client( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + + with pytest.raises(S3EncryptionClientError, match="NoSuchKey") as exc_info: + s3ec.get_object(Bucket=bucket, Key="this-key-definitely-does-not-exist") + + cause = exc_info.value.__cause__ + assert isinstance(cause, ClientError) + assert cause.response["Error"]["Code"] == "NoSuchKey" + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", ALGORITHM_CONFIGS) +def test_s3_passthrough_options_preserved(algorithm_suite, commitment_policy): + """S3 options unrelated to encryption (e.g. StorageClass, ContentType) MUST be applied.""" + key = _unique_key("passthrough-opts-") + data = b'{"message": "hello"}' + + s3ec = _make_client(algorithm_suite, commitment_policy) + s3ec.put_object( + Bucket=bucket, + Key=key, + Body=data, + StorageClass="STANDARD_IA", + ContentType="application/json", + ContentDisposition="attachment; filename=test.json", + ) + + # Read back with head_object via the S3EC instance to verify the options were applied + head = s3ec.head_object(Bucket=bucket, Key=key) + assert head["StorageClass"] == "STANDARD_IA" + assert head["ContentType"] == "application/json" + assert head["ContentDisposition"] == "attachment; filename=test.json" + + # Also verify the data round-trips correctly + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", ALGORITHM_CONFIGS) +def test_copy_object_then_decrypt(algorithm_suite, commitment_policy): + """An encrypted object copied via CopyObject MUST still decrypt correctly.""" + src_key = _unique_key("copy-src-") + dst_key = _unique_key("copy-dst-") + data = b"copy object round trip test" + + s3ec = _make_client(algorithm_suite, commitment_policy) + s3ec.put_object(Bucket=bucket, Key=src_key, Body=data) + + # Copy using the S3EC instance (copy_object proxies to the wrapped S3 client) + s3ec.copy_object( + Bucket=bucket, + Key=dst_key, + CopySource={"Bucket": bucket, "Key": src_key}, + MetadataDirective="COPY", + ) + + # Decrypt the copied object + response = s3ec.get_object(Bucket=bucket, Key=dst_key) + assert response["Body"].read() == data + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", ALGORITHM_CONFIGS) +def test_non_ascii_encryption_context_rejected(algorithm_suite, commitment_policy): + """Non-US-ASCII characters in EncryptionContext MUST be rejected. + + S3 applies an esoteric double-encoding to non-ASCII metadata values that + most SDKs do not automatically decode. This causes decryption to fail + because the stored encryption context won't match the original. Currently + boto3 rejects non-ASCII header values before the request is sent. + """ + key = _unique_key("non-ascii-ec-") + non_ascii_contexts = [ + {"department": "ingeniería"}, # Latin accented + {"部門": "engineering"}, # CJK key + {"project": "проект"}, # Cyrillic value + {"emoji": "test 🔑"}, # Emoji + {"long😮‍💨": "𐀂"}, # Long Sigh/Psi + ] + + s3ec = _make_client(algorithm_suite, commitment_policy) + + for ec in non_ascii_contexts: + with pytest.raises(S3EncryptionClientError, match="US-ASCII"): + s3ec.put_object(Bucket=bucket, Key=key, Body=b"test", EncryptionContext=ec) diff --git a/test/integration/test_i_s3_encryption_instruction_file.py b/test/integration/test_i_s3_encryption_instruction_file.py index a78a6902..b2d2b5f4 100644 --- a/test/integration/test_i_s3_encryption_instruction_file.py +++ b/test/integration/test_i_s3_encryption_instruction_file.py @@ -129,6 +129,25 @@ def test_decrypt_invalid_instruction_file(): print(f"Error message: {exc_info.value}") +def test_decrypt_instruction_file_wrong_suffix_raises(): + """Decryption MUST fail when the instruction file suffix doesn't match the actual S3 object.""" + from s3_encryption.exceptions import S3EncryptionClientError + + key = TEST_OBJECTS["v3_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + with pytest.raises(S3EncryptionClientError, match="Instruction file body is empty"): + s3ec.get_object(Bucket=bucket, Key=key, InstructionFileSuffix=".wrong-suffix") + + def test_decrypt_v3_instruction_file_custom_suffix(): """Test decrypting V3 object with a custom instruction file suffix.""" key = TEST_OBJECTS["v3_instruction_file"] diff --git a/test/test_encryption.py b/test/test_encryption.py index a384afbd..ede9262c 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -135,6 +135,21 @@ def test_message_id_included_in_metadata_kc(self): _, meta = pipeline.encrypt(b"test") assert "x-amz-i" in meta + def test_bytesio_body_encrypts_successfully(self): + """Encryption MUST work when the body is a BytesIO object.""" + cmm, key = _mock_cmm() + pipeline = PutEncryptedObjectPipeline( + cmm, AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY + ) + plaintext = b"BytesIO body test data" + + # The plugin reads BytesIO via .read(), so the pipeline receives bytes. + # Verify the pipeline encrypts bytes from a BytesIO source correctly. + ciphertext, meta = pipeline.encrypt(plaintext) + assert ciphertext != plaintext + assert len(ciphertext) > 0 + assert "x-amz-i" in meta # V3 message ID present + # --------------------------------------------------------------------------- # ALG_AES_256_GCM_IV12_TAG16_NO_KDF diff --git a/test/test_key_commitment.py b/test/test_key_commitment.py index a0be12ce..673bf5da 100644 --- a/test/test_key_commitment.py +++ b/test/test_key_commitment.py @@ -163,3 +163,27 @@ def test_require_require_allows_committing_decrypt(self): ) result = pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) assert result.read() == plaintext + + def test_require_encrypt_allow_decrypt_allows_committing_decrypt(self): + """REQUIRE_ENCRYPT_ALLOW_DECRYPT MUST allow decryption with committing suites.""" + key = os.urandom(32) + response, dec_mats, plaintext = _v3_kc_gcm_response(key) + + pipeline = _make_pipeline( + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, + keyring_return=dec_mats, + ) + result = pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) + assert result.read() == plaintext + + def test_forbid_encrypt_allow_decrypt_allows_committing_decrypt(self): + """FORBID_ENCRYPT_ALLOW_DECRYPT MUST allow decryption with committing suites.""" + key = os.urandom(32) + response, dec_mats, plaintext = _v3_kc_gcm_response(key) + + pipeline = _make_pipeline( + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + keyring_return=dec_mats, + ) + result = pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) + assert result.read() == plaintext diff --git a/test/test_pipelines.py b/test/test_pipelines.py index e3d34e35..a66880e4 100644 --- a/test/test_pipelines.py +++ b/test/test_pipelines.py @@ -310,3 +310,127 @@ def test_decrypt_v3_unsupported_wrap_alg(self): S3EncryptionClientError, match="AES/GCM is not a valid key wrapping algorithm" ): pipeline.decrypt(mock_response, ".instruction", enable_delayed_authentication=False) + + def test_decrypt_instruction_file_no_s3_client_raises(self): + """Instruction file fetch MUST fail when no s3_client is available.""" + # Object metadata has no EDK — triggers instruction file path + object_metadata = {} + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=None, # No s3_client + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises(S3EncryptionClientError, match="s3_client required"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + def test_decrypt_instruction_file_missing_bucket_key_raises(self): + """Instruction file fetch MUST fail when Bucket or Key is missing.""" + object_metadata = {} + + mock_s3_client = Mock() + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises(S3EncryptionClientError, match="Bucket and key required"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket=None, + key=None, + ) + + def test_decrypt_instruction_file_s3_not_found_raises(self): + """Instruction file fetch MUST fail when the file doesn't exist in S3.""" + from botocore.exceptions import ClientError + + object_metadata = {} + + mock_s3_client = Mock() + mock_s3_client.get_object.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "The specified key does not exist."}}, + "GetObject", + ) + # The fetch_instruction_file function checks for _s3ec_plugin_context + mock_s3_client._s3ec_plugin_context = Mock() + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises(S3EncryptionClientError, match="Instruction File"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + def test_decrypt_instruction_file_empty_metadata_raises(self): + """Instruction file with no valid metadata MUST raise an error.""" + object_metadata = {} + + mock_s3_client = Mock() + # Instruction file returns empty metadata (empty body parsed to nothing) + mock_s3_client.get_object.return_value = { + "Body": BytesIO(b""), + "Metadata": {}, + } + mock_s3_client._s3ec_plugin_context = Mock() + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises(S3EncryptionClientError, match="empty metadata"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) diff --git a/test/test_s3_encryption_client_plugin.py b/test/test_s3_encryption_client_plugin.py index cbc8cd80..1c930a3a 100644 --- a/test/test_s3_encryption_client_plugin.py +++ b/test/test_s3_encryption_client_plugin.py @@ -154,3 +154,17 @@ def test_missing_content_length_raises_error(self): with pytest.raises(S3EncryptionClientError, match="missing ContentLength.*Key: my-object"): plugin.on_get_object_after_call(parsed) + + def test_put_object_rejects_instruction_file_mode(self): + """put_object MUST raise when instruction-file mode is active.""" + mock_keyring = Mock(spec=S3Keyring) + config = S3EncryptionClientConfig(keyring=mock_keyring) + plugin = S3EncryptionClientPlugin(config) + + # Activate instruction file mode + plugin._context.instruction_file_mode = True + + params = {"body": b"test data", "headers": {}} + + with pytest.raises(S3EncryptionClientError, match="not supported in put_object"): + plugin.on_put_object_before_call(params)