Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/s3_encryption/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from .exceptions import S3EncryptionClientError
from .instruction_file import parse_instruction_file
from .instruction_file_config import InstructionFileConfig
from .materials.crypto_materials_manager import (
AbstractCryptoMaterialsManager,
DefaultCryptoMaterialsManager,
Expand Down Expand Up @@ -57,6 +58,9 @@ class S3EncryptionClientConfig:
before GCM tag verification. Defaults to False. Has no effect for
CBC encrypted ciphertext, which is always streamed as there is no
authentication tag.
instruction_file_config: Configuration for instruction file behavior.
Defaults to InstructionFileConfig() which enables instruction file
reads on GetObject.

Raises:
S3EncryptionClientError: If the encryption algorithm is legacy, or if
Expand Down Expand Up @@ -86,6 +90,8 @@ class S3EncryptionClientConfig:
##% Delayed Authentication mode MUST be set to false by default.
enable_delayed_authentication: bool = field(default=False)

instruction_file_config: InstructionFileConfig = field(factory=InstructionFileConfig)

@cmm.default
def _default_cmm_for_keyring(self):
return DefaultCryptoMaterialsManager(self.keyring)
Expand Down Expand Up @@ -248,6 +254,7 @@ def on_get_object_after_call(self, parsed, **kwargs):
commitment_policy=self.config.commitment_policy,
s3_client=getattr(self._context, _CTX_S3_CLIENT, None),
enable_legacy_unauthenticated_modes=self.config.enable_legacy_unauthenticated_modes,
instruction_file_config=self.config.instruction_file_config,
)
decrypted_data = pipeline.decrypt(
response,
Expand Down Expand Up @@ -428,8 +435,9 @@ def delete_object(self, **kwargs):
##= type=implementation
##% - DeleteObject MUST delete the associated instruction file
##% using the default instruction file suffix.
instruction_key = kwargs["Key"] + instruction_file_suffix
self.wrapped_s3_client.delete_object(Bucket=kwargs["Bucket"], Key=instruction_key)
if not self.config.instruction_file_config.disable_delete_object:
instruction_key = kwargs["Key"] + instruction_file_suffix
self.wrapped_s3_client.delete_object(Bucket=kwargs["Bucket"], Key=instruction_key)

return response
except S3EncryptionClientError:
Expand Down Expand Up @@ -469,12 +477,14 @@ def delete_objects(self, **kwargs):
##= type=implementation
##% - DeleteObjects MUST delete each of the corresponding instruction files
##% using the default instruction file suffix.
instruction_objects = [
{"Key": obj["Key"] + instruction_file_suffix} for obj in kwargs["Delete"]["Objects"]
]
self.wrapped_s3_client.delete_objects(
Bucket=kwargs["Bucket"], Delete={"Objects": instruction_objects}
)
if not self.config.instruction_file_config.disable_delete_objects:
instruction_objects = [
{"Key": obj["Key"] + instruction_file_suffix}
for obj in kwargs["Delete"]["Objects"]
]
self.wrapped_s3_client.delete_objects(
Bucket=kwargs["Bucket"], Delete={"Objects": instruction_objects}
)

return response
except S3EncryptionClientError:
Expand Down
34 changes: 34 additions & 0 deletions src/s3_encryption/instruction_file_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Instruction file configuration for S3 Encryption Client.

This module provides configuration for instruction file behavior
during encryption and decryption operations.
"""

from attrs import define, field


@define
class InstructionFileConfig:
"""Configuration for instruction file behavior in the S3 Encryption Client.

Controls whether the client will interact with instruction files
as part of GetObject, DeleteObject, and DeleteObjects operations.

Attributes:
disable_get_object: If True, the client will not attempt to fetch
instruction files during GetObject (decryption) and will raise
an error if the object's metadata implies an instruction file
is required. Defaults to False.
disable_delete_object: If True, the client will not attempt to
delete the associated instruction file during DeleteObject.
Defaults to False.
disable_delete_objects: If True, the client will not attempt to
delete the associated instruction files during DeleteObjects.
Defaults to False.
"""

disable_get_object: bool = field(default=False)
disable_delete_object: bool = field(default=False)
disable_delete_objects: bool = field(default=False)
10 changes: 10 additions & 0 deletions src/s3_encryption/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .decryptor import AesCbcDecryptor, AesGcmDecryptor
from .exceptions import S3EncryptionClientError
from .instruction_file import fetch_instruction_file
from .instruction_file_config import InstructionFileConfig
from .key_derivation import derive_keys, verify_commitment
from .materials.crypto_materials_manager import AbstractCryptoMaterialsManager
from .materials.encrypted_data_key import EncryptedDataKey
Expand Down Expand Up @@ -181,6 +182,7 @@ class GetEncryptedObjectPipeline:
commitment_policy: CommitmentPolicy = field()
s3_client: object = field(default=None)
enable_legacy_unauthenticated_modes: bool = field(default=False)
instruction_file_config: InstructionFileConfig = field(factory=InstructionFileConfig)

# Map content cipher metadata values to AlgorithmSuite
_CONTENT_CIPHER_TO_ALGORITHM_SUITE = {
Expand Down Expand Up @@ -258,6 +260,14 @@ def decrypt(
# Check if we need to fetch instruction file
if metadata.should_use_instruction_file():

if self.instruction_file_config.disable_get_object:
raise S3EncryptionClientError(
"Exception encountered while fetching Instruction File. "
"Ensure the object you are attempting to decrypt has been encrypted "
"using the S3 Encryption Client and instruction files are enabled. "
f"bucket: {bucket}\n key: {key}"
)

if self.s3_client is None:
raise S3EncryptionClientError("s3_client required to fetch instruction file")
if bucket is None or key is None:
Expand Down
249 changes: 249 additions & 0 deletions test/integration/test_i_s3_encryption_instruction_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,252 @@ def test_decrypt_large_v3_instruction_file_delayed_auth():
total += len(chunk)

assert total == LARGE_FILE_SIZE


# --- InstructionFileConfig integration tests ---


def test_instruction_file_config_disabled_raises_on_instruction_file_object():
"""When instruction file get is disabled, decrypting an instruction-file object MUST fail."""
from s3_encryption.exceptions import S3EncryptionClientError
from s3_encryption.instruction_file_config import InstructionFileConfig

key = TEST_OBJECTS["v3_instruction_file"]

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
wrapped_client = boto3.client("s3")
config = S3EncryptionClientConfig(
keyring,
commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
instruction_file_config=InstructionFileConfig(disable_get_object=True),
)
s3ec = S3EncryptionClient(wrapped_client, config)

with pytest.raises(
S3EncryptionClientError, match="Exception encountered while fetching Instruction File"
):
s3ec.get_object(Bucket=bucket, Key=key)


def test_instruction_file_config_enabled_still_decrypts():
"""When instruction file get is explicitly enabled, decryption MUST succeed as before."""
from s3_encryption.instruction_file_config import InstructionFileConfig

key = TEST_OBJECTS["v3_instruction_file"]

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
wrapped_client = boto3.client("s3")
config = S3EncryptionClientConfig(
keyring,
commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
instruction_file_config=InstructionFileConfig(disable_get_object=False),
)
s3ec = S3EncryptionClient(wrapped_client, config)

response = s3ec.get_object(Bucket=bucket, Key=key)
output = response["Body"].read().decode("utf-8")

assert output == "static-v3-instruction-file-from-java-v4"


def test_instruction_file_config_disabled_allows_non_instruction_file_objects():
"""When instruction file get is disabled, objects with metadata in headers MUST still decrypt."""
from s3_encryption.instruction_file_config import InstructionFileConfig

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
wrapped_client = boto3.client("s3")

# First, put an object using default config (metadata in object headers)
put_config = S3EncryptionClientConfig(keyring)
put_client = S3EncryptionClient(boto3.client("s3"), put_config)

test_key = f"instruction-file-config-test-{uuid.uuid4()}"
plaintext = b"hello from instruction file config test"
put_client.put_object(Bucket=bucket, Key=test_key, Body=plaintext)

try:
# Now decrypt with instruction file get disabled
config = S3EncryptionClientConfig(
keyring,
instruction_file_config=InstructionFileConfig(disable_get_object=True),
)
s3ec = S3EncryptionClient(wrapped_client, config)

response = s3ec.get_object(Bucket=bucket, Key=test_key)
output = response["Body"].read()

assert output == plaintext
finally:
wrapped_client.delete_object(Bucket=bucket, Key=test_key)


def test_instruction_file_config_default_still_decrypts_instruction_files():
"""Default InstructionFileConfig (no explicit config) MUST still decrypt instruction files."""
key = TEST_OBJECTS["v3_instruction_file"]

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
wrapped_client = boto3.client("s3")
# No instruction_file_config specified — should use default (enabled)
config = S3EncryptionClientConfig(
keyring,
commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
)
s3ec = S3EncryptionClient(wrapped_client, config)

response = s3ec.get_object(Bucket=bucket, Key=key)
output = response["Body"].read().decode("utf-8")

assert output == "static-v3-instruction-file-from-java-v4"


# --- InstructionFileConfig delete_object / delete_objects integration tests ---


def _object_exists(bucket_name, key_name):
"""Return True if the object exists in the bucket."""
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
try:
s3.head_object(Bucket=bucket_name, Key=key_name)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
raise


def test_delete_object_skips_instruction_file_when_disabled():
"""delete_object with disable_delete_object=True must NOT delete the instruction file."""
from s3_encryption.instruction_file_config import InstructionFileConfig

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
plain_s3 = boto3.client("s3")

test_key = f"ifc-delete-obj-skip-{uuid.uuid4()}"
instr_key = test_key + ".instruction"

# Put an encrypted object and a fake instruction file
default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))
default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data")
plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}")

try:
# Delete with instruction file deletion disabled
config = S3EncryptionClientConfig(
keyring,
instruction_file_config=InstructionFileConfig(disable_delete_object=True),
)
s3ec = S3EncryptionClient(boto3.client("s3"), config)
s3ec.delete_object(Bucket=bucket, Key=test_key)

# Object should be gone, instruction file should remain
assert not _object_exists(bucket, test_key)
assert _object_exists(bucket, instr_key)
finally:
# Clean up the instruction file
plain_s3.delete_object(Bucket=bucket, Key=instr_key)


def test_delete_object_deletes_instruction_file_when_not_disabled():
"""delete_object with default config must delete the instruction file."""
from s3_encryption.instruction_file_config import InstructionFileConfig

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
plain_s3 = boto3.client("s3")

test_key = f"ifc-delete-obj-default-{uuid.uuid4()}"
instr_key = test_key + ".instruction"

default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))
default_client.put_object(Bucket=bucket, Key=test_key, Body=b"data")
plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}")

# Delete with default config (instruction file deletion enabled)
config = S3EncryptionClientConfig(
keyring,
instruction_file_config=InstructionFileConfig(disable_delete_object=False),
)
s3ec = S3EncryptionClient(boto3.client("s3"), config)
s3ec.delete_object(Bucket=bucket, Key=test_key)

assert not _object_exists(bucket, test_key)
assert not _object_exists(bucket, instr_key)


def test_delete_objects_skips_instruction_files_when_disabled():
"""delete_objects with disable_delete_objects=True must NOT delete instruction files."""
from s3_encryption.instruction_file_config import InstructionFileConfig

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
plain_s3 = boto3.client("s3")

keys = [f"ifc-delete-objs-skip-{uuid.uuid4()}" for _ in range(2)]
instr_keys = [k + ".instruction" for k in keys]

default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))
for key in keys:
default_client.put_object(Bucket=bucket, Key=key, Body=b"data")
for instr_key in instr_keys:
plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}")

try:
config = S3EncryptionClientConfig(
keyring,
instruction_file_config=InstructionFileConfig(disable_delete_objects=True),
)
s3ec = S3EncryptionClient(boto3.client("s3"), config)
s3ec.delete_objects(
Bucket=bucket,
Delete={"Objects": [{"Key": k} for k in keys]},
)

for key in keys:
assert not _object_exists(bucket, key)
for instr_key in instr_keys:
assert _object_exists(bucket, instr_key)
finally:
plain_s3.delete_objects(
Bucket=bucket,
Delete={"Objects": [{"Key": k} for k in instr_keys]},
)


def test_delete_objects_deletes_instruction_files_when_not_disabled():
"""delete_objects with default config must delete instruction files."""
from s3_encryption.instruction_file_config import InstructionFileConfig

kms_client = boto3.client("kms", region_name=region)
keyring = KmsKeyring(kms_client, kms_key_id)
plain_s3 = boto3.client("s3")

keys = [f"ifc-delete-objs-default-{uuid.uuid4()}" for _ in range(2)]
instr_keys = [k + ".instruction" for k in keys]

default_client = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))
for key in keys:
default_client.put_object(Bucket=bucket, Key=key, Body=b"data")
for instr_key in instr_keys:
plain_s3.put_object(Bucket=bucket, Key=instr_key, Body=b"{}")

config = S3EncryptionClientConfig(
keyring,
instruction_file_config=InstructionFileConfig(disable_delete_objects=False),
)
s3ec = S3EncryptionClient(boto3.client("s3"), config)
s3ec.delete_objects(
Bucket=bucket,
Delete={"Objects": [{"Key": k} for k in keys]},
)

for key in keys:
assert not _object_exists(bucket, key)
for instr_key in instr_keys:
assert not _object_exists(bucket, instr_key)
Loading
Loading