Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
54cb127
first pass at KC GCM enc/dec
kessplas Feb 27, 2026
281531e
first pass test server
kessplas Feb 27, 2026
abb0e21
fix duvet
kessplas Feb 28, 2026
5e0626b
fix wrap alg
kessplas Feb 28, 2026
ec78e56
fix duvet
kessplas Feb 28, 2026
ad89776
fixup
kessplas Feb 28, 2026
2757ea6
implement CBC
kessplas Feb 28, 2026
711edee
format
kessplas Feb 28, 2026
a620bcb
test-server legacy unauth
kessplas Mar 2, 2026
27e8fc8
fix custom suffix test
kessplas Mar 5, 2026
05d376b
tighter validation
kessplas Mar 5, 2026
155caa1
fix duvet
kessplas Mar 5, 2026
ffa2a78
improve code
kessplas Mar 5, 2026
8dd7eb4
more better code
kessplas Mar 5, 2026
99ef305
decrypt annotations
kessplas Mar 6, 2026
98a42c3
all the spec, all the tests!
kessplas Mar 6, 2026
aab46cb
add TestServer test that fails when encrypt validation is missing
kessplas Mar 6, 2026
f7a3751
investigate commit policy validation errors
kessplas Mar 6, 2026
e1ab926
add validation, only test Python in TestServer, add integ tests
kessplas Mar 6, 2026
96d093e
fix integ tests
kessplas Mar 6, 2026
dbf7b12
consolidate crypto params into AlgorithmSuite
kessplas Mar 11, 2026
7763e0e
rename algorithm_suite to encryption_algorithm in public config API
kessplas Mar 11, 2026
44c1a5b
revise annotations, improve test quality
kessplas Mar 11, 2026
79a69bf
formatting
kessplas Mar 11, 2026
f7ce7e5
linter
kessplas Mar 11, 2026
b219535
default to key committing encryption algorithm
kessplas Mar 13, 2026
9472420
PR feedback
kessplas Mar 13, 2026
2a11683
lint
kessplas Mar 13, 2026
95977d2
PR feedback - make pipelines require enc alg
kessplas Mar 18, 2026
6f5b380
PR feedback - make commit policy required in decrypt pipeline
kessplas Mar 18, 2026
ff188fd
PR feedback + refactor
kessplas Mar 18, 2026
9077125
format
kessplas Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions compliance_exceptions/decryption_exceptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Compliance Exceptions for Decryption Implementation

## Summary

The Python S3 Encryption Client does not currently support Ranged Gets.
Ranged Gets allow downloading and decrypting a subset of bytes from an encrypted S3 object.
This is an optional feature per the specification ("MAY support") and is planned for a future release.

## Ranged Gets

##= specification/s3-encryption/decryption.md#ranged-gets
##= type=exception
##% The S3EC MAY support the "range" parameter on GetObject which specifies a subset of bytes to download and decrypt.

Justification: Ranged Gets are not yet implemented in the Python S3 Encryption Client. The specification uses MAY, making this an optional feature. This is planned for a future release.

---

##= specification/s3-encryption/decryption.md#ranged-gets
##= type=exception
##% If the S3EC supports Ranged Gets, the S3EC MUST adjust the customer-provided range to include the beginning and end of the cipher blocks for the given range.

Justification: Not applicable since Ranged Gets are not yet supported. When Ranged Gets are implemented, this requirement will be fulfilled.

---

##= specification/s3-encryption/decryption.md#ranged-gets
##= type=exception
##% If the object was encrypted with ALG_AES_256_GCM_IV12_TAG16_NO_KDF, then ALG_AES_256_CTR_IV16_TAG16_NO_KDF MUST be used to decrypt the range of the object.

Justification: Not applicable since Ranged Gets are not yet supported. When Ranged Gets are implemented, the correct CTR-mode algorithm suite will be used for GCM-encrypted objects.

---

##= specification/s3-encryption/decryption.md#ranged-gets
##= type=exception
##% If the object was encrypted with ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY, then ALG_AES_256_CTR_HKDF_SHA512_COMMIT_KEY MUST be used to decrypt the range of the object.

Justification: Not applicable since Ranged Gets are not yet supported. When Ranged Gets are implemented, the correct CTR-mode algorithm suite will be used for key-committing objects.

---

