diff --git a/.github/workflows/python-integ.yml b/.github/workflows/python-integ.yml index b845e725..76941b77 100644 --- a/.github/workflows/python-integ.yml +++ b/.github/workflows/python-integ.yml @@ -56,6 +56,9 @@ jobs: CI_S3_BUCKET: ${{ vars.CI_S3_BUCKET }} CI_KMS_KEY_ALIAS: ${{ vars.CI_KMS_KEY_ALIAS }} + - name: Run examples + run: uv run pytest examples/ -v -m examples + - name: Generate coverage HTML report if: always() run: uv run coverage html -d coverage-report diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 00000000..f94fd12a --- /dev/null +++ b/examples/__init__.py @@ -0,0 +1,2 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 diff --git a/examples/src/__init__.py b/examples/src/__init__.py new file mode 100644 index 00000000..f94fd12a --- /dev/null +++ b/examples/src/__init__.py @@ -0,0 +1,2 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 diff --git a/examples/src/delayed_auth_streaming_example.py b/examples/src/delayed_auth_streaming_example.py new file mode 100644 index 00000000..61a24bb5 --- /dev/null +++ b/examples/src/delayed_auth_streaming_example.py @@ -0,0 +1,78 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +This example demonstrates streaming decryption with delayed authentication +using the S3 Encryption Client. + +By default, the S3 Encryption Client buffers the entire ciphertext and verifies +the authentication tag before releasing any plaintext. This is the safest mode, +but requires holding the entire object in memory. + +With delayed authentication enabled, plaintext is released incrementally as it +is decrypted, before the authentication tag has been verified. This allows +processing large files without buffering the entire object in memory. 
def delayed_auth_streaming_decrypt(
    s3_client, kms_client, kms_key_id: str, bucket: str, key: str
):
    """Demonstrate streaming decryption with delayed authentication.

    Encrypts ``EXAMPLE_DATA`` to ``bucket``/``key``, then reads the decrypted
    object back ``CHUNK_SIZE`` bytes at a time. Each chunk is compared with the
    matching slice of ``EXAMPLE_DATA`` as soon as it arrives and is then
    dropped, so the example never accumulates a second full copy of the
    plaintext while streaming.

    The uploaded object is not deleted by this function; cleanup is left to
    the caller.

    Args:
        s3_client: boto3 S3 client.
        kms_client: boto3 KMS client.
        kms_key_id: KMS key ARN or alias to use for encryption/decryption.
        bucket: S3 bucket name.
        key: S3 object key.

    Raises:
        AssertionError: If the streamed plaintext differs from ``EXAMPLE_DATA``
            in content or in total length.
    """
    # 1. Create a KMS Keyring.
    keyring = KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id)

    # 2. Configure the S3 Encryption Client with delayed authentication.
    config = S3EncryptionClientConfig(
        keyring=keyring,
        enable_delayed_authentication=True,
    )
    s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config)

    # 3. Encrypt and upload the object.
    s3ec.put_object(Bucket=bucket, Key=key, Body=EXAMPLE_DATA)

    # 4. Stream the decrypted object back in chunks.
    #    With delayed authentication, plaintext is released incrementally
    #    without buffering the entire object in memory.
    response = s3ec.get_object(Bucket=bucket, Key=key)
    body = response["Body"]

    # 5. Verify each chunk against the original as it arrives instead of
    #    collecting every chunk and joining them at the end.
    bytes_seen = 0
    content_matches = True
    while chunk := body.read(CHUNK_SIZE):
        chunk_end = bytes_seen + len(chunk)
        # Do not stop at the first mismatch: keep draining the stream so it is
        # read to the end exactly as before.
        # NOTE(review): presumably the authentication tag is only checked once
        # the stream is exhausted -- confirm against the stream implementation.
        if chunk != EXAMPLE_DATA[bytes_seen:chunk_end]:
            content_matches = False
        bytes_seen = chunk_end

    # The length check catches a stream that ends early (every chunk matched a
    # prefix, but data is missing).
    assert (
        content_matches and bytes_seen == len(EXAMPLE_DATA)
    ), "Decrypted plaintext does not match original data"
