From 84c442a26ab4527f198bdf032911fc43b6fc6b66 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Fri, 24 Apr 2026 14:57:41 -0700 Subject: [PATCH 1/3] feat: add instruction file config to provide a way to disable instruction files --- src/s3_encryption/__init__.py | 26 +- src/s3_encryption/instruction_file_config.py | 34 +++ src/s3_encryption/pipelines.py | 10 + .../test_i_s3_encryption_instruction_file.py | 100 +++++++ test/test_instruction_file_config.py | 254 ++++++++++++++++++ test/test_s3_encryption_client_delete.py | 19 ++ ...est_s3_encryption_client_delete_objects.py | 26 ++ 7 files changed, 461 insertions(+), 8 deletions(-) create mode 100644 src/s3_encryption/instruction_file_config.py create mode 100644 test/test_instruction_file_config.py diff --git a/src/s3_encryption/__init__.py b/src/s3_encryption/__init__.py index f684c6a9..a8ab239b 100644 --- a/src/s3_encryption/__init__.py +++ b/src/s3_encryption/__init__.py @@ -11,6 +11,7 @@ from .exceptions import S3EncryptionClientError from .instruction_file import parse_instruction_file +from .instruction_file_config import InstructionFileConfig from .materials.crypto_materials_manager import ( AbstractCryptoMaterialsManager, DefaultCryptoMaterialsManager, @@ -57,6 +58,9 @@ class S3EncryptionClientConfig: before GCM tag verification. Defaults to False. Has no effect for CBC encrypted ciphertext, which is always streamed as there is no authentication tag. + instruction_file_config: Configuration for instruction file behavior. + Defaults to InstructionFileConfig() which enables instruction file + reads on GetObject. Raises: S3EncryptionClientError: If the encryption algorithm is legacy, or if @@ -86,6 +90,8 @@ class S3EncryptionClientConfig: ##% Delayed Authentication mode MUST be set to false by default. enable_delayed_authentication: bool = field(default=False) + instruction_file_config: InstructionFileConfig = field(factory=InstructionFileConfig) + @cmm.default def _default_cmm_for_keyring(self): return DefaultCryptoMaterialsManager(self.keyring) @@ -248,6 +254,7 @@ def on_get_object_after_call(self, parsed, **kwargs): commitment_policy=self.config.commitment_policy, s3_client=getattr(self._context, _CTX_S3_CLIENT, None), enable_legacy_unauthenticated_modes=self.config.enable_legacy_unauthenticated_modes, + instruction_file_config=self.config.instruction_file_config, ) decrypted_data = pipeline.decrypt( response, @@ -428,8 +435,9 @@ def delete_object(self, **kwargs): ##= type=implementation ##% - DeleteObject MUST delete the associated instruction file ##% using the default instruction file suffix. - instruction_key = kwargs["Key"] + instruction_file_suffix - self.wrapped_s3_client.delete_object(Bucket=kwargs["Bucket"], Key=instruction_key) + if not self.config.instruction_file_config.disable_delete_object: + instruction_key = kwargs["Key"] + instruction_file_suffix + self.wrapped_s3_client.delete_object(Bucket=kwargs["Bucket"], Key=instruction_key) return response except S3EncryptionClientError: @@ -469,12 +477,14 @@ def delete_objects(self, **kwargs): ##= type=implementation ##% - DeleteObjects MUST delete each of the corresponding instruction files ##% using the default instruction file suffix. - instruction_objects = [ - {"Key": obj["Key"] + instruction_file_suffix} for obj in kwargs["Delete"]["Objects"] - ] - self.wrapped_s3_client.delete_objects( - Bucket=kwargs["Bucket"], Delete={"Objects": instruction_objects} - ) + if not self.config.instruction_file_config.disable_delete_objects: + instruction_objects = [ + {"Key": obj["Key"] + instruction_file_suffix} + for obj in kwargs["Delete"]["Objects"] + ] + self.wrapped_s3_client.delete_objects( + Bucket=kwargs["Bucket"], Delete={"Objects": instruction_objects} + ) return response except S3EncryptionClientError: diff --git a/src/s3_encryption/instruction_file_config.py b/src/s3_encryption/instruction_file_config.py new file mode 100644 index 00000000..73320533 --- /dev/null +++ b/src/s3_encryption/instruction_file_config.py @@ -0,0 +1,34 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Instruction file configuration for S3 Encryption Client. + +This module provides configuration for instruction file behavior +during encryption and decryption operations. +""" + +from attrs import define, field + + +@define +class InstructionFileConfig: + """Configuration for instruction file behavior in the S3 Encryption Client. + + Controls whether the client will interact with instruction files + as part of GetObject, DeleteObject, and DeleteObjects operations. + + Attributes: + disable_get_object: If True, the client will not attempt to fetch + instruction files during GetObject (decryption) and will raise + an error if the object's metadata implies an instruction file + is required. Defaults to False. + disable_delete_object: If True, the client will not attempt to + delete the associated instruction file during DeleteObject. + Defaults to False. + disable_delete_objects: If True, the client will not attempt to + delete the associated instruction files during DeleteObjects. + Defaults to False. + """ + + disable_get_object: bool = field(default=False) + disable_delete_object: bool = field(default=False) + disable_delete_objects: bool = field(default=False) diff --git a/src/s3_encryption/pipelines.py b/src/s3_encryption/pipelines.py index 0a107a5f..d101bba6 100644 --- a/src/s3_encryption/pipelines.py +++ b/src/s3_encryption/pipelines.py @@ -20,6 +20,7 @@ from .decryptor import AesCbcDecryptor, AesGcmDecryptor from .exceptions import S3EncryptionClientError from .instruction_file import fetch_instruction_file +from .instruction_file_config import InstructionFileConfig from .key_derivation import derive_keys, verify_commitment from .materials.crypto_materials_manager import AbstractCryptoMaterialsManager from .materials.encrypted_data_key import EncryptedDataKey @@ -181,6 +182,7 @@ class GetEncryptedObjectPipeline: commitment_policy: CommitmentPolicy = field() s3_client: object = field(default=None) enable_legacy_unauthenticated_modes: bool = field(default=False) + instruction_file_config: InstructionFileConfig = field(factory=InstructionFileConfig) # Map content cipher metadata values to AlgorithmSuite _CONTENT_CIPHER_TO_ALGORITHM_SUITE = { @@ -258,6 +260,14 @@ def decrypt( # Check if we need to fetch instruction file if metadata.should_use_instruction_file(): + if self.instruction_file_config.disable_get_object: + raise S3EncryptionClientError( + "Exception encountered while fetching Instruction File. " + "Ensure the object you are attempting to decrypt has been encrypted " + "using the S3 Encryption Client and instruction files are enabled. " + f"bucket: {bucket}\n key: {key}" + ) + if self.s3_client is None: raise S3EncryptionClientError("s3_client required to fetch instruction file") if bucket is None or key is None: diff --git a/test/integration/test_i_s3_encryption_instruction_file.py b/test/integration/test_i_s3_encryption_instruction_file.py index b2d2b5f4..9435989b 100644 --- a/test/integration/test_i_s3_encryption_instruction_file.py +++ b/test/integration/test_i_s3_encryption_instruction_file.py @@ -291,3 +291,103 @@ def test_decrypt_large_v3_instruction_file_delayed_auth(): total += len(chunk) assert total == LARGE_FILE_SIZE + + +# --- InstructionFileConfig integration tests --- + + +def test_instruction_file_config_disabled_raises_on_instruction_file_object(): + """When instruction file get is disabled, decrypting an instruction-file object MUST fail.""" + from s3_encryption.exceptions import S3EncryptionClientError + from s3_encryption.instruction_file_config import InstructionFileConfig + + key = TEST_OBJECTS["v3_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + instruction_file_config=InstructionFileConfig(disable_get_object=True), + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + with pytest.raises( + S3EncryptionClientError, match="Exception encountered while fetching Instruction File" + ): + s3ec.get_object(Bucket=bucket, Key=key) + + +def test_instruction_file_config_enabled_still_decrypts(): + """When instruction file get is explicitly enabled, decryption MUST succeed as before.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + key = TEST_OBJECTS["v3_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + config = S3EncryptionClientConfig( + keyring, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + instruction_file_config=InstructionFileConfig(disable_get_object=False), + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + response = s3ec.get_object(Bucket=bucket, Key=key) + output = response["Body"].read().decode("utf-8") + + assert output == "static-v3-instruction-file-from-java-v4" + + +def test_instruction_file_config_disabled_allows_non_instruction_file_objects(): + """When instruction file get is disabled, objects with metadata in headers MUST still decrypt.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + + # First, put an object using default config (metadata in object headers) + put_config = S3EncryptionClientConfig(keyring) + put_client = S3EncryptionClient(boto3.client("s3"), put_config) + + test_key = f"instruction-file-config-test-{uuid.uuid4()}" + plaintext = b"hello from instruction file config test" + put_client.put_object(Bucket=bucket, Key=test_key, Body=plaintext) + + try: + # Now decrypt with instruction file get disabled + config = S3EncryptionClientConfig( + keyring, + instruction_file_config=InstructionFileConfig(disable_get_object=True), + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + response = s3ec.get_object(Bucket=bucket, Key=test_key) + output = response["Body"].read() + + assert output == plaintext + finally: + wrapped_client.delete_object(Bucket=bucket, Key=test_key) + + +def test_instruction_file_config_default_still_decrypts_instruction_files(): + """Default InstructionFileConfig (no explicit config) MUST still decrypt instruction files.""" + key = TEST_OBJECTS["v3_instruction_file"] + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + wrapped_client = boto3.client("s3") + # No instruction_file_config specified — should use default (enabled) + config = S3EncryptionClientConfig( + keyring, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + ) + s3ec = S3EncryptionClient(wrapped_client, config) + + response = s3ec.get_object(Bucket=bucket, Key=key) + output = response["Body"].read().decode("utf-8") + + assert output == "static-v3-instruction-file-from-java-v4" diff --git a/test/test_instruction_file_config.py b/test/test_instruction_file_config.py new file mode 100644 index 00000000..851fc605 --- /dev/null +++ b/test/test_instruction_file_config.py @@ -0,0 +1,254 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for InstructionFileConfig and its integration with S3EncryptionClientConfig.""" + +import base64 +import json +import os +from io import BytesIO +from unittest.mock import Mock + +import pytest + +from s3_encryption import S3EncryptionClientConfig +from s3_encryption.exceptions import S3EncryptionClientError +from s3_encryption.instruction_file_config import InstructionFileConfig +from s3_encryption.materials.crypto_materials_manager import DefaultCryptoMaterialsManager +from s3_encryption.materials.keyring import S3Keyring +from s3_encryption.materials.materials import CommitmentPolicy +from s3_encryption.pipelines import GetEncryptedObjectPipeline + + +class TestInstructionFileConfig: + """Tests for the InstructionFileConfig attrs class.""" + + def test_defaults_all_false(self): + """All disable flags default to False.""" + config = InstructionFileConfig() + assert config.disable_get_object is False + assert config.disable_delete_object is False + assert config.disable_delete_objects is False + + def test_disable_get_object(self): + """disable_get_object can be set to True.""" + config = InstructionFileConfig(disable_get_object=True) + assert config.disable_get_object is True + assert config.disable_delete_object is False + assert config.disable_delete_objects is False + + def test_disable_delete_object(self): + """disable_delete_object can be set independently.""" + config = InstructionFileConfig(disable_delete_object=True) + assert config.disable_get_object is False + assert config.disable_delete_object is True + assert config.disable_delete_objects is False + + def test_disable_delete_objects(self): + """disable_delete_objects can be set independently.""" + config = InstructionFileConfig(disable_delete_objects=True) + assert config.disable_get_object is False + assert config.disable_delete_object is False + assert config.disable_delete_objects is True + + def test_all_disabled(self): + """All flags can be set to True simultaneously.""" + config = InstructionFileConfig( + disable_get_object=True, + disable_delete_object=True, + disable_delete_objects=True, + ) + assert config.disable_get_object is True + assert config.disable_delete_object is True + assert config.disable_delete_objects is True + + +class TestS3EncryptionClientConfigInstructionFileConfig: + """Tests for instruction_file_config on S3EncryptionClientConfig.""" + + def test_default_instruction_file_config(self): + """S3EncryptionClientConfig defaults to InstructionFileConfig with all enabled.""" + mock_keyring = Mock(spec=S3Keyring) + config = S3EncryptionClientConfig(keyring=mock_keyring) + assert isinstance(config.instruction_file_config, InstructionFileConfig) + assert config.instruction_file_config.disable_get_object is False + + def test_custom_instruction_file_config(self): + """S3EncryptionClientConfig accepts a custom InstructionFileConfig.""" + mock_keyring = Mock(spec=S3Keyring) + ifc = InstructionFileConfig(disable_get_object=True) + config = S3EncryptionClientConfig(keyring=mock_keyring, instruction_file_config=ifc) + assert config.instruction_file_config.disable_get_object is True + + def test_instruction_file_config_does_not_affect_other_config(self): + """Setting instruction_file_config does not change other defaults.""" + mock_keyring = Mock(spec=S3Keyring) + ifc = InstructionFileConfig(disable_get_object=True) + config = S3EncryptionClientConfig(keyring=mock_keyring, instruction_file_config=ifc) + assert config.enable_delayed_authentication is False + assert config.enable_legacy_unauthenticated_modes is False + + +class TestPipelineInstructionFileGetDisabled: + """Tests for GetEncryptedObjectPipeline when instruction file get is disabled.""" + + def test_decrypt_raises_when_instruction_file_disabled_and_needed(self): + """Pipeline MUST raise when instruction file is needed but disabled.""" + object_metadata = {} + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + mock_s3_client = Mock() + + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + instruction_file_config=InstructionFileConfig(disable_get_object=True), + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises( + S3EncryptionClientError, + match="Exception encountered while fetching Instruction File", + ): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + mock_s3_client.get_object.assert_not_called() + + def test_decrypt_raises_when_instruction_file_disabled_v3_partial_metadata(self): + """Pipeline MUST raise when V3 object has partial metadata requiring instruction file.""" + object_metadata = { + "x-amz-c": "115", + "x-amz-d": base64.b64encode(b"key-commitment-data").decode("utf-8"), + "x-amz-i": base64.b64encode(b"test-message-id").decode("utf-8"), + } + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + mock_s3_client = Mock() + + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + instruction_file_config=InstructionFileConfig(disable_get_object=True), + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + } + + with pytest.raises( + S3EncryptionClientError, + match="Exception encountered while fetching Instruction File", + ): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + mock_s3_client.get_object.assert_not_called() + + def test_decrypt_succeeds_when_instruction_file_disabled_but_not_needed(self): + """Objects with metadata in headers decrypt fine regardless of config.""" + object_metadata = { + "x-amz-iv": base64.b64encode(os.urandom(12)).decode("utf-8"), + "x-amz-key-v2": base64.b64encode(b"encrypted-key-data").decode("utf-8"), + "x-amz-wrap-alg": "kms+context", + "x-amz-matdesc": json.dumps({"kms_cmk_id": "test-key-id"}), + "x-amz-cek-alg": "AES/GCM/NoPadding", + "x-amz-tag-len": "128", + } + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=None, + instruction_file_config=InstructionFileConfig(disable_get_object=True), + ) + + mock_response = { + "Body": BytesIO(b"encrypted-data"), + "Metadata": object_metadata, + "ContentLength": 100, + } + + mock_keyring.on_decrypt.side_effect = Exception("Keyring called — no instruction file") + + with pytest.raises(Exception, match="Keyring called"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + def test_decrypt_fetches_instruction_file_when_not_disabled(self): + """Pipeline fetches instruction file normally when disable_get_object is False.""" + object_metadata = {} + + instruction_file_metadata = { + "x-amz-iv": base64.b64encode(os.urandom(12)).decode("utf-8"), + "x-amz-key-v2": base64.b64encode(b"encrypted-key-data").decode("utf-8"), + "x-amz-wrap-alg": "kms+context", + "x-amz-matdesc": json.dumps({"kms_cmk_id": "test-key-id"}), + "x-amz-cek-alg": "AES/GCM/NoPadding", + "x-amz-tag-len": "128", + "x-amz-crypto-instr-file": "", + } + + mock_s3_client = Mock() + mock_s3_client.get_object.return_value = { + "Body": BytesIO(b""), + "Metadata": instruction_file_metadata, + } + + mock_keyring = Mock(spec=S3Keyring) + cmm = DefaultCryptoMaterialsManager(mock_keyring) + + pipeline = GetEncryptedObjectPipeline( + cmm, + commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, + s3_client=mock_s3_client, + instruction_file_config=InstructionFileConfig(disable_get_object=False), + ) + + mock_response = { + "Body": BytesIO(b"encrypted-test-data"), + "Metadata": object_metadata, + } + + mock_keyring.on_decrypt.side_effect = Exception( + "Keyring called - instruction file was fetched" + ) + + with pytest.raises(Exception, match="Keyring called"): + pipeline.decrypt( + mock_response, + instruction_suffix=".instruction", + enable_delayed_authentication=False, + bucket="test-bucket", + key="test-key", + ) + + mock_s3_client.get_object.assert_called_once_with( + Bucket="test-bucket", Key="test-key.instruction" + ) diff --git a/test/test_s3_encryption_client_delete.py b/test/test_s3_encryption_client_delete.py index 1279abab..897ccf3f 100644 --- a/test/test_s3_encryption_client_delete.py +++ b/test/test_s3_encryption_client_delete.py @@ -106,3 +106,22 @@ def test_instruction_file_suffix_not_forwarded_to_s3(self): # First call (object delete) should not contain InstructionFileSuffix assert mock_s3.delete_object.call_args_list[0] == call(Bucket="bucket", Key="key") + + def test_instruction_file_not_deleted_when_disabled(self): + """delete_object skips instruction file deletion when disable_delete_object is True.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + mock_keyring = Mock(spec=S3Keyring) + mock_s3 = Mock() + mock_s3.meta.events = Mock() + config = S3EncryptionClientConfig( + keyring=mock_keyring, + instruction_file_config=InstructionFileConfig(disable_delete_object=True), + ) + s3ec = S3EncryptionClient(wrapped_s3_client=mock_s3, config=config) + + s3ec.delete_object(Bucket="bucket", Key="key") + + # Only one call — the object itself, no instruction file delete + assert mock_s3.delete_object.call_count == 1 + assert mock_s3.delete_object.call_args_list[0] == call(Bucket="bucket", Key="key") diff --git a/test/test_s3_encryption_client_delete_objects.py b/test/test_s3_encryption_client_delete_objects.py index c1045ca3..4c4b99b5 100644 --- a/test/test_s3_encryption_client_delete_objects.py +++ b/test/test_s3_encryption_client_delete_objects.py @@ -186,3 +186,29 @@ def test_preserves_version_ids_in_objects(self): ], }, ) + + def test_instruction_files_not_deleted_when_disabled(self): + """delete_objects skips instruction file deletion when disable_delete_objects is True.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + mock_keyring = Mock(spec=S3Keyring) + mock_s3 = Mock() + mock_s3.meta.events = Mock() + mock_s3.delete_objects.return_value = {"Deleted": [{"Key": "key1"}, {"Key": "key2"}]} + config = S3EncryptionClientConfig( + keyring=mock_keyring, + instruction_file_config=InstructionFileConfig(disable_delete_objects=True), + ) + s3ec = S3EncryptionClient(wrapped_s3_client=mock_s3, config=config) + + s3ec.delete_objects( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}, {"Key": "key2"}]}, + ) + + # Only one call — the objects themselves, no instruction file delete + assert mock_s3.delete_objects.call_count == 1 + assert mock_s3.delete_objects.call_args_list[0] == call( + Bucket="bucket", + Delete={"Objects": [{"Key": "key1"}, {"Key": "key2"}]}, + ) From c2ab684d6c04e2a8bf2ee5e20741dd55845feb4d Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Fri, 24 Apr 2026 15:39:18 -0700 Subject: [PATCH 2/3] add integ tests for deletes --- .../test_i_s3_encryption_instruction_file.py | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/test/integration/test_i_s3_encryption_instruction_file.py b/test/integration/test_i_s3_encryption_instruction_file.py index 9435989b..bacb8786 100644 --- a/test/integration/test_i_s3_encryption_instruction_file.py +++ b/test/integration/test_i_s3_encryption_instruction_file.py @@ -391,3 +391,160 @@ def test_instruction_file_config_default_still_decrypts_instruction_files(): output = response["Body"].read().decode("utf-8") assert output == "static-v3-instruction-file-from-java-v4" + + +# --- InstructionFileConfig delete_object / delete_objects integration tests --- + + +def _object_exists(bucket_name, key_name): + """Return True if the object exists in the bucket.""" + from botocore.exceptions import ClientError + + s3 = boto3.client("s3") + try: + s3.head_object(Bucket=bucket_name, Key=key_name) + return True + except ClientError as e: + if e.response["Error"]["Code"] == "404": + return False + raise + + +def test_delete_object_skips_instruction_file_when_disabled(): + """delete_object with disable_delete_object=True must NOT delete the instruction file.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + plain_s3 = boto3.client("s3") + + test_key = f"ifc-delete-obj-skip-{uuid.uuid4()}" + instr_key = test_key + ".instruction" + + # Put an encrypted object and a fake instruction file + default_client = S3EncryptionClient( + boto3.client("s3"), S3EncryptionClientConfig(keyring) + ) + default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data") + plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") + + try: + # Delete with instruction file deletion disabled + config = S3EncryptionClientConfig( + keyring, + instruction_file_config=InstructionFileConfig(disable_delete_object=True), + ) + s3ec = S3EncryptionClient(boto3.client("s3"), config) + s3ec.delete_object(Bucket=bucket, Key=test_key) + + # Object should be gone, instruction file should remain + assert not _object_exists(bucket, test_key) + assert _object_exists(bucket, instr_key) + finally: + # Clean up the instruction file + plain_s3.delete_object(Bucket=bucket, Key=instr_key) + + +def test_delete_object_deletes_instruction_file_when_not_disabled(): + """delete_object with default config must delete the instruction file.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + plain_s3 = boto3.client("s3") + + test_key = f"ifc-delete-obj-default-{uuid.uuid4()}" + instr_key = test_key + ".instruction" + + default_client = S3EncryptionClient( + boto3.client("s3"), S3EncryptionClientConfig(keyring) + ) + default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data") + plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") + + # Delete with default config (instruction file deletion enabled) + config = S3EncryptionClientConfig( + keyring, + instruction_file_config=InstructionFileConfig(disable_delete_object=False), + ) + s3ec = S3EncryptionClient(boto3.client("s3"), config) + s3ec.delete_object(Bucket=bucket, Key=test_key) + + assert not _object_exists(bucket, test_key) + assert not _object_exists(bucket, instr_key) + + +def test_delete_objects_skips_instruction_files_when_disabled(): + """delete_objects with disable_delete_objects=True must NOT delete instruction files.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + plain_s3 = boto3.client("s3") + + keys = [f"ifc-delete-objs-skip-{uuid.uuid4()}" for _ in range(2)] + instr_keys = [k + ".instruction" for k in keys] + + default_client = S3EncryptionClient( + boto3.client("s3"), S3EncryptionClientConfig(keyring) + ) + for key in keys: + default_client.put_object(Bucket=bucket, Key=key, Body=b"data") + for instr_key in instr_keys: + plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") + + try: + config = S3EncryptionClientConfig( + keyring, + instruction_file_config=InstructionFileConfig(disable_delete_objects=True), + ) + s3ec = S3EncryptionClient(boto3.client("s3"), config) + s3ec.delete_objects( + Bucket=bucket, + Delete={"Objects": [{"Key": k} for k in keys]}, + ) + + for key in keys: + assert not _object_exists(bucket, key) + for instr_key in instr_keys: + assert _object_exists(bucket, instr_key) + finally: + plain_s3.delete_objects( + Bucket=bucket, + Delete={"Objects": [{"Key": k} for k in instr_keys]}, + ) + + +def test_delete_objects_deletes_instruction_files_when_not_disabled(): + """delete_objects with default config must delete instruction files.""" + from s3_encryption.instruction_file_config import InstructionFileConfig + + kms_client = boto3.client("kms", region_name=region) + keyring = KmsKeyring(kms_client, kms_key_id) + plain_s3 = boto3.client("s3") + + keys = [f"ifc-delete-objs-default-{uuid.uuid4()}" for _ in range(2)] + instr_keys = [k + ".instruction" for k in keys] + + default_client = S3EncryptionClient( + boto3.client("s3"), S3EncryptionClientConfig(keyring) + ) + for key in keys: + default_client.put_object(Bucket=bucket, Key=key, Body=b"data") + for instr_key in instr_keys: + plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") + + config = S3EncryptionClientConfig( + keyring, + instruction_file_config=InstructionFileConfig(disable_delete_objects=False), + ) + s3ec = S3EncryptionClient(boto3.client("s3"), config) + s3ec.delete_objects( + Bucket=bucket, + Delete={"Objects": [{"Key": k} for k in keys]}, + ) + + for key in keys: + assert not _object_exists(bucket, key) + for instr_key in instr_keys: + assert not _object_exists(bucket, instr_key) From 50d5185091fff8aa271a846d4941c1a8ed0ca476 Mon Sep 17 00:00:00 2001 From: Kess Plasmeier Date: Fri, 24 Apr 2026 15:42:16 -0700 Subject: [PATCH 3/3] lint --- .../test_i_s3_encryption_instruction_file.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/test/integration/test_i_s3_encryption_instruction_file.py b/test/integration/test_i_s3_encryption_instruction_file.py index bacb8786..c46176f5 100644 --- a/test/integration/test_i_s3_encryption_instruction_file.py +++ b/test/integration/test_i_s3_encryption_instruction_file.py @@ -422,9 +422,7 @@ def test_delete_object_skips_instruction_file_when_disabled(): instr_key = test_key + ".instruction" # Put an encrypted object and a fake instruction file - default_client = S3EncryptionClient( - boto3.client("s3"), S3EncryptionClientConfig(keyring) - ) + default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring)) default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data") plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") @@ -456,9 +454,7 @@ def test_delete_object_deletes_instruction_file_when_not_disabled(): test_key = f"ifc-delete-obj-default-{uuid.uuid4()}" instr_key = test_key + ".instruction" - default_client = S3EncryptionClient( - boto3.client("s3"), S3EncryptionClientConfig(keyring) - ) + default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring)) default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data") plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}") @@ -485,9 +481,7 @@ def test_delete_objects_skips_instruction_files_when_disabled(): keys = [f"ifc-delete-objs-skip-{uuid.uuid4()}" for _ in range(2)] instr_keys = [k + ".instruction" for k in keys] - default_client = S3EncryptionClient( - boto3.client("s3"), S3EncryptionClientConfig(keyring) - ) + default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring)) for key in keys: default_client.put_object(Bucket=bucket, Key=key, Body=b"data") for instr_key in instr_keys: @@ -526,9 +520,7 @@ def test_delete_objects_deletes_instruction_files_when_not_disabled(): keys = [f"ifc-delete-objs-default-{uuid.uuid4()}" for _ in range(2)] instr_keys = [k + ".instruction" for k in keys] - default_client = S3EncryptionClient( - boto3.client("s3"), S3EncryptionClientConfig(keyring) - ) + default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring)) for key in keys: default_client.put_object(Bucket=bucket, Key=key, Body=b"data") for instr_key in instr_keys: