-
Notifications
You must be signed in to change notification settings - Fork 0
chore: refactor to make S3EC install itself as plugins #138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
439fd49
910ab04
f0f65a7
1537274
4753ca4
d2d43cb
c82d3eb
8d81487
3546929
21a68fc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,9 +3,9 @@ | |
| """Top-level S3 Encryption Client v3 for Python package.""" | ||
|
|
||
| import io | ||
| import threading | ||
|
|
||
| from attrs import define, field | ||
| from botocore import serialize | ||
| from botocore.response import StreamingBody | ||
|
|
||
| from .exceptions import S3EncryptionClientError | ||
|
|
@@ -16,7 +16,7 @@ | |
| from .materials.keyring import AbstractKeyring | ||
| from .pipelines import GetEncryptedObjectPipeline, PutEncryptedObjectPipeline | ||
|
|
||
| DEFAULT_ENCODING = "utf-8" | ||
| S3_METADATA_PREFIX = "x-amz-meta-" | ||
|
|
||
|
|
||
| @define | ||
|
|
@@ -31,31 +31,122 @@ def _default_cmm_for_keyring(self): | |
| return DefaultCryptoMaterialsManager(self.keyring) | ||
|
|
||
|
|
||
| class S3EncryptionClientPlugin: | ||
| """Plugin that adds encryption/decryption capabilities to a boto3 S3 client. | ||
|
|
||
| This plugin uses boto3's event system to intercept put_object and get_object | ||
| calls to provide transparent encryption and decryption of S3 objects. | ||
| """ | ||
|
|
||
| def __init__(self, config: S3EncryptionClientConfig): | ||
| """Initialize the plugin with encryption configuration. | ||
|
|
||
| Args: | ||
| config: S3EncryptionClientConfig containing keyring and CMM | ||
| """ | ||
| self.config = config | ||
| self._context = threading.local() | ||
|
|
||
| def on_put_object_before_call(self, params, **kwargs): | ||
| """Event handler for before-call.s3.PutObject. | ||
|
|
||
| This handler encrypts the body after serialization but before the request is sent. | ||
|
|
||
| Args: | ||
| params: Dictionary of parameters for the PutObject call (after serialization) | ||
| **kwargs: Additional event arguments | ||
| """ | ||
| # At this point, boto3 has already serialized the Body | ||
| # Extract the serialized body from the request | ||
| body = params.get("body") | ||
| if body is None: | ||
| body_bytes = b"" | ||
| elif isinstance(body, bytes): | ||
| body_bytes = body | ||
| elif hasattr(body, "read"): | ||
| # It's a file-like object (BytesIO, etc.) | ||
| # TODO(streaming): Add support for streaming encryption without reading entire body | ||
| # into memory | ||
| body_bytes = body.read() | ||
| else: | ||
| # Unexpected body type - should not happen as boto3 validates before this point | ||
| raise S3EncryptionClientError("Unexpected type of body parameter!") | ||
|
|
||
| encryption_context = getattr(self._context, "encryption_context", None) | ||
|
|
||
| pipeline = PutEncryptedObjectPipeline(self.config.cmm) | ||
| encrypted_data, encryption_metadata = pipeline.encrypt( | ||
| body_bytes, encryption_context=encryption_context | ||
| ) | ||
|
|
||
| params["body"] = encrypted_data | ||
|
|
||
| headers = params.get("headers", {}) | ||
|
|
||
| # Add encryption metadata to headers | ||
| if encryption_metadata: | ||
| for key, value in encryption_metadata.items(): | ||
| # Add as S3 metadata headers | ||
| header_key = f"{S3_METADATA_PREFIX}{key}" | ||
| headers[header_key] = value | ||
|
|
||
| params["headers"] = headers | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand enough of the business logic to know if this was intentional or not, but you may wind up overwriting existing headers here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... is the ask to check for header key collision?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not an ask - it's me not understanding. If you want to override it, it's great as is :) If you're concerned with users setting custom headers, you can always just update the above to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The headers set by the encryption client are reserved and should not be set by the customer. If the customer does this, those values must be overwritten. This is not explicitly specified, but this is how Java behaves. |
||
|
|
||
| def on_get_object_after_call(self, parsed, **kwargs): | ||
| """Event handler for after-call.s3.GetObject. | ||
|
|
||
| This handler decrypts the body after the response is received from S3. | ||
|
|
||
| Args: | ||
| parsed: Dictionary containing the parsed response | ||
| **kwargs: Additional event arguments (includes 'params' with request parameters) | ||
| """ | ||
| # Get encryption context from thread-local storage (set by get_object wrapper) | ||
| encryption_context = getattr(self._context, "encryption_context", None) | ||
|
|
||
| # The parsed response already has the Body as a StreamingBody | ||
| # We need to read it, decrypt it, and replace it | ||
|
|
||
| # Create a response dict that matches what the pipeline expects | ||
| response = { | ||
| "Body": parsed.get("Body"), | ||
| "Metadata": parsed.get("Metadata", {}), | ||
| } | ||
|
|
||
| # Create a pipeline and decrypt the data | ||
| pipeline = GetEncryptedObjectPipeline(self.config.cmm) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: To handle instruction files, we are going to need the S3EC client to be passed to the GetEncryptedObjectPipeline. Is there an S3 Client in scope here? Note: This does not have to be resolved for this PR, but I am asking @kessplas about this, as Kess has context on the plugin nature that I do not.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Short answer, no. Long answer, there can be. I think the easiest way to deal with this is to make the original wrapped client an instance variable of the plugin so that the plugin can call out to |
||
| decrypted_data = pipeline.decrypt(response, encryption_context) | ||
|
|
||
| # Replace body with decrypted data | ||
| stream = io.BytesIO(decrypted_data) | ||
| streaming_body = StreamingBody(stream, len(decrypted_data)) | ||
| parsed["Body"] = streaming_body | ||
|
|
||
|
|
||
| @define | ||
| class S3EncryptionClient: | ||
| """Client for encrypting and decrypting S3 objects. | ||
|
|
||
| This client wraps a boto3 S3 client and provides encryption and decryption | ||
| capabilities for S3 objects using the configured keyring and crypto materials manager. | ||
|
|
||
| The encryption/decryption is implemented using boto3's event system, registering | ||
| handlers for before-call and after-call events. | ||
| """ | ||
|
|
||
| wrapped_s3_client = field() | ||
| config: S3EncryptionClientConfig = field() | ||
| _plugin: S3EncryptionClientPlugin = field(init=False) | ||
|
|
||
| def __attrs_post_init__(self): | ||
| """Validate serialization encoding after initialization. | ||
| """Install the encryption plugin on the wrapped client using boto3 events.""" | ||
| # Create the plugin | ||
| object.__setattr__(self, "_plugin", S3EncryptionClientPlugin(self.config)) | ||
|
|
||
| Ensures boto3 serializers are using the expected default encoding. | ||
| """ | ||
| # Sanity check that boto3 serialization are ONLY using the default encoding (utf-8) | ||
| # This should always be the case, but changes in encoding would break the assumption that | ||
| # the decrypted plaintext adheres to the non-utf8 encoding scheme. So we avoid that. | ||
| for sz_name, sz in serialize.SERIALIZERS.items(): | ||
| if sz.DEFAULT_ENCODING != DEFAULT_ENCODING: | ||
| raise S3EncryptionClientError( | ||
| f"All Serializers MUST only support utf-8 encoding, but {sz_name} is using " | ||
| f"{sz.DEFAULT_ENCODING}!" | ||
| ) | ||
| # Register event handlers using boto3's event system | ||
| event_system = self.wrapped_s3_client.meta.events | ||
| event_system.register("before-call.s3.PutObject", self._plugin.on_put_object_before_call) | ||
| event_system.register("after-call.s3.GetObject", self._plugin.on_get_object_after_call) | ||
|
|
||
| def put_object(self, **kwargs): | ||
| """Encrypt and upload an object to S3. | ||
|
|
@@ -71,52 +162,28 @@ def put_object(self, **kwargs): | |
|
|
||
| Returns: | ||
| The response from the S3 client's put_object method. | ||
|
|
||
| Raises: | ||
| S3EncryptionClientError: Any problem with encryption, including if the Body parameter | ||
| has an invalid type. | ||
| """ | ||
| # Extract required parameters from kwargs | ||
| bucket = kwargs.pop("Bucket") | ||
| key = kwargs.pop("Key") | ||
| body = kwargs.pop("Body", b"") # Default to empty bytes when Body is not provided | ||
| # Extract EncryptionContext if provided (not a standard S3 parameter) | ||
| encryption_context = kwargs.pop("EncryptionContext", None) | ||
|
|
||
| # Create a pipeline for this operation | ||
| pipeline = PutEncryptedObjectPipeline(self.config.cmm) | ||
|
|
||
| # The documentation for boto3 asks for bytes or a file-like object, | ||
| # but in reality, it is possible to pass strings. | ||
| # Strings will be encoded using DEFAULT_ENCODING, | ||
| # which MUST match the default encoding defined int the Serializer class in botocore. | ||
| if isinstance(body, str): | ||
| data_bytes = body.encode(DEFAULT_ENCODING) | ||
| elif isinstance(body, bytes): | ||
| data_bytes = body | ||
| elif isinstance(body, io.IOBase): | ||
| # TODO: Streaming support | ||
| raise S3EncryptionClientError( | ||
| f"Body parameter of type {type(body)} is not an acceptable type! " | ||
| f"Streaming operations are not yet supported." | ||
| ) | ||
| else: | ||
| raise S3EncryptionClientError( | ||
| f"Body parameter of type {type(body)} is not an acceptable type! " | ||
| f"Use bytes or a file-like object." | ||
| ) | ||
|
|
||
| # Now encrypt the bytes/file-like IOBase object | ||
| encrypted_data, encryption_metadata = pipeline.encrypt( | ||
| data_bytes, encryption_context=encryption_context | ||
| ) | ||
|
|
||
| # Add encryption metadata to the request parameters | ||
| params = {"Bucket": bucket, "Key": key, "Body": encrypted_data, **kwargs} | ||
|
|
||
| # Add encryption metadata to the parameters | ||
| if encryption_metadata: | ||
| # Merge any existing metadata with our encryption metadata | ||
| metadata = params.get("Metadata", {}) | ||
| metadata.update(encryption_metadata) | ||
| params["Metadata"] = metadata | ||
|
|
||
| return self.wrapped_s3_client.put_object(**params) | ||
| # Store encryption context in thread-local storage for the event handler | ||
| self._plugin._context.encryption_context = encryption_context | ||
|
|
||
| try: | ||
| return self.wrapped_s3_client.put_object(**kwargs) | ||
| except S3EncryptionClientError: | ||
| # Re-raise our own exceptions without wrapping | ||
| raise | ||
| except Exception as e: | ||
| raise S3EncryptionClientError(f"Failed to encrypt object: {str(e)}") from e | ||
| finally: | ||
| # Clean up thread-local storage | ||
| if hasattr(self._plugin._context, "encryption_context"): | ||
| delattr(self._plugin._context, "encryption_context") | ||
|
|
||
| def get_object(self, **kwargs): | ||
| """Download and decrypt an object from S3. | ||
|
|
@@ -131,29 +198,25 @@ def get_object(self, **kwargs): | |
| Returns: | ||
| The response from the S3 client's get_object method with the Body | ||
| replaced with a StreamingBody containing the decrypted data. | ||
|
|
||
| Raises: | ||
| S3EncryptionClientError: If decryption fails or the object is not properly encrypted. | ||
| """ | ||
| # Extract encryption context if provided | ||
| # Extract EncryptionContext if provided (not a standard S3 parameter) | ||
| encryption_context = kwargs.pop("EncryptionContext", None) | ||
|
|
||
| # Create params for the S3 client | ||
| params = {**kwargs} | ||
|
|
||
| # Get the encrypted object from S3 | ||
| response = self.wrapped_s3_client.get_object(**params) | ||
|
|
||
| # Create a pipeline for this operation | ||
| pipeline = GetEncryptedObjectPipeline(self.config.cmm) | ||
|
|
||
| # Decrypt the data using the pipeline | ||
| decrypted_data = pipeline.decrypt( | ||
| response, encryption_context | ||
| ) # encrypted_data, encryption_metadata) | ||
|
|
||
| # Create a new streaming body with the decrypted data | ||
| stream = io.BytesIO(decrypted_data) | ||
| streaming_body = StreamingBody(stream, len(decrypted_data)) | ||
|
|
||
| # Update the response with the decrypted data | ||
| response["Body"] = streaming_body | ||
|
|
||
| return response | ||
| # Store encryption context in thread-local storage for the event handler | ||
| self._plugin._context.encryption_context = encryption_context | ||
|
|
||
| try: | ||
| return self.wrapped_s3_client.get_object(**kwargs) | ||
| except S3EncryptionClientError: | ||
| # Re-raise our own exceptions without wrapping | ||
| raise | ||
| except Exception as e: | ||
| # Wrap any unexpected errors during decryption | ||
| raise S3EncryptionClientError(f"Failed to decrypt object: {str(e)}") from e | ||
| finally: | ||
| # Clean up thread-local storage | ||
| if hasattr(self._plugin._context, "encryption_context"): | ||
| delattr(self._plugin._context, "encryption_context") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: any chance the request ID is in scope here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this answers you're question:
If you're asking if we have access to the
x-amz-request-idorx-amz-id-2headers, those come from the service as a part of the response. Since the request hasn't been sent yet at this point in code execution, so those values don't exist yet. You'd need to wait until we have sent the request and the service has responded.If you only need it for client side tracking, you can delay this to the
request-createdevent and take a look at theamz-sdk-invocation-idheader. If you're looking to add any sort of logic around the request id that's connected with the server's request id, you'll need to hook into after-call or later.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was asking to improve the error message.
I frequent customer ask is for request IDs to be in error messages,
but since the request ID is not in scope,
let it roll.