##= specification/s3-encryption/decryption.md#ranged-gets
##= type=exception
##% If the GetObject response contains a range, but the GetObject request does not contain a range, the S3EC MUST throw an exception.

Justification: Not applicable since Ranged Gets are not yet supported. When Ranged Gets are implemented, this validation will be added to detect unexpected range responses.

---
63 changes: 63 additions & 0 deletions compliance_exceptions/encryption_exceptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Compliance Exceptions for Encryption Implementation

## Summary

The Python S3 Encryption Client does not implement AES-CTR algorithm suites (used only for ranged-get decryption),
does not yet validate IV/Message ID for zero values, does not validate maximum plaintext length,
and relies on Python's `cryptography` library to automatically append GCM auth tags.

## AES-CTR Algorithm Suites

##= specification/s3-encryption/encryption.md#alg-aes-256-ctr-hkdf-sha512-commit-key
##= type=exception
##% Attempts to encrypt using key committing AES-CTR MUST fail.

Justification: The AES-CTR algorithm suites are only used for ranged-get decryption. Since ranged gets are not yet implemented, these algorithm suites are not defined in the `AlgorithmSuite` enum and cannot be selected for encryption. The constraint is satisfied structurally.

---

##= specification/s3-encryption/encryption.md#alg-aes-256-ctr-iv16-tag16-no-kdf
##= type=exception
##% Attempts to encrypt using AES-CTR MUST fail.

Justification: Same as above. AES-CTR is not available as an algorithm suite option, so it cannot be used for encryption.

---

## GCM Auth Tag Appending

##= specification/s3-encryption/encryption.md#alg-aes-256-gcm-hkdf-sha512-commit-key
##= type=exception
##% The client MUST append the GCM auth tag to the ciphertext if the underlying crypto provider does not do so automatically.

Justification: Python's `cryptography` library (`AESGCM.encrypt`) automatically appends the GCM authentication tag to the ciphertext. No manual appending is needed.

---

##= specification/s3-encryption/encryption.md#alg-aes-256-gcm-iv12-tag16-no-kdf
##= type=exception
##% The client MUST append the GCM auth tag to the ciphertext if the underlying crypto provider does not do so automatically.

Justification: Python's `cryptography` library (`AESGCM.encrypt`) automatically appends the GCM authentication tag to the ciphertext. No manual appending is needed.

---

## Cipher Initialization Validation

##= specification/s3-encryption/encryption.md#cipher-initialization
##= type=exception
##% The client SHOULD validate that the generated IV or Message ID is not zeros.

Justification: This SHOULD-level validation is not yet implemented. The IV and Message ID are generated using `os.urandom()`, which is cryptographically secure and extremely unlikely to produce all-zero output. This validation is planned for a future release.

---

## Plaintext Length Validation

##= specification/s3-encryption/encryption.md#content-encryption
##= type=exception
##% The client MUST validate that the length of the plaintext bytes does not exceed the algorithm suite's cipher's maximum content length in bytes.

Justification: Maximum plaintext length validation is not yet implemented. For AES-GCM with a 12-byte IV, the maximum plaintext size is approximately 64 GiB, which exceeds practical S3 single-object upload limits. This validation is planned for a future release.

---
106 changes: 88 additions & 18 deletions src/s3_encryption/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,39 @@
DefaultCryptoMaterialsManager,
)
from .materials.keyring import AbstractKeyring
from .materials.materials import AlgorithmSuite, CommitmentPolicy
from .pipelines import GetEncryptedObjectPipeline, PutEncryptedObjectPipeline

S3_METADATA_PREFIX = "x-amz-meta-"

# Thread-local context attribute names
_CTX_ENCRYPTION_CONTEXT = "encryption_context"
_CTX_BUCKET = "bucket"
_CTX_KEY = "key"
_CTX_S3_CLIENT = "s3_client"
_CTX_INSTRUCTION_FILE_MODE = "instruction_file_mode"

# Attributes to clean up after get_object completes
# (s3_client is intentionally excluded — it is not request-scoped)
_GET_OBJECT_CLEANUP_ATTRS = (_CTX_ENCRYPTION_CONTEXT, _CTX_BUCKET, _CTX_KEY)


@define
class S3EncryptionClientConfig:
"""Configuration object for the S3 Encryption Client."""

