Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
aa38719
feat(decryption): add buffered streaming decryption for AES-GCM
texastony Mar 6, 2026
24eae31
feat(config): add enable_delayed_authentication option with duvet cit…
texastony Mar 9, 2026
44aac6c
feat(stream): add DelayedAuthDecryptingStream for unauthenticated str…
texastony Mar 9, 2026
f5eae60
feat(decrypt): wire delayed authentication through pipeline and event…
texastony Mar 9, 2026
f348251
test(integration): add parametrized roundtrip tests for buffered and …
texastony Mar 9, 2026
622b5a1
test(duvet): add streaming decryption tests for delayed authenticatio…
texastony Mar 9, 2026
5869c25
test(integration): parametrize instruction file tests with buffered a…
texastony Mar 9, 2026
377004b
test(integration): add large file and 61 GiB placeholder tests for de…
texastony Mar 9, 2026
182d228
fix: update decrypt return type docstring and verify full delayed-aut…
texastony Mar 9, 2026
4aaddc5
chore: register large pytest mark in pyproject.toml
texastony Mar 9, 2026
ef17832
test(integration): remove xlarge placeholder tests and large pytest mark
texastony Mar 10, 2026
edfcfb8
fix(test): use tuple for parametrize args and remove duplicate test_n…
texastony Mar 10, 2026
cf00af7
refactor: streaming decryptors accept cipher object and tag_length
texastony Mar 18, 2026
a2dd100
merge staging: integrate streaming decryptors with key commitment and…
texastony Mar 19, 2026
8c3fbd6
refactor: use AlgorithmSuite properties for tag length and block size
texastony Mar 19, 2026
a6fb136
fix: address PR #150 review comments from kessplas
texastony Mar 20, 2026
8d4dbdc
refactor: split DelayedAuthDecryptingStream into CBC and GCM classes
texastony Mar 23, 2026
4991e1b
refactor: rename BufferedDecryptingStream to BufferedDecryptingGCMStr…
texastony Mar 24, 2026
07dbee4
test: add unit and integration tests for CBC and GCM decrypting streams
texastony Mar 24, 2026
dad00c2
fix(stream): enforce minimum read size on DelayedAuthGCMDecryptingStream
texastony Mar 24, 2026
4a35e88
test(stream): strengthen CBC test assertions and include empty plaint…
texastony Mar 25, 2026
c8bd8a2
fix(pipelines): fix docstring param order and remove misplaced encryp…
texastony Mar 25, 2026
6fc4910
docs: add Google-style docstring to S3EncryptionClientConfig
texastony Mar 25, 2026
29bfb1c
docs: detail that CBC is always streamed
texastony Mar 25, 2026
0d58080
chore: address linting concern
texastony Mar 25, 2026
ad22880
refactor(stream): use ContentLength to eliminate rolling GCM tag buffer
texastony Mar 26, 2026
5d0ad56
fix(stream): return self from __enter__ and validate content_length
texastony Mar 26, 2026
ce010b3
Merge remote-tracking branch 'origin/staging' into tonyknap/feat-buff…
texastony Mar 27, 2026
30a135c
feat(examples): add usage examples with integration tests
texastony Mar 19, 2026
7245da1
refactor: move instruction_file_suffix from S3EncryptionClientConfig …
texastony Mar 27, 2026
016767a
feat(examples): add usage examples with integration tests
texastony Mar 19, 2026
f8a6784
Merge branch 'tonyknap/refactor-custom-instruction-file-suffix' into …
texastony Apr 8, 2026
6c7f65c
chore: address PR feedback
texastony Apr 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/python-integ.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ jobs:
CI_S3_BUCKET: ${{ vars.CI_S3_BUCKET }}
CI_KMS_KEY_ALIAS: ${{ vars.CI_KMS_KEY_ALIAS }}

- name: Run examples
run: make test-examples

- name: Generate coverage HTML report
if: always()
run: uv run coverage html -d coverage-report
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ format:
uv run ruff check --fix src/ test/

# Run all tests with combined coverage
test: test-unit test-integration
test: test-unit test-integration test-examples

# Run unit tests (creates .coverage report)
test-unit:
Expand All @@ -31,6 +31,9 @@ test-unit:
test-integration:
uv run pytest test/integration/ --verbose --cov=src/s3_encryption --cov-append --cov-report=term-missing

