Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions src/s3_encryption/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
import io

from attrs import define, field
from botocore import serialize
from botocore.response import StreamingBody

from .exceptions import S3EncryptionClientError
from .materials.crypto_materials_manager import (
AbstractCryptoMaterialsManager,
DefaultCryptoMaterialsManager,
)
from .materials.keyring import AbstractKeyring
from .pipelines import GetEncryptedObjectPipeline, PutEncryptedObjectPipeline

DEFAULT_ENCODING = "utf-8"


@define
class S3EncryptionClientConfig:
Expand All @@ -37,6 +41,21 @@ class S3EncryptionClient:
wrapped_s3_client = field()
config: S3EncryptionClientConfig = field()

def __attrs_post_init__(self):
    """Verify that every botocore serializer defaults to the expected encoding.

    Walks ``serialize.SERIALIZERS`` and compares each serializer's
    ``DEFAULT_ENCODING`` against this module's ``DEFAULT_ENCODING``.

    Raises:
        S3EncryptionClientError: if any serializer reports a default
            encoding other than ``DEFAULT_ENCODING``.
    """
    # Sanity check: botocore is expected to always use utf-8. If that ever
    # changed, the assumption that string plaintext is utf-8 encoded would no
    # longer hold, so fail loudly at construction time instead.
    for protocol_name, serializer_cls in serialize.SERIALIZERS.items():
        serializer_encoding = serializer_cls.DEFAULT_ENCODING
        if serializer_encoding == DEFAULT_ENCODING:
            continue
        raise S3EncryptionClientError(
            f"All Serializers MUST only support utf-8 encoding, but {protocol_name} is using "
            f"{serializer_encoding}!"
        )
Comment on lines +53 to +57

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit testing this could be annoying due to global module -- maybe have the test inject a new serializer into the module then remove it?