keyring: AbstractKeyring
encryption_algorithm: AlgorithmSuite = field(
default=AlgorithmSuite.ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY
)
commitment_policy: CommitmentPolicy = field(
default=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
)
Comment on lines +40 to +45

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there no Duvet annotations for these defaults?
You would think there would be.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would. Cut #155 to update spec.

##= specification/s3-encryption/client.md#enable-legacy-unauthenticated-modes
##% The S3EC MUST support the option to enable or disable legacy unauthenticated modes (content encryption algorithms).
##= specification/s3-encryption/client.md#enable-legacy-unauthenticated-modes
##% The option to enable legacy unauthenticated modes MUST be set to false by default.
enable_legacy_unauthenticated_modes: bool = field(default=False)
cmm: AbstractCryptoMaterialsManager = field()
##= specification/s3-encryption/data-format/metadata-strategy.md#instruction-file
##= type=implementation
Expand All @@ -41,6 +64,51 @@ class S3EncryptionClientConfig:
def _default_cmm_for_keyring(self):
return DefaultCryptoMaterialsManager(self.keyring)

##= specification/s3-encryption/client.md#encryption-algorithm
##% The S3EC MUST validate that the configured encryption algorithm is not legacy.
##= specification/s3-encryption/client.md#encryption-algorithm
##% If the configured encryption algorithm is legacy, then the S3EC MUST throw an exception.
##= specification/s3-encryption/client.md#key-commitment
##% The S3EC MUST validate the configured Encryption Algorithm against the provided key commitment policy.
##= specification/s3-encryption/client.md#key-commitment
##% If the configured Encryption Algorithm is incompatible with the key commitment policy, then it MUST throw an exception.
def __attrs_post_init__(self):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Praise: Award winning error experience.

"""Validate algorithm suite and commitment policy configuration."""
if self.encryption_algorithm.is_legacy:
raise S3EncryptionClientError(
f"Cannot configure S3 Encryption Client with legacy algorithm suite "
f"{self.encryption_algorithm.name}. Legacy algorithm suites are only "
f"supported for decryption (and enable_legacy_unauthenticated_modes is True)."
)

##= specification/s3-encryption/key-commitment.md#commitment-policy
##% When the commitment policy is REQUIRE_ENCRYPT_ALLOW_DECRYPT, the S3EC MUST only encrypt using an algorithm suite which supports key commitment.
##= specification/s3-encryption/key-commitment.md#commitment-policy
##% When the commitment policy is REQUIRE_ENCRYPT_REQUIRE_DECRYPT, the S3EC MUST only encrypt using an algorithm suite which supports key commitment.
if (
self.commitment_policy
in (
CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT,
CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
)
and not self.encryption_algorithm.supports_key_commitment
):
raise S3EncryptionClientError(
f"Commitment policy {self.commitment_policy.name} requires a key-committing "
f"algorithm suite, but {self.encryption_algorithm.name} does not support key commitment."
)

##= specification/s3-encryption/key-commitment.md#commitment-policy
##% When the commitment policy is FORBID_ENCRYPT_ALLOW_DECRYPT, the S3EC MUST NOT encrypt using an algorithm suite which supports key commitment.
if (
self.commitment_policy == CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT
and self.encryption_algorithm.supports_key_commitment
):
raise S3EncryptionClientError(
f"Commitment policy {self.commitment_policy.name} forbids key-committing "
f"algorithm suites, but {self.encryption_algorithm.name} supports key commitment."
)