test-examples:
uv run pytest examples/test/ -v

# Clean up cache files
clean:
find . -type d -name __pycache__ -exec rm -rf {} +
Expand Down
2 changes: 2 additions & 0 deletions examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
2 changes: 2 additions & 0 deletions examples/src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
89 changes: 89 additions & 0 deletions examples/src/delayed_auth_streaming_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
This example demonstrates streaming decryption with delayed authentication
using the S3 Encryption Client.

By default, the S3 Encryption Client buffers the entire ciphertext and verifies
the authentication tag before releasing any plaintext. This is the safest mode,
but requires holding the entire object in memory.

With delayed authentication enabled, plaintext is released incrementally as it
is decrypted, before the authentication tag has been verified. This allows
processing large files without buffering the entire object in memory.

Your application should still read the stream to completion. In the event that
an error is thrown in the final read due to an invalid authentication tag,
your application must be able to invalidate the associated data.

WARNING: With delayed authentication, plaintext is released before it has been
authenticated. An attacker could modify the ciphertext and the client would
release tampered plaintext before detecting the modification. Only use this
mode when you need to process files too large to buffer in memory and you
understand the security implications.

Comment thread
kessplas marked this conversation as resolved.
This example:
1. Creates a KMS Keyring
2. Configures the S3 Encryption Client with delayed authentication enabled
3. Encrypts and uploads a large object to S3
4. Streams the decrypted object back, reading it in chunks
5. Verifies the decrypted content matches the original
"""

from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig
from s3_encryption.exceptions import S3EncryptionClientSecurityError
from s3_encryption.materials.kms_keyring import KmsKeyring

# 10 MB of example data
EXAMPLE_DATA: bytes = b"A" * (10 * 1024 * 1024)
CHUNK_SIZE = 1024 * 1024 # 1 MB


def delayed_auth_streaming_decrypt(
s3_client, kms_client, kms_key_id: str, bucket: str, key: str
):
"""Demonstrate streaming decryption with delayed authentication.

Args:
s3_client: boto3 S3 client.
kms_client: boto3 KMS client.
kms_key_id: KMS key ARN or alias to use for encryption/decryption.
bucket: S3 bucket name.
key: S3 object key.
"""
# 1. Create a KMS Keyring.
keyring = KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id)

# 2. Configure the S3 Encryption Client with delayed authentication.
config = S3EncryptionClientConfig(
keyring=keyring,
enable_delayed_authentication=True,
)
s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config)

# 3. Encrypt and upload the object.
s3ec.put_object(Bucket=bucket, Key=key, Body=EXAMPLE_DATA)

# 4. Stream the decrypted object back in chunks.
# With delayed authentication, plaintext is released incrementally
# without buffering the entire object in memory.
response = s3ec.get_object(Bucket=bucket, Key=key)
body = response["Body"]

chunks = []
try:
while True:
chunk = body.read(CHUNK_SIZE)
if not chunk:
break
chunks.append(chunk)

plaintext = b"".join(chunks)

except S3EncryptionClientSecurityError:
# Authentication tag verification failed.
# Discard any plaintext released before the error.
raise

# 5. Verify the decrypted content matches the original.
assert plaintext == EXAMPLE_DATA, "Decrypted plaintext does not match original data"
59 changes: 59 additions & 0 deletions examples/src/instruction_file_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
This example demonstrates decrypting S3 objects that store their encryption
metadata in instruction files rather than S3 object metadata.

An instruction file is a companion S3 object that contains the encryption
metadata (encrypted data key, IV, algorithm, etc.) as JSON. By default,
the instruction file has the same key as the encrypted object with a
".instruction" suffix appended.
Comment thread
kessplas marked this conversation as resolved.

You can also use a custom instruction file suffix. This requires configuring
the S3 Encryption Client with the matching suffix.

NOTE: At this time, the S3 Encryption Client in Python ONLY supports decrypting
(reading) with instruction files; encrypting with instruction files is not supported
at this time.

This example:
1. Decrypts an object using the default instruction file suffix (".instruction")
2. Decrypts the same object using a custom instruction file suffix
"""

from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig
from s3_encryption.materials.kms_keyring import KmsKeyring