+ key: S3 object key of the encrypted object. + expected_plaintext: Expected plaintext content for verification. + """ + keyring = KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id) + + # 1. Decrypt using the default instruction file suffix (".instruction"). + # The client will fetch ".instruction" for the encryption metadata. + config = S3EncryptionClientConfig(keyring=keyring) + s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config) + + response = s3ec.get_object(Bucket=bucket, Key=key) + plaintext = response["Body"].read() + assert plaintext == expected_plaintext, "Default suffix: decrypted plaintext does not match" + + # 2. Decrypt using a custom instruction file suffix. + # The client will fetch ".custom-suffix-instruction" for the encryption metadata. + # InstructionFileSuffix is a per-request keyword argument on get_object, + # so the same client can use different suffixes per request. + response = s3ec.get_object( + Bucket=bucket, Key=key, InstructionFileSuffix=".custom-suffix-instruction" + ) + plaintext = response["Body"].read() + assert plaintext == expected_plaintext, "Custom suffix: decrypted plaintext does not match" diff --git a/examples/src/kms_keyring_put_get_example.py b/examples/src/kms_keyring_put_get_example.py new file mode 100644 index 00000000..a55158ef --- /dev/null +++ b/examples/src/kms_keyring_put_get_example.py @@ -0,0 +1,81 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +This example demonstrates a basic put/get roundtrip using the S3 Encryption Client +with a KMS Keyring. + +The KMS Keyring uses a symmetric KMS key to generate and decrypt data keys. +The S3 Encryption Client encrypts the object before uploading to S3 and decrypts +it on download, so the data is protected at rest. + +This example: +1. Creates a KMS Keyring with the provided KMS key ID +2. Wraps a boto3 S3 client with the S3 Encryption Client +3. 
def kms_keyring_put_get(s3_client, kms_client, kms_key_id: str, bucket: str, key: str):
    """Round-trip ``EXAMPLE_DATA`` through S3 using a KMS Keyring.

    The object is encrypted on upload and decrypted on download, with an
    encryption context naming the bucket and key supplied to both calls.

    Args:
        s3_client: boto3 S3 client.
        kms_client: boto3 KMS client.
        kms_key_id: KMS key ARN or alias to use for encryption/decryption.
        bucket: S3 bucket name.
        key: S3 object key.

    Raises:
        AssertionError: If the decrypted bytes differ from ``EXAMPLE_DATA``.
    """
    # Steps 1 and 2: build the keyring, wrap it in a config, and wrap the S3
    # client. No commitment policy is passed, so the config's default applies.
    encryption_client = S3EncryptionClient(
        wrapped_s3_client=s3_client,
        config=S3EncryptionClientConfig(
            keyring=KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id),
        ),
    )

    # Step 3: the encryption context is a set of key-value pairs bound to the
    # ciphertext. Naming the bucket and key ties the ciphertext to this S3
    # object and makes both values visible to KMS key-policy conditions.
    # It is optional, but strongly recommended.
    location = {"Bucket": bucket, "Key": key}
    context = {"aws-s3-bucket": bucket, "aws-s3-key": key}

    # Step 4: upload the encrypted object.
    encryption_client.put_object(**location, Body=EXAMPLE_DATA, EncryptionContext=context)

    # Step 5: download and decrypt. The same encryption context that was used
    # for encryption must be supplied again here.
    downloaded = encryption_client.get_object(**location, EncryptionContext=context)
    decrypted = downloaded["Body"].read()

    # Step 6 (optional): confirm the round trip preserved the data.
    assert decrypted == EXAMPLE_DATA, "Decrypted plaintext does not match original data"
def decrypt_legacy_object(s3_client, kms_client, kms_key_id: str, bucket: str, key: str):
    """Fetch and decrypt an object written by an older S3 Encryption Client.

    Args:
        s3_client: boto3 S3 client.
        kms_client: boto3 KMS client.
        kms_key_id: KMS key ARN or alias used to encrypt the object.
        bucket: S3 bucket name.
        key: S3 object key.

    Returns:
        Decrypted plaintext bytes.
    """
    # Keyring side: opt in to legacy wrapping algorithms so data keys wrapped
    # in the KmsV1 mode used by older clients can be unwrapped.
    keyring_options = {
        "kms_client": kms_client,
        "kms_key_id": kms_key_id,
        "enable_legacy_wrapping_algorithms": True,
    }
    legacy_keyring = KmsKeyring(**keyring_options)

    # Client side:
    #   - enable_legacy_unauthenticated_modes allows decryption of AES-CBC content
    #   - REQUIRE_ENCRYPT_ALLOW_DECRYPT keeps new writes on key-committing
    #     algorithm suites while still allowing legacy objects to be decrypted
    legacy_config = S3EncryptionClientConfig(
        keyring=legacy_keyring,
        enable_legacy_unauthenticated_modes=True,
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT,
    )
    legacy_client = S3EncryptionClient(wrapped_s3_client=s3_client, config=legacy_config)

    # Download the legacy object and hand back its decrypted payload.
    decrypted_body = legacy_client.get_object(Bucket=bucket, Key=key)["Body"]
    return decrypted_body.read()
def test_delayed_auth_streaming_decrypt():
    """Run the delayed-auth streaming example against the live test bucket.

    The object written by the example is deleted afterwards even when the
    example raises, so a failed run does not leave it behind in the bucket.
    """
    s3_client = boto3.client("s3", region_name="us-west-2")
    kms_client = boto3.client("kms", region_name="us-west-2")
    try:
        delayed_auth_streaming_decrypt(
            s3_client=s3_client,
            kms_client=kms_client,
            kms_key_id=KMS_KEY_ID,
            bucket=BUCKET,
            key=KEY,
        )
    finally:
        # Clean up regardless of the outcome of the example.
        s3_client.delete_object(Bucket=BUCKET, Key=KEY)
def test_instruction_file_get():
    """Run the instruction-file example against the static test object.

    The expected plaintext is the object key itself, UTF-8 encoded.
    """
    region = "us-west-2"
    # Same creation order as before: S3 client first, then KMS client.
    clients = {service: boto3.client(service, region_name=region) for service in ("s3", "kms")}

    instruction_file_get(
        s3_client=clients["s3"],
        kms_client=clients["kms"],
        kms_key_id=KMS_KEY_ID,
        bucket=BUCKET,
        key=KEY,
        expected_plaintext=KEY.encode("utf-8"),
    )
def test_kms_keyring_put_get():
    """Run the KMS Keyring put/get example against the live test bucket.

    The object written by the example is deleted afterwards even when the
    example raises, so a failed run does not leave it behind in the bucket.
    """
    s3_client = boto3.client("s3", region_name="us-west-2")
    kms_client = boto3.client("kms", region_name="us-west-2")
    try:
        kms_keyring_put_get(
            s3_client=s3_client,
            kms_client=kms_client,
            kms_key_id=KMS_KEY_ID,
            bucket=BUCKET,
            key=KEY,
        )
    finally:
        # Clean up regardless of the outcome of the example.
        s3_client.delete_object(Bucket=BUCKET, Key=KEY)
def test_decrypt_legacy_object():
    """Decrypt the static legacy object and check its content.

    The expected plaintext is the object key itself, UTF-8 encoded.
    """
    region = "us-west-2"
    s3 = boto3.client("s3", region_name=region)
    kms = boto3.client("kms", region_name=region)

    decrypted = decrypt_legacy_object(
        s3_client=s3,
        kms_client=kms,
        kms_key_id=KMS_KEY_ID,
        bucket=BUCKET,
        key=KEY,
    )

    expected = KEY.encode("utf-8")
    assert decrypted == expected
    # Avoid deleting the static object, it is used in the integration tests
+ + Attributes: + keyring: Keyring used for encrypting/decrypting data keys. + encryption_algorithm: Algorithm suite for encryption. Defaults to + ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY (V3 key-committing). + commitment_policy: Key commitment policy for encryption and decryption. + Defaults to REQUIRE_ENCRYPT_REQUIRE_DECRYPT. + enable_legacy_unauthenticated_modes: If True, allow decryption of objects + encrypted with legacy CBC algorithm suites. Defaults to False. + cmm: Crypto materials manager. Defaults to a DefaultCryptoMaterialsManager + wrapping the provided keyring. + enable_delayed_authentication: If True, release plaintext from streams + before GCM tag verification. Defaults to False. Has no effect for + CBC encrypted ciphertext, which is always streamed as there is no + authentication tag. + + Raises: + S3EncryptionClientError: If the encryption algorithm is legacy, or if + the algorithm suite is incompatible with the commitment policy. + """ keyring: AbstractKeyring encryption_algorithm: AlgorithmSuite = field( @@ -49,16 +75,15 @@ class S3EncryptionClientConfig: ##% The option to enable legacy unauthenticated modes MUST be set to false by default. enable_legacy_unauthenticated_modes: bool = field(default=False) cmm: AbstractCryptoMaterialsManager = field() - ##= specification/s3-encryption/data-format/metadata-strategy.md#instruction-file - ##= type=implementation - ##% The S3EC SHOULD support providing a custom Instruction File suffix - ##% on GetObject requests, regardless of whether or not re-encryption is supported. - ##= specification/s3-encryption/data-format/metadata-strategy.md#instruction-file + ##= specification/s3-encryption/client.md#enable-delayed-authentication ##= type=implementation - ##% The default Instruction File behavior uses the same S3 object key - ##% as its associated object suffixed with ".instruction". 
- instruction_file_suffix: str = field(default=".instruction") + ##% The S3EC MUST support the option to enable or disable Delayed Authentication mode. + + ##= specification/s3-encryption/client.md#enable-delayed-authentication + ##= type=implication + ##% Delayed Authentication mode MUST be set to false by default. + enable_delayed_authentication: bool = field(default=False) @cmm.default def _default_cmm_for_keyring(self): @@ -197,10 +222,18 @@ def on_get_object_after_call(self, parsed, **kwargs): # The parsed response already has the Body as a StreamingBody # We need to read it, decrypt it, and replace it + # content_length is going to the cipher-text's content length + content_length = parsed.get("ContentLength") + if content_length is None: + obj_key = getattr(self._context, _CTX_KEY, None) + raise S3EncryptionClientError( + f"S3 response is missing ContentLength and is invalid. Key: {obj_key}" + ) # Create a response dict that matches what the pipeline expects response = { "Body": parsed.get("Body"), "Metadata": parsed.get("Metadata", {}), + "ContentLength": content_length, } # Create a pipeline and decrypt the data @@ -212,18 +245,15 @@ def on_get_object_after_call(self, parsed, **kwargs): ) decrypted_data = pipeline.decrypt( response, - encryption_context, + instruction_suffix=getattr(self._context, _CTX_INSTRUCTION_FILE_SUFFIX, ".instruction"), + enable_delayed_authentication=self.config.enable_delayed_authentication, + encryption_context=encryption_context, bucket=getattr(self._context, _CTX_BUCKET, None), key=getattr(self._context, _CTX_KEY, None), - instruction_suffix=self.config.instruction_file_suffix, ) - # Create a new streaming body with the decrypted data - stream = io.BytesIO(decrypted_data) - streaming_body = StreamingBody(stream, len(decrypted_data)) - - # Replace body with decrypted data - parsed["Body"] = streaming_body + # Replace body with decrypting stream + parsed["Body"] = decrypted_data def process_instruction_file(self, parsed): 
"""Process instruction file in plaintext mode. @@ -316,6 +346,93 @@ def put_object(self, **kwargs): if hasattr(self._plugin._context, _CTX_ENCRYPTION_CONTEXT): delattr(self._plugin._context, _CTX_ENCRYPTION_CONTEXT) + ##= specification/s3-encryption/client.md#required-api-operations + ##% - DeleteObject MUST be implemented by the S3EC. + def delete_object(self, **kwargs): + """Delete an object and its associated instruction file from S3. + + Args: + **kwargs: Arguments to pass to the S3 client's delete_object method. + Must include Bucket and Key parameters. + May include InstructionFileSuffix to override the default + ".instruction" suffix for instruction file deletion. + + Returns: + The response from the S3 client's delete_object call for the object. + + Raises: + S3EncryptionClientError: If the delete operation fails. + """ + ##= specification/s3-encryption/data-format/metadata-strategy.md#instruction-file + ##= type=implementation + ##% The default Instruction File behavior uses the same S3 object key + ##% as its associated object suffixed with ".instruction". + instruction_file_suffix = kwargs.pop("InstructionFileSuffix", ".instruction") + + try: + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=implementation + ##% - DeleteObject MUST delete the given object key. + response = self.wrapped_s3_client.delete_object(**kwargs) + + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=implementation + ##% - DeleteObject MUST delete the associated instruction file + ##% using the default instruction file suffix. 
##= specification/s3-encryption/client.md#required-api-operations
##% - DeleteObjects MUST be implemented by the S3EC.
def delete_objects(self, **kwargs):
    """Delete multiple objects and their associated instruction files from S3.

    Two requests are issued: one for the objects and one for the instruction
    files. If either request fails, the operation fails; it may be retried to
    clean up any files that were missed.

    The instruction-file keys are derived before any request is sent, so a
    malformed ``Delete`` structure or an unusable suffix is reported before
    anything has been deleted.

    Args:
        **kwargs: Arguments to pass to the S3 client's delete_objects method.
            Must include Bucket and Delete (with Objects list) parameters.
            May include InstructionFileSuffix to override the default
            ".instruction" suffix for instruction file deletion.

    Returns:
        The response from the S3 client's delete_objects call for the objects,
        unchanged (including any per-key ``Errors`` it carries).

    Raises:
        S3EncryptionClientError: If either delete request fails, or if the
            instruction-file request returns a non-empty ``Errors`` list.
    """
    instruction_file_suffix = kwargs.pop("InstructionFileSuffix", ".instruction")

    try:
        # Derive everything the second request needs up front: a missing
        # Bucket/Delete/Key or a non-string suffix must fail before the
        # primary objects are removed, not after.
        bucket = kwargs["Bucket"]
        instruction_objects = [
            {"Key": obj["Key"] + instruction_file_suffix} for obj in kwargs["Delete"]["Objects"]
        ]

        ##= specification/s3-encryption/client.md#required-api-operations
        ##= type=implementation
        ##% - DeleteObjects MUST delete each of the given objects.
        response = self.wrapped_s3_client.delete_objects(**kwargs)

        ##= specification/s3-encryption/client.md#required-api-operations
        ##= type=implementation
        ##% - DeleteObjects MUST delete each of the corresponding instruction files
        ##% using the default instruction file suffix.
        instruction_response = self.wrapped_s3_client.delete_objects(
            Bucket=bucket, Delete={"Objects": instruction_objects}
        )

        # A multi-object delete can succeed as a request while still listing
        # per-key failures under "Errors". The caller never sees this second
        # response, so surface those failures instead of dropping them.
        if isinstance(instruction_response, dict):
            instruction_errors = instruction_response.get("Errors")
            if instruction_errors:
                raise S3EncryptionClientError(
                    f"Failed to delete instruction files: {instruction_errors}"
                )

        return response
    except S3EncryptionClientError:
        raise
    except Exception as e:
        raise S3EncryptionClientError(f"Failed to delete objects: {str(e)}") from e
@property
def cipher_block_size_bits(self) -> int:
    """Cipher block size, expressed in bits."""
    return self._cipher_block_size_bits


@property
def cipher_block_size_bytes(self) -> int:
    """Cipher block size, expressed in bytes (the bit count floor-divided by 8)."""
    whole_bytes, _ = divmod(self._cipher_block_size_bits, 8)
    return whole_bytes


@property
def cipher_tag_length_bits(self) -> int:
    """Cipher authentication tag length, expressed in bits."""
    return self._cipher_tag_length_bits


@property
def cipher_tag_length_bytes(self) -> int:
    """Cipher authentication tag length, expressed in bytes (the bit count floor-divided by 8)."""
    whole_bytes, _ = divmod(self._cipher_tag_length_bits, 8)
    return whole_bytes
S3EncryptionClientSecurityError +from .exceptions import S3EncryptionClientError from .instruction_file import fetch_instruction_file from .key_derivation import derive_keys, verify_commitment from .materials.crypto_materials_manager import AbstractCryptoMaterialsManager @@ -27,6 +27,11 @@ EncryptionMaterials, ) from .metadata import ObjectMetadata +from .stream import ( + CBCDecryptingStream, + GCMBufferedDecryptingStream, + GCMDelayedAuthDecryptingStream, +) @define @@ -222,26 +227,28 @@ def _determine_algorithm_suite(self, metadata) -> AlgorithmSuite: def decrypt( self, response, + instruction_suffix, + enable_delayed_authentication, encryption_context=None, bucket=None, key=None, - instruction_suffix=".instruction", ): """Decrypt the data after it is retrieved from S3. Args: response (dict): The response from S3 containing the encrypted data and metadata + instruction_suffix (str): suffix for instruction file + enable_delayed_authentication (bool): If True, release plaintext before GCM tag verification. encryption_context (dict, optional): Additional context for decryption bucket (str, optional): S3 bucket name (required for instruction file) key (str, optional): S3 object key (required for instruction file) - instruction_suffix(str, optional): suffix for instruction file; defaults to ".instruction". Returns: - bytes: The decrypted data + A decrypting stream (BufferedDecryptingStream or DelayedAuthDecryptingStream). 
""" # Convert the metadata dictionary to an ObjectMetadata instance - # TODO: Stream + Buffered Decryption - encrypted_data = response.get("Body").read() + streaming_body = response.get("Body") + content_length = response.get("ContentLength") encryption_metadata = response.get("Metadata", {}) metadata = ObjectMetadata.from_dict(encryption_metadata) @@ -254,10 +261,12 @@ def decrypt( if self.s3_client is None: raise S3EncryptionClientError("s3_client required to fetch instruction file") - # TODO: we should validate that these parameters must be None - # when not in instruction file mode. if bucket is None or key is None: raise S3EncryptionClientError("Bucket and key required to fetch instruction file") + if instruction_suffix is None: + raise S3EncryptionClientError( + "instruction_suffix required to fetch instruction file" + ) instruction_key = key + instruction_suffix instruction_metadata = fetch_instruction_file(self.s3_client, bucket, instruction_key) @@ -380,24 +389,141 @@ def decrypt( ##= type=implementation ##% When the commitment policy is REQUIRE_ENCRYPT_ALLOW_DECRYPT, the S3EC MUST allow decryption using algorithm suites which do not support key commitment. 
- # Perform decryption based on algorithm suite + if enable_delayed_authentication is None: + raise S3EncryptionClientError("enable_delayed_authentication must be explicitly set") + + # Build cipher decryptor and return streaming wrapper based on algorithm suite match dec_materials.algorithm_suite: case AlgorithmSuite.ALG_AES_256_CBC_IV16_NO_KDF: - return self._decrypt_cbc_content(dec_materials, encrypted_data) + ##= specification/s3-encryption/decryption.md#cbc-decryption + ##= type=implementation + ##% If an object is encrypted with ALG_AES_256_CBC_IV16_NO_KDF and + ##% [legacy unauthenticated algorithm suites](#legacy-decryption) is enabled, + ##% then the S3EC MUST create a cipher with AES in CBC Mode with PKCS5Padding or + ##% PKCS7Padding compatible padding for a 16-byte block cipher + ##% (example: for the Java JCE, this is "AES/CBC/PKCS5Padding"). + ##= specification/s3-encryption/decryption.md#cbc-decryption + ##= type=implementation + ##% If the cipher object cannot be created as described above, + ##% Decryption MUST fail. + ##= specification/s3-encryption/decryption.md#cbc-decryption + ##= type=implementation + ##% The error SHOULD detail why the cipher could not be initialized + ##% (such as CBC or PKCS5Padding is not supported by the underlying crypto provider). + cipher = Cipher( + algorithms.AES(dec_materials.plaintext_data_key), + modes.CBC(dec_materials.iv), + ) + decryptor = cipher.decryptor() + # Remove PKCS7 padding (compatible with PKCS5Padding for 16-byte block ciphers) + unpadder = PKCS7(dec_materials.algorithm_suite.cipher_block_size_bits).unpadder() + return CBCDecryptingStream( + streaming_body, decryptor, unpadder=unpadder, content_length=content_length + ) case AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF: ##= specification/s3-encryption/encryption.md#alg-aes-256-gcm-iv12-tag16-no-kdf ##= type=implementation ##% The client MUST NOT provide any AAD when encrypting with ##% ALG_AES_256_GCM_IV12_TAG16_NO_KDF. 
- aesgcm = AESGCM(dec_materials.plaintext_data_key) - return aesgcm.decrypt( - nonce=dec_materials.iv, data=encrypted_data, associated_data=None + cipher = Cipher( + algorithms.AES(dec_materials.plaintext_data_key), modes.GCM(dec_materials.iv) + ) + decryptor = cipher.decryptor() + return self._make_decrypting_gcm_stream( + streaming_body, + decryptor, + tag_length=dec_materials.algorithm_suite.cipher_tag_length_bytes, + enable_delayed_authentication=enable_delayed_authentication, + content_length=content_length, ) case AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY: - return self._decrypt_kc_gcm_content(dec_materials, encrypted_data, metadata) + return self._decrypt_kc_gcm_streaming( + dec_materials, + metadata, + streaming_body, + enable_delayed_authentication=enable_delayed_authentication, + content_length=content_length, + ) case _: raise S3EncryptionClientError("Unknown algorithm suite!") + @staticmethod + def _make_decrypting_gcm_stream( + streaming_body, decryptor, tag_length, enable_delayed_authentication, content_length + ): + """Return the appropriate decrypting stream. + + When delayed auth is disabled, GCMBufferedDecryptingStream buffers all + ciphertext and verifies the tag before releasing any plaintext. + When delayed auth is enabled, GCMDelayedAuthDecryptingStream is used. + """ + if enable_delayed_authentication: + return GCMDelayedAuthDecryptingStream( + streaming_body, + decryptor, + tag_length=tag_length, + content_length=content_length, + ) + ##= specification/s3-encryption/client.md#enable-delayed-authentication + ##= type=implementation + ##% When disabled the S3EC MUST NOT release plaintext from a stream which has not been authenticated.
+ return GCMBufferedDecryptingStream( + streaming_body, decryptor, tag_length=tag_length, content_length=content_length + ) + + def _decrypt_kc_gcm_streaming( + self, dec_materials, metadata, streaming_body, enable_delayed_authentication, content_length + ): + """Decrypt content encrypted with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY. + + Performs HKDF key derivation, key commitment verification, then returns + a streaming decryptor. + """ + message_id = base64.b64decode(metadata.message_id_v3) + stored_commitment = base64.b64decode(metadata.key_commitment_v3) + + ##= specification/s3-encryption/decryption.md#decrypting-with-commitment + ##= type=implementation + ##% When using an algorithm suite which supports key commitment, the client MUST verify + ##% that the [derived key commitment](./key-derivation.md#hkdf-operation) contains the + ##% same bytes as the stored key commitment retrieved from the stored object's metadata. + ##= specification/s3-encryption/decryption.md#decrypting-with-commitment + ##= type=implementation + ##% When using an algorithm suite which supports key commitment, the client MUST verify the key commitment values match before deriving + ##% the [derived encryption key](./key-derivation.md#hkdf-operation). + derived_encryption_key, derived_commitment = derive_keys( + dec_materials.plaintext_data_key, message_id, dec_materials.algorithm_suite + ) + verify_commitment(stored_commitment, derived_commitment) + + ##= specification/s3-encryption/key-derivation.md#hkdf-operation + ##= type=implementation + ##% When encrypting or decrypting with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + ##% the IV used in the AES-GCM content encryption/decryption MUST consist entirely of bytes with the value 0x01. + ##= specification/s3-encryption/key-derivation.md#hkdf-operation + ##= type=implementation + ##% The IV's total length MUST match the IV length defined by the algorithm suite. 
+ ##= specification/s3-encryption/key-derivation.md#hkdf-operation + ##= type=implementation + ##% The client MUST initialize the cipher, or call an AES-GCM encryption API, with the derived encryption key, an IV containing only bytes with the value 0x01, + ##% and the tag length defined in the Algorithm Suite when encrypting or decrypting with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY. + ##= specification/s3-encryption/key-derivation.md#hkdf-operation + ##= type=implementation + ##% The client MUST set the AAD to the Algorithm Suite ID represented as bytes. + cipher = Cipher( + algorithms.AES(derived_encryption_key), + modes.GCM(dec_materials.algorithm_suite.kc_gcm_iv), + ) + decryptor = cipher.decryptor() + decryptor.authenticate_additional_data(dec_materials.algorithm_suite.suite_id_bytes) + return self._make_decrypting_gcm_stream( + streaming_body, + decryptor, + tag_length=dec_materials.algorithm_suite.cipher_tag_length_bytes, + enable_delayed_authentication=enable_delayed_authentication, + content_length=content_length, + ) + def _decrypt_v2(self, metadata, encryption_context) -> DecryptionMaterials: """Prepare V2 decryption materials.""" return self._decrypt_v1_v2( @@ -440,40 +566,6 @@ def _decrypt_v1_v2( return self.cmm.decrypt_materials(dec_materials) - def _decrypt_cbc_content(self, dec_materials, encrypted_data): - """Decrypt content encrypted with ALG_AES_256_CBC_IV16_NO_KDF.""" - ##= specification/s3-encryption/decryption.md#cbc-decryption - ##= type=implementation - ##% If an object is encrypted with ALG_AES_256_CBC_IV16_NO_KDF and - ##% [legacy unauthenticated algorithm suites](#legacy-decryption) is enabled, - ##% then the S3EC MUST create a cipher with AES in CBC Mode with PKCS5Padding or - ##% PKCS7Padding compatible padding for a 16-byte block cipher - ##% (example: for the Java JCE, this is "AES/CBC/PKCS5Padding"). 
- ##= specification/s3-encryption/decryption.md#cbc-decryption - ##= type=implementation - ##% If the cipher object cannot be created as described above, - ##% Decryption MUST fail. - ##= specification/s3-encryption/decryption.md#cbc-decryption - ##= type=implementation - ##% The error SHOULD detail why the cipher could not be initialized - ##% (such as CBC or PKCS5Padding is not supported by the underlying crypto provider). - try: - cipher = Cipher( - algorithms.AES(dec_materials.plaintext_data_key), - modes.CBC(dec_materials.iv), - ) - decryptor = cipher.decryptor() - padded_plaintext = decryptor.update(encrypted_data) + decryptor.finalize() - - # Remove PKCS7 padding (compatible with PKCS5Padding for 16-byte block ciphers) - unpadder = PKCS7(128).unpadder() - return unpadder.update(padded_plaintext) + unpadder.finalize() - except Exception as e: - raise S3EncryptionClientSecurityError( - f"Failed to decrypt CBC content: {e}. " - "Ensure the underlying crypto provider supports AES/CBC/PKCS7Padding." - ) from e - ##= specification/s3-encryption/data-format/content-metadata.md#v3-only ##% The V3 format uses compression here such that each wrapping algorithm is represented by a two digit string. ##= specification/s3-encryption/data-format/content-metadata.md#v3-only @@ -527,50 +619,3 @@ def _decrypt_v3(self, metadata, encryption_context) -> DecryptionMaterials: ) return self.cmm.decrypt_materials(dec_materials) - - def _decrypt_kc_gcm_content(self, dec_materials, encrypted_data, metadata): - """Decrypt content encrypted with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY. - - Performs HKDF key derivation, key commitment verification, and AES-GCM decryption. 
- """ - message_id = base64.b64decode(metadata.message_id_v3) - stored_commitment = base64.b64decode(metadata.key_commitment_v3) - - ##= specification/s3-encryption/encryption.md#alg-aes-256-gcm-hkdf-sha512-commit-key - ##= type=implementation - ##% The client MUST use HKDF to derive the key commitment value and the derived encrypting key as described in [Key Derivation](key-derivation.md). - derived_encryption_key, derived_commitment = derive_keys( - dec_materials.plaintext_data_key, message_id, dec_materials.algorithm_suite - ) - - ##= specification/s3-encryption/decryption.md#decrypting-with-commitment - ##= type=implementation - ##% When using an algorithm suite which supports key commitment, the client MUST verify - ##% that the [derived key commitment](./key-derivation.md#hkdf-operation) contains the - ##% same bytes as the stored key commitment retrieved from the stored object's metadata. - ##= specification/s3-encryption/decryption.md#decrypting-with-commitment - ##= type=implementation - ##% When using an algorithm suite which supports key commitment, the client MUST verify the key commitment values match before deriving - ##% the [derived encryption key](./key-derivation.md#hkdf-operation). - verify_commitment(stored_commitment, derived_commitment) - - ##= specification/s3-encryption/key-derivation.md#hkdf-operation - ##= type=implementation - ##% When encrypting or decrypting with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, - ##% the IV used in the AES-GCM content encryption/decryption MUST consist entirely of bytes with the value 0x01. - ##= specification/s3-encryption/key-derivation.md#hkdf-operation - ##= type=implementation - ##% The IV's total length MUST match the IV length defined by the algorithm suite. 
- ##= specification/s3-encryption/key-derivation.md#hkdf-operation - ##= type=implementation - ##% The client MUST initialize the cipher, or call an AES-GCM encryption API, with the derived encryption key, an IV containing only bytes with the value 0x01, - ##% and the tag length defined in the Algorithm Suite when encrypting or decrypting with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY. - ##= specification/s3-encryption/key-derivation.md#hkdf-operation - ##= type=implementation - ##% The client MUST set the AAD to the Algorithm Suite ID represented as bytes. - aesgcm = AESGCM(derived_encryption_key) - return aesgcm.decrypt( - nonce=dec_materials.algorithm_suite.kc_gcm_iv, - data=encrypted_data, - associated_data=dec_materials.algorithm_suite.suite_id_bytes, - ) diff --git a/src/s3_encryption/stream.py b/src/s3_encryption/stream.py new file mode 100644 index 00000000..ea02dc5b --- /dev/null +++ b/src/s3_encryption/stream.py @@ -0,0 +1,287 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Streaming decryption support for S3 Encryption Client.""" + +import io + +from attrs import define, field +from botocore.response import StreamingBody + +from .exceptions import S3EncryptionClientError + +##= specification/s3-encryption/client.md#set-buffer-size +##= type=exception +##= reason=Optional Feature that is a two-way door to implement later +##% The S3EC SHOULD accept a configurable buffer size which refers to the maximum ciphertext length in bytes to store in memory when Delayed Authentication mode is disabled. +##= specification/s3-encryption/client.md#set-buffer-size +##= type=exception +##= reason=Optional Feature that is a two-way door to implement later +##% If Delayed Authentication mode is enabled, and the buffer size has been set to a value other than its default, the S3EC MUST throw an exception. 
+##= specification/s3-encryption/client.md#set-buffer-size +##= type=exception +##= reason=Optional Feature that is a two-way door to implement later +##% If Delayed Authentication mode is disabled, and no buffer size is provided, the S3EC MUST set the buffer size to a reasonable default. + + +# slots=False because StreamingBody extends IOBase which already has __weakref__. +@define(slots=False) +class GCMBufferedDecryptingStream(StreamingBody): + """A stream that buffers all ciphertext, decrypts, then releases plaintext. + + Extends botocore's StreamingBody so it can be used as a drop-in replacement + for parsed["Body"], inheriting iter_chunks, iter_lines, __iter__, etc. + """ + + _body: object = field() + _decryptor: object = field() + _tag_length: int = field() + # _content_length intentionally collides with super's _content_length + _content_length: int = field() + _plaintext: object = field(init=False, default=None) + + def __attrs_post_init__(self): # noqa: D105 + # By passing in content_length, and updating _amount_read in read(), + # we support the super's normal progression. + # However, we do not support the super's _verify_content_length. + super().__init__(io.BytesIO(), content_length=self._content_length) + + def _decrypt(self): + """Read all ciphertext, decrypt and verify, cache plaintext.""" + if self._plaintext is not None: + return + try: + data = self._body.read() + if self._tag_length > 0: + ciphertext, tag = data[: -self._tag_length], data[-self._tag_length :] + plaintext = self._decryptor.update(ciphertext) + self._decryptor.finalize_with_tag( + tag + ) + else: + plaintext = self._decryptor.update(data) + self._decryptor.finalize() + except Exception as e: + raise S3EncryptionClientError(f"Failed to decrypt object: {e}") from e + self._plaintext = io.BytesIO(plaintext) + self._raw_stream = self._plaintext + + # Inherited iter_chunks, iter_lines, __iter__, and __next__ all delegate + # to self.read(), which calls _decrypt(). No override needed. 
+ + def readable(self): # noqa: D102 + self._decrypt() + return self._raw_stream.readable() + + def read(self, amt=None): + """Reads the entire ciphertext stream and then returns decrypted data. + + Args: + amt: Number of bytes to read. If None, reads all remaining data. + + Returns: + bytes: Decrypted plaintext bytes. + """ + self._decrypt() + chunk = self._plaintext.read() if amt is None else self._plaintext.read(amt) + # super._amount_read can be used for progress tracking + # noinspection PyUnresolvedReferences + self._amount_read += len(chunk) + return chunk + + def readinto(self, b): # noqa: D102 + self._decrypt() + return self._raw_stream.readinto(b) + + def tell(self): # noqa: D102 + self._decrypt() + return self._raw_stream.tell() + + def __enter__(self): # noqa: D105 + self._decrypt() + return self + + def close(self): + """Close the underlying stream.""" + if hasattr(self._body, "close"): + self._body.close() + + +##= specification/s3-encryption/client.md#enable-delayed-authentication +##= type=implementation +##% When enabled, the S3EC MAY release plaintext from a stream which has not been authenticated. +# slots=False because StreamingBody extends IOBase which already has __weakref__. +@define(slots=False) +class CBCDecryptingStream(StreamingBody): + """A delayed-auth stream for AES-CBC decryption. + + Extends botocore's StreamingBody so it can be used as a drop-in replacement + for parsed["Body"], inheriting iter_chunks, iter_lines, __iter__, etc. + + CBC has no auth tag, so plaintext is released incrementally via + cipher.update(). A 1-byte peek-ahead detects stream exhaustion so the + PKCS7 unpadder can be finalized. 
+ """ + + _body: object = field() + _decryptor: object = field() + _unpadder: object = field() + # _content_length intentionally collides with super's _content_length + _content_length: int = field() + _peek_buffer: bytes = field(init=False, default=b"") + _finalized: bool = field(init=False, default=False) + + def __attrs_post_init__(self): # noqa: D105 + # By passing in content_length, and updating _amount_read in read(), + # we support the super's normal progression. + # However, we do not support the super's _verify_content_length. + super().__init__(io.BytesIO(), content_length=self._content_length) + + # Inherited iter_chunks, iter_lines, __iter__, and __next__ all delegate + # to self.read(). No override needed. + + def readable(self): # noqa: D102 + return not self._finalized + + def read(self, amt=None): + """Read and decrypt CBC ciphertext, releasing plaintext incrementally.""" + # Stream already fully consumed and finalized; nothing left to return. + if self._finalized: + return b"" + + # Read the next chunk of raw ciphertext from the underlying stream. + raw = self._body.read(amt) + + # Prepend any previously held-back peek byte to the new data. + data = self._peek_buffer + raw + self._peek_buffer = b"" + + # No data at all; the stream is empty. + if not data: + return self._finalize() + + # Decrypt incrementally; plaintext is released immediately. + plaintext = self._decryptor.update(data) + plaintext = self._unpadder.update(plaintext) + + # Peek 1 byte ahead to detect stream exhaustion. If the stream + # is exhausted we must finalize now to flush the unpadder. + peek = self._body.read(1) + if peek: + # Stream continues; stash the peeked byte for the next read. + self._peek_buffer = peek + else: + # Stream exhausted; finalize to flush any remaining padding. 
+ plaintext += self._finalize() + + # super._amount_read can be used for progress tracking + # noinspection PyUnresolvedReferences + self._amount_read += len(plaintext) + return plaintext + + def _finalize(self): + """Finalize CBC decryption and flush the unpadder.""" + self._finalized = True + try: + plaintext = self._decryptor.finalize() + return self._unpadder.update(plaintext) + self._unpadder.finalize() + except Exception as e: + raise S3EncryptionClientError(f"Failed to decrypt CBC content: {e}") from e + + def __enter__(self): # noqa: D105 + return self + + def close(self): + """Close the underlying stream.""" + if hasattr(self._body, "close"): + self._body.close() + + +##= specification/s3-encryption/client.md#enable-delayed-authentication +##= type=implementation +##% When enabled, the S3EC MAY release plaintext from a stream which has not been authenticated. +# slots=False because StreamingBody extends IOBase which already has __weakref__. +@define(slots=False) +class GCMDelayedAuthDecryptingStream(StreamingBody): + """A delayed-auth stream for AES-GCM decryption. + + Extends botocore's StreamingBody so it can be used as a drop-in replacement + for parsed["Body"], inheriting iter_chunks, iter_lines, __iter__, etc. + + Plaintext is released incrementally via cipher.update(). The content_length + from the S3 GetObject response tells us exactly how many bytes are ciphertext + vs. the trailing GCM auth tag. The tag is only verified via finalize_with_tag() when the ciphertext is + fully consumed. + """ + + _body: object = field() + _decryptor: object = field() + _tag_length: int = field() + # _content_length intentionally collides with super's _content_length + _content_length: int = field() + _ciphertext_remaining: int = field(init=False) + _finalized: bool = field(init=False, default=False) + + def __attrs_post_init__(self): # noqa: D105 + # By passing in content_length, and updating _amount_read in read(), + # we support the super's normal progression. 
+ # However, we do not support the super's _verify_content_length. + super().__init__(io.BytesIO(), content_length=self._content_length) + self._ciphertext_remaining = self._content_length - self._tag_length + if self._ciphertext_remaining < 0: + raise S3EncryptionClientError( + f"Malformed Input: Content Length ({self._content_length}) is less than GCM tag length ({self._tag_length})" + ) + + # Inherited iter_chunks, iter_lines, __iter__, and __next__ all delegate + # to self.read(). No override needed. + + def readable(self): # noqa: D102 + return not self._finalized + + def read(self, amt=None): + """Read and decrypt GCM ciphertext, holding back the trailing auth tag.""" + # Stream already fully consumed and finalized; nothing left to return. + if self._finalized: + return b"" + + # No ciphertext left — read the tag and finalize. + if self._ciphertext_remaining <= 0: + return self._finalize() + + # Read at most ciphertext_remaining bytes (never into the tag). + to_read = ( + self._ciphertext_remaining if amt is None else min(amt, self._ciphertext_remaining) + ) + raw = self._body.read(to_read) + + if not raw: + return self._finalize() + + self._ciphertext_remaining -= len(raw) + plaintext = self._decryptor.update(raw) + + # If we've consumed all ciphertext, finalize now. 
+ if self._ciphertext_remaining <= 0: + plaintext += self._finalize() + + # super._amount_read can be used for progress tracking + # noinspection PyUnresolvedReferences + self._amount_read += len(plaintext) + return plaintext + + def _finalize(self): + """Read the GCM tag from the stream and verify it.""" + if self._finalized: + return b"" + self._finalized = True + try: + tag = self._body.read(self._tag_length) + return self._decryptor.finalize_with_tag(tag) + except Exception as e: + raise S3EncryptionClientError(f"Failed to decrypt GCM content: {e}") from e + + def __enter__(self): # noqa: D105 + return self + + def close(self): + """Close the underlying stream.""" + if hasattr(self._body, "close"): + self._body.close() diff --git a/test/integration/test_i_s3_encryption.py b/test/integration/test_i_s3_encryption.py index 15133c05..36f826bd 100644 --- a/test/integration/test_i_s3_encryption.py +++ b/test/integration/test_i_s3_encryption.py @@ -252,3 +252,26 @@ def test_put_object_uses_configured_algorithm(algorithm_suite, commitment_policy meta_key, expected_value = _EXPECTED_ALGORITHM_METADATA[algorithm_suite] assert meta_key in metadata, f"Expected metadata key '{meta_key}' not found in {metadata}" assert metadata[meta_key] == expected_value + + +##= specification/s3-encryption/client.md#enable-delayed-authentication +##= type=test +##% The S3EC MUST support the option to enable or disable Delayed Authentication mode. 
+@pytest.mark.parametrize("enable_delayed_auth", [False, True], ids=["buffered", "delayed-auth"]) +def test_delayed_authentication_mode(enable_delayed_auth): + """S3EC MUST support enabling and disabling delayed authentication.""" + key = _unique_key("delayed-auth-mode-") + data = b"test delayed authentication mode" + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + enable_delayed_authentication=enable_delayed_auth, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == data diff --git a/test/integration/test_i_s3_encryption_instruction_file.py b/test/integration/test_i_s3_encryption_instruction_file.py index f4f70704..0c8b2b2a 100644 --- a/test/integration/test_i_s3_encryption_instruction_file.py +++ b/test/integration/test_i_s3_encryption_instruction_file.py @@ -24,6 +24,8 @@ "v2_instruction_file": "static-v2-instruction-file-from-java-v4", "v3_instruction_file": "static-v3-instruction-file-from-java-v4", "negative_v2_instruction_file": "NEGATIVE-static-v2-instruction-file-test-from-java-v4", + "large_v2_instruction_file": "static-large-v2-instruction-file-from-java-v4-52428800", + "large_v3_instruction_file": "static-large-v3-instruction-file-from-java-v4-52428800", } @@ -53,7 +55,8 @@ def test_decrypt_v1_instruction_file(): print("Success! V1 instruction file decryption completed.") -def test_decrypt_v2_instruction_file(): +@pytest.mark.parametrize("delayed_auth", [False, True], ids=["buffered", "delayed-auth"]) +def test_decrypt_v2_instruction_file(delayed_auth): """Test decrypting V2 object with instruction file. V2 format uses ALG_AES_256_GCM_IV12_TAG16_NO_KDF (no key commitment). 
@@ -68,6 +71,7 @@ def test_decrypt_v2_instruction_file(): keyring, encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + enable_delayed_authentication=delayed_auth, ) s3ec = S3EncryptionClient(wrapped_client, config) @@ -133,19 +137,21 @@ def test_decrypt_v3_instruction_file_custom_suffix(): wrapped_client = boto3.client("s3") config = S3EncryptionClientConfig( keyring, - instruction_file_suffix=".custom-suffix-instruction", commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, ) s3ec = S3EncryptionClient(wrapped_client, config) - response = s3ec.get_object(Bucket=bucket, Key=key) + response = s3ec.get_object( + Bucket=bucket, Key=key, InstructionFileSuffix=".custom-suffix-instruction" + ) output = response["Body"].read().decode("utf-8") assert output == "static-v3-instruction-file-from-java-v4" print("Success! V3 custom suffix instruction file decryption completed.") -def test_decrypt_v2_instruction_file_custom_suffix(): +@pytest.mark.parametrize("delayed_auth", [False, True], ids=["buffered", "delayed-auth"]) +def test_decrypt_v2_instruction_file_custom_suffix(delayed_auth): """Test decrypting V2 object with a custom instruction file suffix.""" key = TEST_OBJECTS["v2_instruction_file"] @@ -155,13 +161,61 @@ def test_decrypt_v2_instruction_file_custom_suffix(): config = S3EncryptionClientConfig( keyring, encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, - instruction_file_suffix=".custom-suffix-instruction", commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + enable_delayed_authentication=delayed_auth, ) s3ec = S3EncryptionClient(wrapped_client, config) - response = s3ec.get_object(Bucket=bucket, Key=key) + response = s3ec.get_object( + Bucket=bucket, Key=key, InstructionFileSuffix=".custom-suffix-instruction" + ) output = response["Body"].read().decode("utf-8") assert output == "static-v2-instruction-file-from-java-v4" 
print("Success! V2 custom suffix instruction file decryption completed.") + + +LARGE_FILE_SIZE = 52428800 # 50 MB + + +def test_decrypt_large_v2_instruction_file_delayed_auth(): + """Test streaming decryption of a 50 MB V2 object with delayed authentication.""" + key = TEST_OBJECTS["large_v2_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + enable_delayed_authentication=True, + encryption_algorithm=AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + response = s3ec.get_object(Bucket=bucket, Key=key) + total = 0 + while chunk := response["Body"].read(65536): + total += len(chunk) + + assert total == LARGE_FILE_SIZE + + +# TODO(v3): enable once V3 decryption is implemented +@pytest.mark.skip(reason="V3 decryption not yet implemented") +def test_decrypt_large_v3_instruction_file_delayed_auth(): + """Test streaming decryption of a 50 MB V3 object with delayed authentication.""" + key = TEST_OBJECTS["large_v3_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig(keyring, enable_delayed_authentication=True) + s3ec = S3EncryptionClient(wrapped_client, config) + + response = s3ec.get_object(Bucket=bucket, Key=key) + total = 0 + while chunk := response["Body"].read(65536): + total += len(chunk) + + assert total == LARGE_FILE_SIZE diff --git a/test/integration/test_i_s3_encryption_streaming.py b/test/integration/test_i_s3_encryption_streaming.py new file mode 100644 index 00000000..553cd16b --- /dev/null +++ b/test/integration/test_i_s3_encryption_streaming.py @@ -0,0 +1,195 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for streaming decryption modes (buffered vs delayed-auth). + +These tests verify that GCMBufferedDecryptingStream and GCMDelayedAuthDecryptingStream +produce correct plaintext for real S3 round-trips across algorithm suites. +""" + +import os +from datetime import datetime + +import boto3 +import pytest + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.materials.kms_keyring import KmsKeyring +from s3_encryption.materials.materials import AlgorithmSuite, CommitmentPolicy +from s3_encryption.stream import ( + GCMBufferedDecryptingStream, + GCMDelayedAuthDecryptingStream, +) + +bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket") +region = os.environ.get("CI_AWS_REGION", "us-west-2") +kms_key_id = os.environ.get( + "CI_KMS_KEY_ALIAS", "arn:aws:kms:us-west-2:370957321024:alias/S3EC-Python-Github-KMS-Key" +) + +GCM_CONFIGS = [ + pytest.param( + AlgorithmSuite.ALG_AES_256_GCM_IV12_TAG16_NO_KDF, + CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, + id="AES_GCM", + ), + pytest.param( + AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, + CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + id="KC_GCM", + ), +] + + +def _make_client(algorithm_suite, commitment_policy, delayed_auth): + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + encryption_algorithm=algorithm_suite, + commitment_policy=commitment_policy, + enable_delayed_authentication=delayed_auth, + ) + return S3EncryptionClient(wrapped_client, config) + + +def _unique_key(prefix): + return prefix + datetime.now().strftime("%Y-%m-%d-%H:%M:%S-%f") + + +# --------------------------------------------------------------------------- +# Buffered mode: verifies tag before releasing plaintext +#
--------------------------------------------------------------------------- + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_buffered_roundtrip(algorithm_suite, commitment_policy): + """Buffered mode decrypts correctly for a simple round-trip.""" + key = _unique_key("buffered-rt-") + data = b"buffered mode round trip test data" + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=False) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + + body = response["Body"] + assert isinstance(body, GCMBufferedDecryptingStream) + assert body.read() == data + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_buffered_partial_reads(algorithm_suite, commitment_policy): + """Buffered mode supports partial read(amt) calls.""" + key = _unique_key("buffered-partial-") + data = os.urandom(1024) + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=False) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + + result = b"" + while chunk := response["Body"].read(100): + result += chunk + assert result == data + + +# --------------------------------------------------------------------------- +# Delayed-auth mode: releases plaintext before tag verification +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_delayed_auth_roundtrip(algorithm_suite, commitment_policy): + """Delayed-auth mode decrypts correctly for a simple round-trip.""" + key = _unique_key("delayed-rt-") + data = b"delayed auth round trip test data" + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=True) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + + body = response["Body"] + assert isinstance(body, 
GCMDelayedAuthDecryptingStream) + assert body.read() == data + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_delayed_auth_chunked_reads(algorithm_suite, commitment_policy): + """Delayed-auth mode supports chunked streaming reads.""" + key = _unique_key("delayed-chunked-") + data = os.urandom(4096) + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=True) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + + result = b"" + while chunk := response["Body"].read(256): + result += chunk + assert result == data + + +# --------------------------------------------------------------------------- +# Both modes produce identical plaintext +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_buffered_and_delayed_produce_same_plaintext(algorithm_suite, commitment_policy): + """Both streaming modes must produce identical plaintext for the same object.""" + key = _unique_key("same-plaintext-") + data = os.urandom(2048) + + # Encrypt once + writer = _make_client(algorithm_suite, commitment_policy, delayed_auth=False) + writer.put_object(Bucket=bucket, Key=key, Body=data) + + # Decrypt with buffered + buffered = _make_client(algorithm_suite, commitment_policy, delayed_auth=False) + resp_buf = buffered.get_object(Bucket=bucket, Key=key) + plaintext_buf = resp_buf["Body"].read() + + # Decrypt with delayed-auth + delayed = _make_client(algorithm_suite, commitment_policy, delayed_auth=True) + resp_del = delayed.get_object(Bucket=bucket, Key=key) + plaintext_del = resp_del["Body"].read() + + assert plaintext_buf == plaintext_del == data + + +# --------------------------------------------------------------------------- +# Empty body +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("delayed_auth", 
[False, True], ids=["buffered", "delayed-auth"]) +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_empty_body_roundtrip(algorithm_suite, commitment_policy, delayed_auth): + """Both modes handle empty plaintext correctly.""" + key = _unique_key("empty-stream-") + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=delayed_auth) + s3ec.put_object(Bucket=bucket, Key=key, Body=b"") + response = s3ec.get_object(Bucket=bucket, Key=key) + assert response["Body"].read() == b"" + + +# --------------------------------------------------------------------------- +# Large object streaming +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("algorithm_suite,commitment_policy", GCM_CONFIGS) +def test_delayed_auth_large_object(algorithm_suite, commitment_policy): + """Delayed-auth streams a 1 MB object correctly via chunked reads.""" + key = _unique_key("delayed-large-") + data = os.urandom(1024 * 1024) # 1 MB + + s3ec = _make_client(algorithm_suite, commitment_policy, delayed_auth=True) + s3ec.put_object(Bucket=bucket, Key=key, Body=data) + response = s3ec.get_object(Bucket=bucket, Key=key) + + result = b"" + while chunk := response["Body"].read(65536): + result += chunk + assert result == data diff --git a/test/test_decryption.py b/test/test_decryption.py index 6d22d439..4f8941c0 100644 --- a/test/test_decryption.py +++ b/test/test_decryption.py @@ -106,7 +106,9 @@ def test_cbc_object_rejected_when_legacy_disabled(self): ) with pytest.raises(S3EncryptionClientError, match="ALG_AES_256_CBC_IV16_NO_KDF"): - pipeline.decrypt(_response(_v1_cbc_metadata())) + pipeline.decrypt( + _response(_v1_cbc_metadata()), ".instruction", enable_delayed_authentication=False + ) ##= specification/s3-encryption/decryption.md#cbc-decryption ##= type=test @@ -148,8 +150,10 @@ def test_cbc_decryption_succeeds_when_legacy_enabled(self): keyring_return=dec_mats, ) - result = 
pipeline.decrypt(_response(metadata, ciphertext)) - assert result == plaintext + result = pipeline.decrypt( + _response(metadata, ciphertext), ".instruction", enable_delayed_authentication=False + ) + assert result.read() == plaintext ##= specification/s3-encryption/decryption.md#cbc-decryption ##= type=test @@ -192,8 +196,10 @@ def test_cbc_decryption_fails_with_wrong_key(self): keyring_return=dec_mats, ) - with pytest.raises(S3EncryptionClientSecurityError, match="Failed to decrypt CBC content"): - pipeline.decrypt(_response(metadata, ciphertext)) + with pytest.raises(S3EncryptionClientError, match="Failed to decrypt CBC content"): + pipeline.decrypt( + _response(metadata, ciphertext), ".instruction", enable_delayed_authentication=False + ).read() # --------------------------------------------------------------------------- @@ -289,7 +295,11 @@ def test_commitment_verified_before_content_decryption(self): with pytest.raises( S3EncryptionClientSecurityError, match="Key commitment verification failed" ): - pipeline.decrypt(_response(metadata, b"fake-ciphertext")) + pipeline.decrypt( + _response(metadata, b"fake-ciphertext"), + ".instruction", + enable_delayed_authentication=False, + ) # --------------------------------------------------------------------------- @@ -322,7 +332,9 @@ def test_require_decrypt_rejects_non_committing_suite(self): ) with pytest.raises(S3EncryptionClientError, match="cannot decrypt non-key-committing"): - pipeline.decrypt(_response(_v2_gcm_metadata())) + pipeline.decrypt( + _response(_v2_gcm_metadata()), ".instruction", enable_delayed_authentication=False + ) def test_allow_decrypt_accepts_non_committing_suite(self): """REQUIRE_ENCRYPT_ALLOW_DECRYPT MUST allow non-committing algorithm suites.""" @@ -352,8 +364,10 @@ def test_allow_decrypt_accepts_non_committing_suite(self): keyring_return=dec_mats, ) - result = pipeline.decrypt(_response(metadata, ciphertext)) - assert result == plaintext + result = pipeline.decrypt( + _response(metadata, 
ciphertext), ".instruction", enable_delayed_authentication=False + ) + assert result.read() == plaintext # --------------------------------------------------------------------------- @@ -387,4 +401,6 @@ def test_legacy_cbc_rejected_by_default(self): ) with pytest.raises(S3EncryptionClientError, match="not configured to decrypt"): - pipeline.decrypt(_response(_v1_cbc_metadata())) + pipeline.decrypt( + _response(_v1_cbc_metadata()), ".instruction", enable_delayed_authentication=False + ) diff --git a/test/test_default_algorithm_commitment.py b/test/test_default_algorithm_commitment.py index 1640451a..0b55b9aa 100644 --- a/test/test_default_algorithm_commitment.py +++ b/test/test_default_algorithm_commitment.py @@ -91,5 +91,7 @@ def test_default_encryption_decryptable_with_require_decrypt(self): cmm, commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, ) - result = decrypt_pipeline.decrypt(response) - assert result == plaintext + result = decrypt_pipeline.decrypt( + response, ".instruction", enable_delayed_authentication=False + ) + assert result.read() == plaintext diff --git a/test/test_key_commitment.py b/test/test_key_commitment.py index a82fc9fd..79b50b9b 100644 --- a/test/test_key_commitment.py +++ b/test/test_key_commitment.py @@ -111,8 +111,8 @@ def test_forbid_encrypt_allows_non_committing_decrypt(self): commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT, keyring_return=dec_mats, ) - result = pipeline.decrypt(response) - assert result == plaintext + result = pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) + assert result.read() == plaintext ##= specification/s3-encryption/key-commitment.md#commitment-policy ##= type=test @@ -126,8 +126,8 @@ def test_require_encrypt_allow_decrypt_allows_non_committing_decrypt(self): commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, keyring_return=dec_mats, ) - result = pipeline.decrypt(response) - assert result == plaintext + result = 
pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) + assert result.read() == plaintext ##= specification/s3-encryption/key-commitment.md#commitment-policy ##= type=test @@ -142,7 +142,7 @@ def test_require_require_rejects_non_committing_decrypt(self): keyring_return=dec_mats, ) with pytest.raises(S3EncryptionClientError, match="cannot decrypt non-key-committing"): - pipeline.decrypt(response) + pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) def test_require_require_allows_committing_decrypt(self): """REQUIRE_ENCRYPT_REQUIRE_DECRYPT MUST allow decryption with committing suites.""" @@ -153,5 +153,5 @@ def test_require_require_allows_committing_decrypt(self): commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, keyring_return=dec_mats, ) - result = pipeline.decrypt(response) - assert result == plaintext + result = pipeline.decrypt(response, ".instruction", enable_delayed_authentication=False) + assert result.read() == plaintext diff --git a/test/test_pipelines.py b/test/test_pipelines.py index 3d32b8cc..e3d34e35 100644 --- a/test/test_pipelines.py +++ b/test/test_pipelines.py @@ -66,7 +66,13 @@ def test_decrypt_v1_from_instruction_file(self): # Should fail when trying to decrypt (proving instruction file was fetched) with pytest.raises(Exception, match="Keyring called"): - pipeline.decrypt(mock_response, bucket="test-bucket", key="test-key") + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) # Verify instruction file was fetched mock_s3_client.get_object.assert_called_once_with( @@ -125,7 +131,13 @@ def test_decrypt_v2_from_instruction_file(self): # Should fail when trying to decrypt (proving instruction file was fetched) with pytest.raises(Exception, match="Keyring called"): - pipeline.decrypt(mock_response, bucket="test-bucket", key="test-key") + pipeline.decrypt( + mock_response, + 
instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) # Verify instruction file was fetched mock_s3_client.get_object.assert_called_once_with( @@ -199,7 +211,13 @@ def test_decrypt_v3_from_instruction_file(self): with pytest.raises( S3EncryptionClientSecurityError, match="Key commitment verification failed" ): - pipeline.decrypt(mock_response, bucket="test-bucket", key="test-key") + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) # Verify instruction file was fetched mock_s3_client.get_object.assert_called_once_with( @@ -250,9 +268,10 @@ def test_decrypt_with_custom_instruction_file_suffix(self): with pytest.raises(Exception, match="Keyring called"): pipeline.decrypt( mock_response, + instruction_suffix=".custom-suffix", + enable_delayed_authentication=False, bucket="test-bucket", key="test-key", - instruction_suffix=".custom-suffix", ) mock_s3_client.get_object.assert_called_once_with( @@ -290,4 +309,4 @@ def test_decrypt_v3_unsupported_wrap_alg(self): with pytest.raises( S3EncryptionClientError, match="AES/GCM is not a valid key wrapping algorithm" ): - pipeline.decrypt(mock_response) + pipeline.decrypt(mock_response, ".instruction", enable_delayed_authentication=False) diff --git a/test/test_s3_encryption_client_delete.py b/test/test_s3_encryption_client_delete.py new file mode 100644 index 00000000..1279abab --- /dev/null +++ b/test/test_s3_encryption_client_delete.py @@ -0,0 +1,108 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for S3EncryptionClient.delete_object.""" + +from unittest.mock import Mock, call + +import pytest + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials.keyring import S3Keyring + + +def _make_client(): + """Create an S3EncryptionClient with a mocked wrapped S3 client.""" + mock_keyring = Mock(spec=S3Keyring) + mock_s3 = Mock() + mock_s3.meta.events = Mock() + config = S3EncryptionClientConfig(keyring=mock_keyring) + s3ec = S3EncryptionClient(wrapped_s3_client=mock_s3, config=config) + return s3ec, mock_s3 + + +class TestDeleteObject: + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=test + ##% - DeleteObject MUST delete the given object key. + def test_deletes_object(self): + """delete_object forwards the call to the wrapped client.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_object.return_value = {"DeleteMarker": True} + + response = s3ec.delete_object(Bucket="bucket", Key="key") + + assert response == {"DeleteMarker": True} + assert mock_s3.delete_object.call_args_list[0] == call(Bucket="bucket", Key="key") + + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=test + ##% - DeleteObject MUST delete the associated instruction file + ##% using the default instruction file suffix. 
+ def test_deletes_instruction_file(self): + """delete_object also deletes the instruction file with default suffix.""" + s3ec, mock_s3 = _make_client() + + s3ec.delete_object(Bucket="bucket", Key="key") + + assert mock_s3.delete_object.call_count == 2 + assert mock_s3.delete_object.call_args_list[1] == call( + Bucket="bucket", Key="key.instruction" + ) + + def test_returns_object_delete_response(self): + """delete_object returns the response from the object deletion, not the instruction file.""" + s3ec, mock_s3 = _make_client() + object_response = {"DeleteMarker": True, "VersionId": "v1"} + instruction_response = {"DeleteMarker": False, "VersionId": "v2"} + mock_s3.delete_object.side_effect = [object_response, instruction_response] + + response = s3ec.delete_object(Bucket="bucket", Key="key") + + assert response == object_response + + def test_wraps_unexpected_errors(self): + """delete_object wraps unexpected errors in S3EncryptionClientError.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_object.side_effect = RuntimeError("network error") + + with pytest.raises(S3EncryptionClientError, match="Failed to delete object"): + s3ec.delete_object(Bucket="bucket", Key="key") + + def test_reraises_s3_encryption_client_error(self): + """delete_object re-raises S3EncryptionClientError without wrapping.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_object.side_effect = S3EncryptionClientError("original error") + + with pytest.raises(S3EncryptionClientError, match="original error"): + s3ec.delete_object(Bucket="bucket", Key="key") + + def test_passes_extra_kwargs(self): + """delete_object forwards extra kwargs like VersionId to the wrapped client.""" + s3ec, mock_s3 = _make_client() + + s3ec.delete_object(Bucket="bucket", Key="key", VersionId="abc123") + + assert mock_s3.delete_object.call_args_list[0] == call( + Bucket="bucket", Key="key", VersionId="abc123" + ) + + def test_custom_instruction_file_suffix(self): + """delete_object uses a custom instruction file 
suffix when provided.""" + s3ec, mock_s3 = _make_client() + + s3ec.delete_object(Bucket="bucket", Key="key", InstructionFileSuffix=".custom-suffix") + + assert mock_s3.delete_object.call_count == 2 + assert mock_s3.delete_object.call_args_list[1] == call( + Bucket="bucket", Key="key.custom-suffix" + ) + + def test_instruction_file_suffix_not_forwarded_to_s3(self): + """InstructionFileSuffix is popped from kwargs and not sent to S3.""" + s3ec, mock_s3 = _make_client() + + s3ec.delete_object(Bucket="bucket", Key="key", InstructionFileSuffix=".custom") + + # First call (object delete) should not contain InstructionFileSuffix + assert mock_s3.delete_object.call_args_list[0] == call(Bucket="bucket", Key="key") diff --git a/test/test_s3_encryption_client_delete_objects.py b/test/test_s3_encryption_client_delete_objects.py new file mode 100644 index 00000000..c1045ca3 --- /dev/null +++ b/test/test_s3_encryption_client_delete_objects.py @@ -0,0 +1,188 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for S3EncryptionClient.delete_objects.""" + +from unittest.mock import Mock, call + +import pytest + +from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials.keyring import S3Keyring + + +def _make_client(): + """Create an S3EncryptionClient with a mocked wrapped S3 client.""" + mock_keyring = Mock(spec=S3Keyring) + mock_s3 = Mock() + mock_s3.meta.events = Mock() + config = S3EncryptionClientConfig(keyring=mock_keyring) + s3ec = S3EncryptionClient(wrapped_s3_client=mock_s3, config=config) + return s3ec, mock_s3 + + +class TestDeleteObjects: + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=test + ##% - DeleteObjects MUST delete each of the given objects. 
+ def test_deletes_objects(self): + """delete_objects forwards the Delete parameter to the wrapped client.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = { + "Deleted": [{"Key": "key1"}, {"Key": "key2"}], + } + + delete_param = {"Objects": [{"Key": "key1"}, {"Key": "key2"}]} + response = s3ec.delete_objects(Bucket="bucket", Delete=delete_param) + + assert response == {"Deleted": [{"Key": "key1"}, {"Key": "key2"}]} + assert mock_s3.delete_objects.call_args_list[0] == call( + Bucket="bucket", Delete=delete_param + ) + + ##= specification/s3-encryption/client.md#required-api-operations + ##= type=test + ##% - DeleteObjects MUST delete each of the corresponding instruction files + ##% using the default instruction file suffix. + def test_deletes_instruction_files(self): + """delete_objects also deletes instruction files for each object.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = {"Deleted": []} + + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}, {"Key": "key2"}]}, + ) + + assert mock_s3.delete_objects.call_count == 2 + assert mock_s3.delete_objects.call_args_list[1] == call( + Bucket="bucket", + Delete={ + "Objects": [ + {"Key": "key1.instruction"}, + {"Key": "key2.instruction"}, + ], + }, + ) + + def test_returns_object_delete_response(self): + """delete_objects returns the response from the object deletion, not the instruction file deletion.""" + s3ec, mock_s3 = _make_client() + object_response = {"Deleted": [{"Key": "key1"}]} + instruction_response = {"Deleted": [{"Key": "key1.instruction"}]} + mock_s3.delete_objects.side_effect = [object_response, instruction_response] + + response = s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + ) + + assert response == object_response + + def test_wraps_unexpected_errors(self): + """delete_objects wraps unexpected errors in S3EncryptionClientError.""" + s3ec, mock_s3 = _make_client() + 
mock_s3.delete_objects.side_effect = RuntimeError("network error") + + with pytest.raises(S3EncryptionClientError, match="Failed to delete objects"): + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + ) + + def test_reraises_s3_encryption_client_error(self): + """delete_objects re-raises S3EncryptionClientError without wrapping.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.side_effect = S3EncryptionClientError("original error") + + with pytest.raises(S3EncryptionClientError, match="original error"): + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + ) + + def test_passes_extra_kwargs(self): + """delete_objects forwards extra kwargs to the wrapped client.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = {"Deleted": []} + + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + RequestPayer="requester", + ) + + assert mock_s3.delete_objects.call_args_list[0] == call( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + RequestPayer="requester", + ) + + def test_custom_instruction_file_suffix(self): + """delete_objects uses a custom instruction file suffix when provided.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = {"Deleted": []} + + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + InstructionFileSuffix=".custom-suffix", + ) + + assert mock_s3.delete_objects.call_count == 2 + assert mock_s3.delete_objects.call_args_list[1] == call( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1.custom-suffix"}]}, + ) + + def test_instruction_file_suffix_not_forwarded_to_s3(self): + """InstructionFileSuffix is popped from kwargs and not sent to S3.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = {"Deleted": []} + + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + InstructionFileSuffix=".custom", + ) + + 
assert mock_s3.delete_objects.call_args_list[0] == call( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}]}, + ) + + def test_preserves_version_ids_in_objects(self): + """delete_objects preserves VersionId in the Objects list.""" + s3ec, mock_s3 = _make_client() + mock_s3.delete_objects.return_value = {"Deleted": []} + + s3ec.delete_objects( + Bucket="bucket", + Delete={ + "Objects": [ + {"Key": "key1", "VersionId": "v1"}, + {"Key": "key2", "VersionId": "v2"}, + ] + }, + ) + + # First call preserves VersionId + assert mock_s3.delete_objects.call_args_list[0] == call( + Bucket="bucket", + Delete={ + "Objects": [ + {"Key": "key1", "VersionId": "v1"}, + {"Key": "key2", "VersionId": "v2"}, + ] + }, + ) + # Instruction file call does NOT include VersionId + assert mock_s3.delete_objects.call_args_list[1] == call( + Bucket="bucket", + Delete={ + "Objects": [ + {"Key": "key1.instruction"}, + {"Key": "key2.instruction"}, + ], + }, + ) diff --git a/test/test_s3_encryption_client_plugin.py b/test/test_s3_encryption_client_plugin.py index bdc48c79..cbc8cd80 100644 --- a/test/test_s3_encryption_client_plugin.py +++ b/test/test_s3_encryption_client_plugin.py @@ -139,3 +139,18 @@ def test_instruction_file_mode_invalid_keys_raises_error(self): # Should raise error with pytest.raises(S3EncryptionClientError, match="Instruction file contains invalid keys"): plugin.on_get_object_after_call(parsed) + + def test_missing_content_length_raises_error(self): + """Test that a missing ContentLength in the S3 response raises an error.""" + mock_keyring = Mock(spec=S3Keyring) + config = S3EncryptionClientConfig(keyring=mock_keyring) + plugin = S3EncryptionClientPlugin(config) + plugin._context.key = "my-object" + + parsed = { + "Body": StreamingBody(io.BytesIO(b"data"), 4), + "Metadata": {}, + } + + with pytest.raises(S3EncryptionClientError, match="missing ContentLength.*Key: my-object"): + plugin.on_get_object_after_call(parsed) diff --git a/test/test_stream.py b/test/test_stream.py 
new file mode 100644 index 00000000..93ba7e66 --- /dev/null +++ b/test/test_stream.py @@ -0,0 +1,600 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for streaming decryption behavior.""" + +import os +from io import BytesIO +from unittest.mock import Mock + +import pytest +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.materials import AlgorithmSuite +from s3_encryption.stream import ( + CBCDecryptingStream, + GCMBufferedDecryptingStream, + GCMDelayedAuthDecryptingStream, +) + + +def _encrypt_gcm(plaintext: bytes): + """Encrypt plaintext with AES-GCM, return (ciphertext_with_tag, key, nonce).""" + key = os.urandom(32) + nonce = os.urandom(12) + ciphertext_with_tag = AESGCM(key).encrypt(nonce, plaintext, None) + return ciphertext_with_tag, key, nonce + + +def _make_gcm_decryptor(key, nonce): + """Create a GCM decryptor object.""" + return Cipher(algorithms.AES(key), modes.GCM(nonce)).decryptor() + + +def _make_streaming_body(data: bytes): + """Create a mock StreamingBody wrapping data.""" + body = Mock() + stream = BytesIO(data) + body.read = stream.read + body.close = Mock() + body._stream = stream + return body + + +class TestDelayedAuthReleasesBeforeVerification: + """Delayed auth releases plaintext before the GCM tag is verified.""" + + ##= specification/s3-encryption/client.md#enable-delayed-authentication + ##= type=test + ##% When enabled, the S3EC MAY release plaintext from a stream which has not been authenticated. 
+ def test_delayed_auth_releases_plaintext_before_tag_verification(self): + plaintext = os.urandom(4096) + ciphertext_with_tag, key, nonce = _encrypt_gcm(plaintext) + body = _make_streaming_body(ciphertext_with_tag) + + decryptor = _make_gcm_decryptor(key, nonce) + stream = GCMDelayedAuthDecryptingStream( + body, + decryptor, + tag_length=AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY.cipher_tag_length_bytes, + content_length=len(ciphertext_with_tag), + ) + # read(256) decrypts a partial chunk via cipher.update(), releasing + # plaintext without consuming the full ciphertext stream. The GCM tag + # at the end of the stream has not been reached yet. + chunk = stream.read(256) + + # Plaintext was returned before the stream was fully consumed + assert len(chunk) > 0 + # _finalized is False: the GCM tag has NOT been verified yet + assert not stream._finalized + # Ciphertext remains unread in the underlying stream + assert body._stream.tell() < len(ciphertext_with_tag) + + # Finish reading the stream and verify full plaintext matches + remaining = stream.read() + assert chunk + remaining == plaintext + + +class TestBufferedWithholdsUntilVerification: + """Buffered mode does not release plaintext until the GCM tag is verified.""" + + ##= specification/s3-encryption/client.md#enable-delayed-authentication + ##= type=test + ##% When disabled the S3EC MUST NOT release plaintext from a stream which has not been authenticated. 
+ def test_buffered_verifies_tag_before_releasing_any_plaintext(self): + plaintext = os.urandom(4096) + ciphertext_with_tag, key, nonce = _encrypt_gcm(plaintext) + body = _make_streaming_body(ciphertext_with_tag) + + decryptor = _make_gcm_decryptor(key, nonce) + stream = GCMBufferedDecryptingStream( + body, + decryptor, + tag_length=AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY.cipher_tag_length_bytes, + content_length=len(ciphertext_with_tag), + ) + # read(1) triggers _decrypt(), which calls self._body.read() with no amt, + # consuming the entire ciphertext and verifying the GCM tag before + # returning even 1 byte of plaintext. + chunk = stream.read(1) + + assert chunk == plaintext[:1] + # _plaintext being set confirms full decrypt+verify already happened + assert stream._plaintext is not None + + +def _encrypt_cbc(plaintext: bytes): + """Encrypt plaintext with AES-CBC + PKCS7 padding, return (ciphertext, key, iv, unpadder).""" + from cryptography.hazmat.primitives.padding import PKCS7 + + key = os.urandom(32) + iv = os.urandom(16) + padder = PKCS7(128).padder() + padded = padder.update(plaintext) + padder.finalize() + encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor() + ciphertext = encryptor.update(padded) + encryptor.finalize() + unpadder = PKCS7(128).unpadder() + return ciphertext, key, iv, unpadder + + +def _make_cbc_decryptor(key, iv): + return Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor() + + +class TestDelayedAuthCBCDecryption: + + def test_roundtrip(self): + plaintext = b"hello world, this is a CBC test!!" 
+ ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + assert stream.read() == plaintext + + def test_chunked_read(self): + plaintext = b"A" * 256 + ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + result = b"" + while chunk := stream.read(64): + result += chunk + assert result == plaintext + + def test_finalize_called(self): + plaintext = b"finalize me" + ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + actual = stream.read() + assert stream._finalized + assert actual == plaintext + + def test_no_trailing_padding_bytes(self): + plaintext = b"short" + ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + assert stream.read() == plaintext + + def test_read_after_finalized_returns_empty(self): + plaintext = b"done" + ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + stream.read() + assert stream.read() == b"" + + def test_readable_false_after_finalized(self): + plaintext = b"readable" + ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext) + stream = CBCDecryptingStream( + _make_streaming_body(ciphertext), + _make_cbc_decryptor(key, iv), + unpadder=unpadder, + content_length=len(ciphertext), + ) + assert stream.readable() + actual = stream.read() + 
assert not stream.readable()
+        assert actual == plaintext
+
+    def test_close_delegates_to_body(self):
+        plaintext = b"close me"
+        ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext)
+        body = _make_streaming_body(ciphertext)
+        stream = CBCDecryptingStream(
+            body, _make_cbc_decryptor(key, iv), unpadder=unpadder, content_length=len(ciphertext)
+        )
+        stream.close()
+        body.close.assert_called_once()
+
+    def test_enter_returns_self(self):
+        plaintext = b"ctx"
+        ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext)
+        stream = CBCDecryptingStream(
+            _make_streaming_body(ciphertext),
+            _make_cbc_decryptor(key, iv),
+            unpadder=unpadder,
+            content_length=len(ciphertext),
+        )
+        assert stream.__enter__() is stream
+
+    def test_wrong_key_raises_error(self):
+        from cryptography.hazmat.primitives.padding import PKCS7
+
+        plaintext = b"wrong key test!!"
+        ciphertext, _key, iv, _ = _encrypt_cbc(plaintext)
+        wrong_key = os.urandom(32)
+        stream = CBCDecryptingStream(
+            _make_streaming_body(ciphertext),
+            _make_cbc_decryptor(wrong_key, iv),
+            unpadder=PKCS7(128).unpadder(),
+            content_length=len(ciphertext),
+        )
+        # NOTE(review): a random wrong key can, by chance (roughly 1 in 256), decrypt the
+        # final block to bytes that form valid PKCS7 padding; presumably no error is raised
+        # in that case, which would make this test flaky - confirm.
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt CBC content"):
+            stream.read()
+
+    def test_empty_ciphertext(self):
+        from cryptography.hazmat.primitives.padding import PKCS7
+
+        key = os.urandom(32)
+        iv = os.urandom(16)
+        stream = CBCDecryptingStream(
+            _make_streaming_body(b""),
+            _make_cbc_decryptor(key, iv),
+            unpadder=PKCS7(128).unpadder(),
+            content_length=0,
+        )
+        # Empty stream finalize will fail because CBC expects at least one block
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt CBC content"):
+            stream.read()
+
+
+class TestGCMBufferedDecryptingStream:
+    """Tests for GCMBufferedDecryptingStream."""
+
+    def test_full_read(self):
+        plaintext = os.urandom(1024)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.read() == plaintext
+
+    def test_partial_reads(self):
+        plaintext = os.urandom(512)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        result = b""
+        while chunk := stream.read(100):
+            result += chunk
+        assert result == plaintext
+
+    def test_read_triggers_full_decrypt(self):
+        """A 1-byte read must populate _plaintext and consume the whole ciphertext body."""
+        plaintext = os.urandom(256)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        body = _make_streaming_body(ct)
+        stream = GCMBufferedDecryptingStream(
+            body, _make_gcm_decryptor(key, nonce), tag_length=16, content_length=len(ct)
+        )
+        assert stream._plaintext is None
+        stream.read(1)
+        assert stream._plaintext is not None
+        # Entire ciphertext consumed
+        assert body._stream.tell() == len(ct)
+
+    def test_tell(self):
+        plaintext = os.urandom(200)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        stream.read(50)
+        assert stream.tell() == 50
+
+    def test_readable(self):
+        plaintext = b"readable test"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.readable()
+
+    def test_readinto(self):
+        """Asserts that readinto is implemented by botocore's StreamingBody"""
+        plaintext = os.urandom(64)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        buf = bytearray(64)
+        n = stream.readinto(buf)
+        assert n == 64
+        assert bytes(buf) == plaintext
+
+    def test_enter_returns_self(self):
+        plaintext = b"enter"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.__enter__() is stream
+
+    def test_close_delegates(self):
+        """Asserts that close is implemented by botocore's StreamingBody"""
+        plaintext = b"close"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        body = _make_streaming_body(ct)
+        stream = GCMBufferedDecryptingStream(
+            body, _make_gcm_decryptor(key, nonce), tag_length=16, content_length=len(ct)
+        )
+        stream.close()
+        body.close.assert_called_once()
+
+    def test_close_without_close_attr(self):
+        """close() must not raise when the wrapped body has no close attribute."""
+        plaintext = b"no close"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        body = Mock()
+        del body.close
+        body.read = BytesIO(ct).read
+        stream = GCMBufferedDecryptingStream(
+            body, _make_gcm_decryptor(key, nonce), tag_length=16, content_length=len(ct)
+        )
+        stream.close()  # should not raise
+
+    def test_wrong_key_raises_error(self):
+        plaintext = b"wrong key"
+        ct, _key, nonce = _encrypt_gcm(plaintext)
+        wrong_key = os.urandom(32)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(wrong_key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt object"):
+            stream.read()
+
+    def test_tampered_ciphertext_raises_error(self):
+        plaintext = b"tamper test"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        tampered = bytearray(ct)
+        tampered[0] ^= 0xFF
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(bytes(tampered)),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt object"):
+            stream.read()
+
+    def test_idempotent_decrypt(self):
+        """Two consecutive partial reads (63 + 65 bytes) must add up to the 128-byte plaintext."""
+        plaintext = os.urandom(128)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        first = stream.read(63)
+        second = stream.read(65)
+        assert first + second == plaintext
+
+
+class TestDelayedAuthGCMDecryption:
+    """Tests for GCMDelayedAuthDecryptingStream."""
+
+    def test_full_read(self):
+        plaintext = os.urandom(1024)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.read() == plaintext
+
+    def test_chunked_read(self):
+        plaintext = os.urandom(512)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        result = b""
+        while chunk := stream.read(64):
+            result += chunk
+        assert result == plaintext
+
+    def test_read_after_finalized_returns_empty(self):
+        plaintext = os.urandom(128)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        actual = stream.read()
+        assert stream._finalized
+        assert stream.read() == b""
+        assert actual == plaintext
+
+    def test_readable_false_after_finalized(self):
+        plaintext = b"readable"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.readable()
+        stream.read()
+        assert not stream.readable()
+
+    def test_close_delegates(self):
+        plaintext = b"close"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        body = _make_streaming_body(ct)
+        stream = GCMDelayedAuthDecryptingStream(
+            body, _make_gcm_decryptor(key, nonce), tag_length=16, content_length=len(ct)
+        )
+        stream.close()
+        body.close.assert_called_once()
+
+    def test_enter_returns_self(self):
+        plaintext = b"ctx"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.__enter__() is stream
+
+    def test_wrong_key_raises_error(self):
+        plaintext = b"wrong key"
+        ct, _key, nonce = _encrypt_gcm(plaintext)
+        wrong_key = os.urandom(32)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(wrong_key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt GCM content"):
+            stream.read()
+
+    def test_tampered_tag_raises_error(self):
+        plaintext = b"tamper tag"
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        tampered = bytearray(ct)
+        tampered[-1] ^= 0xFF  # flip last byte (part of tag)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(bytes(tampered)),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        with pytest.raises(S3EncryptionClientError, match="Failed to decrypt GCM content"):
+            stream.read()
+
+    def test_small_data_less_than_tag_length(self):
+        """Data exactly equal to tag length (despite the name) — only tag, no ciphertext."""
+        plaintext = b""
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        # For empty plaintext, ct is just the 16-byte tag
+        assert len(ct) == 16
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.read() == b""
+
+    def test_large_data(self):
+        plaintext = os.urandom(1024 * 1024)  # 1 MB
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        result = b""
+        while chunk := stream.read(65536):
+            result += chunk
+        assert result == plaintext
+
+
+# ---------------------------------------------------------------------------
+# Parameterized edge-case plaintext lengths
+# ---------------------------------------------------------------------------
+# Lengths chosen around the AES block size (16) and the two-block (32) and three-block (48)
+# boundaries, plus zero, one byte, and a mid-block length (8), to exercise padding,
+# tag-splitting, and empty-data paths.
+EDGE_CASE_LENGTHS = [0, 1, 8, 15, 16, 17, 31, 32, 33, 47, 48, 49]
+
+
+class TestEdgeCasePlaintextLengths:
+    """Encrypt-then-decrypt checks for each stream type over every EDGE_CASE_LENGTHS value."""
+
+    @pytest.mark.parametrize("length", EDGE_CASE_LENGTHS)
+    def test_buffered_gcm(self, length):
+        plaintext = os.urandom(length)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMBufferedDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        assert stream.read() == plaintext
+
+    @pytest.mark.parametrize("length", EDGE_CASE_LENGTHS)
+    def test_delayed_auth_gcm(self, length):
+        plaintext = os.urandom(length)
+        ct, key, nonce = _encrypt_gcm(plaintext)
+        stream = GCMDelayedAuthDecryptingStream(
+            _make_streaming_body(ct),
+            _make_gcm_decryptor(key, nonce),
+            tag_length=16,
+            content_length=len(ct),
+        )
+        result = b""
+        while chunk := stream.read(7):
+            result += chunk
+        assert result == plaintext
+
+    @pytest.mark.parametrize("length", EDGE_CASE_LENGTHS)
+    def test_delayed_auth_cbc(self, length):
+        plaintext = os.urandom(length)
+        ciphertext, key, iv, unpadder = _encrypt_cbc(plaintext)
+        stream = CBCDecryptingStream(
+            _make_streaming_body(ciphertext),
+            _make_cbc_decryptor(key, iv),
+            unpadder=unpadder,
+            content_length=len(ciphertext),
+        )
+        result = b""
+        while stream.readable():
+            # odd read size, not aligned to the 16-byte AES block, to stress padding handling
+            result += stream.read(7)
+        assert result == plaintext
+
+
+class TestGCMDelayedAuthContentLengthValidation:
+    """Constructor-time content_length validation for GCMDelayedAuthDecryptingStream."""
+
+    def test_content_length_less_than_tag_length_raises(self):
+        """ContentLength smaller than the GCM tag must raise immediately."""
+        stream_body = _make_streaming_body(b"\x00" * 8)
+        decryptor = _make_gcm_decryptor(os.urandom(32), os.urandom(12))
+        with pytest.raises(S3EncryptionClientError, match="less than GCM tag length"):
+            GCMDelayedAuthDecryptingStream(stream_body, decryptor, tag_length=16, content_length=8)