class S3EncryptionClientPlugin:
"""Plugin that adds encryption/decryption capabilities to a boto3 S3 client.
Expand All @@ -67,7 +135,7 @@ def on_put_object_before_call(self, params, **kwargs):
params: Dictionary of parameters for the PutObject call (after serialization)
**kwargs: Additional event arguments
"""
if getattr(self._context, "instruction_file_mode", False):
if getattr(self._context, _CTX_INSTRUCTION_FILE_MODE, False):
raise S3EncryptionClientError(
"Instruction file mode is exclusively for reading instruction files "
"and not supported in put_object!"
Expand All @@ -88,11 +156,12 @@ def on_put_object_before_call(self, params, **kwargs):
# Unexpected body type - should not happen as boto3 validates before this point
raise S3EncryptionClientError("Unexpected type of body parameter!")

encryption_context = getattr(self._context, "encryption_context", None)
encryption_context = getattr(self._context, _CTX_ENCRYPTION_CONTEXT, None)

pipeline = PutEncryptedObjectPipeline(self.config.cmm)
pipeline = PutEncryptedObjectPipeline(self.config.cmm, self.config.encryption_algorithm)
encrypted_data, encryption_metadata = pipeline.encrypt(
body_bytes, encryption_context=encryption_context
body_bytes,
encryption_context=encryption_context,
)

params["body"] = encrypted_data
Expand All @@ -118,12 +187,12 @@ def on_get_object_after_call(self, parsed, **kwargs):
**kwargs: Additional event arguments (includes 'params' with request parameters)
"""
# Check if plaintext mode is enabled via thread-local flag
if getattr(self._context, "instruction_file_mode", False):
if getattr(self._context, _CTX_INSTRUCTION_FILE_MODE, False):
self.process_instruction_file(parsed)
return

# Get encryption context from thread-local storage (set by get_object wrapper)
encryption_context = getattr(self._context, "encryption_context", None)
encryption_context = getattr(self._context, _CTX_ENCRYPTION_CONTEXT, None)

# The parsed response already has the Body as a StreamingBody
# We need to read it, decrypt it, and replace it
Expand All @@ -137,13 +206,15 @@ def on_get_object_after_call(self, parsed, **kwargs):
# Create a pipeline and decrypt the data
pipeline = GetEncryptedObjectPipeline(
self.config.cmm,
s3_client=getattr(self._context, "s3_client", None),
commitment_policy=self.config.commitment_policy,
s3_client=getattr(self._context, _CTX_S3_CLIENT, None),
enable_legacy_unauthenticated_modes=self.config.enable_legacy_unauthenticated_modes,
)
decrypted_data = pipeline.decrypt(
response,
encryption_context,
bucket=getattr(self._context, "bucket", None),
key=getattr(self._context, "key", None),
bucket=getattr(self._context, _CTX_BUCKET, None),
key=getattr(self._context, _CTX_KEY, None),
instruction_suffix=self.config.instruction_file_suffix,
)

Expand All @@ -163,7 +234,7 @@ def process_instruction_file(self, parsed):
Args:
parsed: Dictionary containing the parsed response
"""
instruction_key = getattr(self._context, "key", None)
instruction_key = getattr(self._context, _CTX_KEY, None)

# In plaintext mode, parse instruction file and append to metadata
existing_metadata = parsed.get("Metadata", {})
Expand Down Expand Up @@ -242,8 +313,8 @@ def put_object(self, **kwargs):
raise S3EncryptionClientError(f"Failed to encrypt object: {str(e)}") from e
finally:
# Clean up thread-local storage
if hasattr(self._plugin._context, "encryption_context"):
delattr(self._plugin._context, "encryption_context")
if hasattr(self._plugin._context, _CTX_ENCRYPTION_CONTEXT):
delattr(self._plugin._context, _CTX_ENCRYPTION_CONTEXT)

def get_object(self, **kwargs):
"""Download and decrypt an object from S3.
Expand All @@ -266,12 +337,12 @@ def get_object(self, **kwargs):
encryption_context = kwargs.pop("EncryptionContext", None)

# Store encryption context in thread-local storage for the event handler
self._plugin._context.encryption_context = encryption_context
setattr(self._plugin._context, _CTX_ENCRYPTION_CONTEXT, encryption_context)
# Store wrapped client in thread-local storage for
# the event handler to fetch instruction files
self._plugin._context.s3_client = self.wrapped_s3_client
self._plugin._context.bucket = kwargs.get("Bucket")
self._plugin._context.key = kwargs.get("Key")
setattr(self._plugin._context, _CTX_S3_CLIENT, self.wrapped_s3_client)
setattr(self._plugin._context, _CTX_BUCKET, kwargs.get("Bucket"))
setattr(self._plugin._context, _CTX_KEY, kwargs.get("Key"))

try:
return self.wrapped_s3_client.get_object(**kwargs)
Expand All @@ -284,7 +355,6 @@ def get_object(self, **kwargs):
finally:
# Clean up thread-local storage;
# do not clean up the client as it is not thread local only
attrs = ["encryption_context", "Bucket", "Key"]
for attr in attrs:
for attr in _GET_OBJECT_CLEANUP_ATTRS:
if hasattr(self._plugin._context, attr):
delattr(self._plugin._context, attr)
Loading
Loading