def instruction_file_get(
s3_client, kms_client, kms_key_id: str, bucket: str, key: str, expected_plaintext: bytes
):
"""Demonstrate decrypting objects with default and custom instruction file suffixes.

Args:
s3_client: boto3 S3 client.
kms_client: boto3 KMS client.
kms_key_id: KMS key ARN or alias used to encrypt the object.
bucket: S3 bucket containing the encrypted object and instruction files.
key: S3 object key of the encrypted object.
expected_plaintext: Expected plaintext content for verification.
"""
keyring = KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id)

# 1. Decrypt using the default instruction file suffix (".instruction").
# The client will fetch "<key>.instruction" for the encryption metadata.
config = S3EncryptionClientConfig(keyring=keyring)
s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config)

response = s3ec.get_object(Bucket=bucket, Key=key)
plaintext = response["Body"].read()
assert plaintext == expected_plaintext, "Default suffix: decrypted plaintext does not match"

# 2. Decrypt while specifying the Instruction File Suffix
# InstructionFileSuffix is a per-request keyword argument on get_object,
# so the same client can use different suffixes per request.
response = s3ec.get_object(
Bucket=bucket, Key=key, InstructionFileSuffix=".custom-suffix-instruction"
)
plaintext = response["Body"].read()
assert plaintext == expected_plaintext, "Custom suffix: decrypted plaintext does not match"
95 changes: 95 additions & 0 deletions examples/src/kms_keyring_put_get_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
This example demonstrates a basic put/get roundtrip using the S3 Encryption Client
with a KMS Keyring.

The KMS Keyring uses a symmetric KMS key to generate and decrypt data keys.
The S3 Encryption Client encrypts the object before uploading to S3 and decrypts
it on download, so the data is protected at rest.

This example:
1. Creates a KMS Keyring with the provided KMS key ID
2. Wraps a boto3 S3 client with the S3 Encryption Client
3. Creates an encryption context bound to the S3 bucket and key
4. Puts an encrypted object to S3
5. Gets and decrypts the object from S3
6. Verifies the decrypted plaintext matches the original

Here is an example KMS Key Policy statement that would validate the
Encryption Context used in this example::

Sid: RestrictToEncryptionContextBucket
Effect: Allow
Principal:
AWS: "arn:aws:iam::<account-id>:role/<role-name>"
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource: "*"
Condition:
StringEquals:
"kms:EncryptionContext:aws-s3-bucket": "<bucket>"
"""

from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig, S3EncryptionClientError
from s3_encryption.materials.kms_keyring import KmsKeyring

EXAMPLE_DATA: bytes = b"Hello, S3 Encryption Client!"


def kms_keyring_put_get(s3_client, kms_client, kms_key_id: str, bucket: str, key: str):
"""Demonstrate an encrypt/decrypt cycle using a KMS Keyring with S3.