def put_object(self, **kwargs):
"""Encrypt and upload an object to S3.

Expand All @@ -61,12 +80,27 @@ def put_object(self, **kwargs):
# Create a pipeline for this operation
pipeline = PutEncryptedObjectPipeline(self.config.cmm)

# Encrypt the data using the pipeline
data_bytes = body
# We probably just shouldn't support strings, use utf8 for now
# TODO: look deeper into this, what does normal boto3 do?
# The documentation for boto3 asks for bytes or a file-like object,
# but in reality, it is possible to pass strings.
# Strings will be encoded using DEFAULT_ENCODING,
# which MUST match the default encoding defined in the Serializer class in botocore.
if isinstance(body, str):
data_bytes = body.encode("utf-8")
data_bytes = body.encode(DEFAULT_ENCODING)
elif isinstance(body, bytes):
data_bytes = body
elif isinstance(body, io.IOBase):
# TODO: Streaming support
raise S3EncryptionClientError(
f"Body parameter of type {type(body)} is not an acceptable type! "
f"Streaming operations are not yet supported."
)
else:
raise S3EncryptionClientError(
f"Body parameter of type {type(body)} is not an acceptable type! "
f"Use bytes or a file-like object."
)

# Now encrypt the bytes/file-like IOBase object
encrypted_data, encryption_metadata = pipeline.encrypt(
data_bytes, encryption_context=encryption_context
)
Expand Down
188 changes: 187 additions & 1 deletion test/integration/test_i_s3_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from datetime import datetime

import boto3
import pytest

from s3_encryption import S3EncryptionClient, S3EncryptionClientConfig
from s3_encryption.exceptions import S3EncryptionClientError
from s3_encryption.materials.kms_keyring import KmsKeyring

bucket = os.environ.get("CI_S3_BUCKET", "s3ec-python-github-test-bucket")
Expand All @@ -15,7 +17,7 @@
)


def test_simple_roundtrip():
def test_simple_roundtrip_ascii_string():
key = "simple-rt"
key += datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

Expand Down Expand Up @@ -101,3 +103,187 @@ def test_no_body_roundtrip():
print(
"Success! Object with no Body parameter encrypted and decrypted correctly as empty bytes."
)


def test_unicode_string_roundtrip():
    """Round-trip a ``str`` body containing non-ASCII characters.

    The string is passed directly as ``Body``; the bytes read back are
    decoded as utf-8 and compared with the original string.
    """
    # NOTE(review): a reviewer suggested testing the same input with both a
    # vanilla boto3 client and the encrypted client, since given the same
    # input each client should (usually) expect the same output. Example setup:
    # https://github.com/aws/aws-database-encryption-sdk-dynamodb/blob/python-poc/DynamoDbEncryption/runtimes/python/test/integ/encrypted/test_client.py#L75-L105
    object_key = "unicode-string-rt" + datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

    # String with unusual Unicode characters
    plaintext = "Unicode test: 你好, こんにちは, 안녕하세요, Привет, مرحبا, ¡Hola!, ½⅓¼⅕⅙⅐⅛⅑⅒⅔⅖⅗⅘⅙⅚⅜⅝⅞"

    keyring = KmsKeyring(boto3.client("kms", region_name=region), kms_key_id)
    s3ec = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))

    s3ec.put_object(Bucket=bucket, Key=object_key, Body=plaintext)
    response = s3ec.get_object(Bucket=bucket, Key=object_key)

    # Boto3 encodes to utf-8 in put_object but does not decode in get_object,
    # so decode manually to complete the round trip.
    roundtripped = response["Body"].read().decode("utf-8")

    if roundtripped != plaintext:
        for line in (
            "Uh oh! Input and output don't match!",
            "Input:",
            repr(plaintext),
            "Output:",
            repr(roundtripped),
        ):
            print(line)
        raise RuntimeError
    print("Success! Unicode string encrypted and decrypted correctly.")


def test_specific_encoding_utf8_roundtrip():
    """Round-trip bytes that were explicitly utf-8 encoded by the caller.

    The string is encoded to utf-8 before upload; the bytes read back are
    decoded with the same encoding and compared with the original string.
    """
    object_key = "utf8-encoding-rt" + datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

    # String with mixed characters
    text = "UTF-8 encoding test: 你好, こんにちは, 안녕하세요, Привет, مرحبا, ¡Hola!"

    # Encode explicitly so the client receives bytes rather than a str
    payload = text.encode("utf-8")

    keyring = KmsKeyring(boto3.client("kms", region_name=region), kms_key_id)
    s3ec = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))

    # Upload the pre-encoded bytes, then fetch them back
    s3ec.put_object(Bucket=bucket, Key=object_key, Body=payload)
    response = s3ec.get_object(Bucket=bucket, Key=object_key)

    # Decode the raw bytes with the same encoding used on the way in
    decoded = response["Body"].read().decode("utf-8")

    if decoded != text:
        for line in (
            "Uh oh! Input and output don't match!",
            "Input:",
            repr(text),
            "Output:",
            repr(decoded),
        ):
            print(line)
        raise RuntimeError
    print("Success! UTF-8 encoded string encrypted and decrypted correctly.")


def test_specific_encoding_latin1_roundtrip():
    """Round-trip bytes that were explicitly latin-1 encoded by the caller.

    The string is encoded to latin-1 before upload; the bytes read back are
    decoded with the same encoding and compared with the original string.
    """
    object_key = "latin1-encoding-rt" + datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

    # String with Latin-1 compatible characters
    text = "Latin-1 encoding test: éèêë àâäãåá çñ ¿¡ øæå ØÆÅÉÈÊËÀÂÄÃÅÁ"

    # Encode explicitly so the client receives bytes rather than a str
    payload = text.encode("latin-1")

    keyring = KmsKeyring(boto3.client("kms", region_name=region), kms_key_id)
    s3ec = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))

    # Upload the pre-encoded bytes, then fetch them back
    s3ec.put_object(Bucket=bucket, Key=object_key, Body=payload)
    response = s3ec.get_object(Bucket=bucket, Key=object_key)

    # Decode the raw bytes with the same encoding used on the way in
    decoded = response["Body"].read().decode("latin-1")

    if decoded != text:
        for line in (
            "Uh oh! Input and output don't match!",
            "Input:",
            repr(text),
            "Output:",
            repr(decoded),
        ):
            print(line)
        raise RuntimeError
    print("Success! Latin-1 encoded string encrypted and decrypted correctly.")


def test_binary_data_roundtrip():
    """Round-trip raw bytes covering every value from 0 through 255.

    The bytes are uploaded as-is and the bytes read back are compared
    without any decoding step.
    """
    object_key = "binary-data-rt" + datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

    # Binary data that is not valid text in any particular encoding
    payload = bytes(range(256))

    keyring = KmsKeyring(boto3.client("kms", region_name=region), kms_key_id)
    s3ec = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))

    # Upload the binary data directly, then fetch it back
    s3ec.put_object(Bucket=bucket, Key=object_key, Body=payload)
    response = s3ec.get_object(Bucket=bucket, Key=object_key)

    # Compare raw bytes; no decoding
    received = response["Body"].read()

    if received != payload:
        for line in (
            "Uh oh! Input and output don't match!",
            "Input:",
            repr(payload),
            "Output:",
            repr(received),
        ):
            print(line)
        raise RuntimeError
    print("Success! Binary data encrypted and decrypted correctly.")


def test_invalid_body_types():
    """Test that put_object raises an exception when given invalid body types."""
    object_key = "invalid-body-type" + datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

    keyring = KmsKeyring(boto3.client("kms", region_name=region), kms_key_id)
    s3ec = S3EncryptionClient(boto3.client("s3"), S3EncryptionClientConfig(keyring))

    # Each value is tried in order and must be rejected with the same message.
    rejected_bodies = (
        42,  # integer
        3.14,  # float
        [1, 2, 3],  # list
        {"key": "value"},  # dictionary
        True,  # boolean
        None,  # None (also raises an exception)
    )
    for bad_body in rejected_bodies:
        with pytest.raises(S3EncryptionClientError) as excinfo:
            s3ec.put_object(Bucket=bucket, Key=object_key, Body=bad_body)
        assert "not an acceptable type" in str(excinfo.value)

    print("Success! All invalid body types correctly raised exceptions.")