Args:
s3_client: boto3 S3 client.
kms_client: boto3 KMS client.
kms_key_id: KMS key ARN or alias to use for encryption/decryption.
bucket: S3 bucket name.
key: S3 object key.
"""
# 1. Create a KMS Keyring.
keyring = KmsKeyring(kms_client=kms_client, kms_key_id=kms_key_id)

# 2. Wrap the S3 client with the S3 Encryption Client.
# The default commitment policy is REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
# which enforces key-committing algorithm suites on both encrypt and decrypt.
config = S3EncryptionClientConfig(keyring=keyring)
s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config)

# 3. Create an encryption context.
# The encryption context is a set of key-value pairs that are bound to the ciphertext.
# Including the bucket and key ensures the ciphertext is tied to this specific S3 object.
# This will also be visible to KMS when evaluating key policies.
# See the example KMS Key Policy in this module's docstring.
# The encryption context is optional, but strongly recommended.
encryption_context = {
"aws-s3-bucket": bucket,
"aws-s3-key": key,
}

# 4. Put an encrypted object.
s3ec.put_object(Bucket=bucket, Key=key, Body=EXAMPLE_DATA, EncryptionContext=encryption_context)

# 5. Get and decrypt the object.
# If you specified an encryption context during encryption,
# you must provide the same encryption context during decryption.
response = s3ec.get_object(Bucket=bucket, Key=key, EncryptionContext=encryption_context)
plaintext = response["Body"].read()

Comment thread
kessplas marked this conversation as resolved.
# 6. Optional Verify the decrypted plaintext matches the original.
assert plaintext == EXAMPLE_DATA, "Decrypted plaintext does not match original data"

# However, if the Encryption Context is not present at decryption time, then decryption will fail
failed = False
try:
_ = s3ec.get_object(
Bucket=bucket, Key=key,
# Incomplete Encryption Context
EncryptionContext={"aws-s3-bucket": bucket})
except S3EncryptionClientError as e:
failed = True
assert hasattr(e, "kwargs")
assert e.kwargs.get("msg") is not None
assert e.kwargs.get("msg") == "Provided encryption context does not match information retrieved from S3"
assert failed
60 changes: 60 additions & 0 deletions examples/src/legacy_decrypt_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
This example demonstrates how to decrypt legacy S3 objects that were encrypted
using older versions of the S3 Encryption Client (V1 or V2).

Legacy objects use the KmsV1 wrapping algorithm and may use unauthenticated
content encryption (AES-CBC). To decrypt these objects, you must:
1. Enable legacy wrapping algorithms on the KMS Keyring
2. Enable legacy unauthenticated modes on the S3 Encryption Client config
3. Use a commitment policy that allows non-key-committing algorithm suites

This example:
1. Creates a KMS Keyring with legacy wrapping algorithms enabled
2. Configures the S3 Encryption Client with legacy decryption support
3. Decrypts a legacy V1 object from S3
4. Verifies the decrypted plaintext matches the expected content
"""

from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig
from s3_encryption.materials.kms_keyring import KmsKeyring
from s3_encryption.materials.materials import CommitmentPolicy


def decrypt_legacy_object(s3_client, kms_client, kms_key_id: str, bucket: str, key: str):
"""Decrypt a legacy S3 object encrypted by an older S3 Encryption Client.

Args:
s3_client: boto3 S3 client.
kms_client: boto3 KMS client.
kms_key_id: KMS key ARN or alias used to encrypt the object.
bucket: S3 bucket name.
key: S3 object key.

Returns:
Decrypted plaintext bytes.
"""
# 1. Create a KMS Keyring with legacy wrapping algorithms enabled.
# This allows the keyring to decrypt data keys wrapped using the KmsV1 mode,
# which older S3 Encryption Clients used.
keyring = KmsKeyring(
kms_client=kms_client,
kms_key_id=kms_key_id,
enable_legacy_wrapping_algorithms=True,
)

# 2. Configure the S3 Encryption Client for legacy decryption.
# - enable_legacy_unauthenticated_modes: allows decryption of AES-CBC content
# - REQUIRE_ENCRYPT_ALLOW_DECRYPT: new objects are encrypted with key-committing
# algorithm suites, while still allowing decryption of legacy objects
config = S3EncryptionClientConfig(
keyring=keyring,
enable_legacy_unauthenticated_modes=True,
commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT,
)
s3ec = S3EncryptionClient(wrapped_s3_client=s3_client, config=config)

# 3. Decrypt the legacy object.
response = s3ec.get_object(Bucket=bucket, Key=key)
return response["Body"].read()
2 changes: 2 additions & 0 deletions examples/test/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
27 changes: 27 additions & 0 deletions examples/test/test_i_delayed_auth_streaming_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Test suite for the delayed auth streaming decrypt example."""
import boto3
import pytest

from ..src.delayed_auth_streaming_example import delayed_auth_streaming_decrypt

pytestmark = [pytest.mark.examples]

BUCKET = "s3ec-python-github-test-bucket"
KEY = "examples/delayed-auth-streaming"
KMS_KEY_ID = "arn:aws:kms:us-west-2:370957321024:alias/S3EC-Python-Github-KMS-Key"


def test_delayed_auth_streaming_decrypt():
s3_client = boto3.client("s3", region_name="us-west-2")
kms_client = boto3.client("kms", region_name="us-west-2")
delayed_auth_streaming_decrypt(
s3_client=s3_client,
kms_client=kms_client,
kms_key_id=KMS_KEY_ID,
bucket=BUCKET,
key=KEY,
)
# Clean up
s3_client.delete_object(Bucket=BUCKET, Key=KEY)
Loading
Loading