feat: add low-level Multipart Upload and upload_file(obj)#172
Conversation
texastony
commented
Apr 3, 2026
- feat: add encrypted multipart upload support
- test: add transfer manager integration tests, deduplicate upload_file tests
Add create_multipart_upload, upload_part, complete_multipart_upload, and abort_multipart_upload to S3EncryptionClient, plus upload_file and upload_fileobj convenience methods. A single AES-GCM cipher instance is shared across all parts, with parts encrypted serially as required by the spec. The GCM auth tag is appended to the last part's ciphertext on complete. Encryption metadata is set during create_multipart_upload. Supports both ALG_AES_256_GCM_IV12_TAG16_NO_KDF (V2) and ALG_AES_256_GCM_HKDF_SHA512_COMMIT_KEY (V3) algorithm suites. New files: - test/integration/test_i_s3_encryption_multipart.py (28 tests) Modified files: - src/s3_encryption/pipelines.py (MultipartUploadPipeline) - src/s3_encryption/__init__.py (S3EncryptionClient methods)
… tests Add test_i_s3_encryption_transfer_manager.py with 20 tests covering upload_file and upload_fileobj: threshold boundary, custom threshold, custom chunksize, encryption context, user metadata, and delayed-auth decryption. Remove duplicate upload_file/upload_fileobj tests from test_i_s3_encryption_multipart.py (now covered by the new file).
Duvet requires each ##% annotation to match a single contiguous line in the specification. Split the combined UploadPart citations into separate annotations across __init__.py, pipelines.py, and the multipart integration test.
|
From IRL conversation with @kessplas , remove buffer-and-push-last-upload behavior in favor for a last-upload flag. |
Replace the one-part buffering strategy with direct encryption and upload, following the S3EC-Java pattern (SdkPartType.LAST). Pipeline changes: - Remove _pending_part field and finalize() method - encrypt_part() now accepts is_last param and returns ciphertext directly; when is_last=True, finalizes cipher and appends GCM tag - Add has_final_part_been_seen property Client changes: - upload_part() accepts IsLastPart kwarg, encrypts and uploads to S3 immediately, returns real S3 response with ETag - complete_multipart_upload() validates final part was seen, passes caller's Parts list (with real ETags) directly to S3; no more list_parts call - _multipart_upload_from_readable() uses read-ahead to detect last chunk, passes IsLastPart=True, collects real ETags
…python into tonyknap/investigate-transfer-manager
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=implementation | ||
| ##% UploadPart MUST encrypt each part. | ||
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=implementation | ||
| ##% Each part MUST be encrypted in sequence. | ||
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=implementation | ||
| ##% Each part MUST be encrypted using the same cipher instance for each part. |
There was a problem hiding this comment.
This will go in the docstring for the class right? That doesn't seem ideal
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=implementation | ||
| ##% CreateMultipartUpload MAY be implemented by the S3EC. | ||
| ##% If implemented, CreateMultipartUpload MUST initiate a multipart upload. |
There was a problem hiding this comment.
nit, I've spent a lot of time on annotations with Rust/LLMs, so I think this could be better placed to line ~611 (ditto for other annotations placed right above the method definition) but don't really care too much
There was a problem hiding this comment.
Pull request overview
Adds encrypted multipart upload support to the S3 Encryption Client, including low-level multipart operations and high-level upload_file / upload_fileobj, plus unit/integration coverage to validate encrypted round-trips and concurrency behavior.
Changes:
- Implement
MultipartUploadPipelineto encrypt multipart parts with a single streaming AES-GCM context and emit object encryption metadata. - Add
create_multipart_upload,upload_part,complete_multipart_upload,abort_multipart_upload,upload_file, andupload_fileobjAPIs toS3EncryptionClient. - Add unit and integration tests for multipart behavior, transfer-manager-like thresholds, and concurrent multipart uploads.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
src/s3_encryption/pipelines.py |
Adds streaming multipart encryption pipeline and per-part encryption logic. |
src/s3_encryption/__init__.py |
Adds multipart API surface + upload_file helpers and in-memory multipart state tracking. |
test/test_stream.py |
Makes CBC “wrong key” test deterministic by retrying against PKCS7 false-positives. |
test/test_multipart.py |
Adds unit tests for multipart pipeline behavior and client multipart wrappers. |
test/integration/test_i_s3_encryption_transfer_manager.py |
Adds integration tests covering upload thresholds/chunksize and round-trip decryption. |
test/integration/test_i_s3_encryption_multipart.py |
Adds full integration coverage for low-level multipart API + metadata/context/delayed-auth cases. |
test/integration/test_i_s3_encryption_multithreaded.py |
Adds integration test for concurrent multipart uploads on a shared client. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ) | ||
|
|
||
| # Merge encryption metadata into user-provided Metadata | ||
| user_metadata = kwargs.get("Metadata", {}) |
| upload_id = kwargs.get("UploadId") | ||
| with self._multipart_lock: | ||
| pipeline = self._multipart_uploads.get(upload_id) | ||
| if pipeline is None: | ||
| raise S3EncryptionClientError( | ||
| f"No multipart upload found for UploadId: {upload_id}. " | ||
| "Call create_multipart_upload first." | ||
| ) | ||
|
|
||
| part_number = kwargs["PartNumber"] | ||
| is_last = kwargs.pop("IsLastPart", False) | ||
| body = kwargs.get("Body", b"") | ||
| if isinstance(body, str): | ||
| body = body.encode("utf-8") | ||
| elif hasattr(body, "read"): | ||
| body = body.read() | ||
|
|
||
| try: | ||
| ciphertext = pipeline.encrypt_part(part_number, body, is_last=is_last) | ||
| except S3EncryptionClientError: | ||
| raise | ||
| except Exception as e: | ||
| raise S3EncryptionClientError(f"Failed to encrypt part {part_number}: {e}") from e | ||
|
|
||
| return self.wrapped_s3_client.upload_part( | ||
| Bucket=kwargs["Bucket"], | ||
| Key=kwargs["Key"], | ||
| UploadId=upload_id, | ||
| PartNumber=part_number, | ||
| Body=ciphertext, | ||
| ) |
| return self.wrapped_s3_client.upload_part( | ||
| Bucket=kwargs["Bucket"], | ||
| Key=kwargs["Key"], | ||
| UploadId=upload_id, | ||
| PartNumber=part_number, | ||
| Body=ciphertext, | ||
| ) | ||
|
|
| return self.wrapped_s3_client.complete_multipart_upload(**kwargs) | ||
| except S3EncryptionClientError: | ||
| raise | ||
| except Exception as e: | ||
| raise S3EncryptionClientError(f"Failed to complete multipart upload: {e}") from e | ||
| finally: | ||
| with self._multipart_lock: | ||
| self._multipart_uploads.pop(upload_id, None) | ||
|
|
| result = b"" | ||
| while chunk := response["Body"].read(65536): | ||
| result += chunk | ||
| assert result == data |
| result = b"" | ||
| while chunk := response["Body"].read(65536): | ||
| result += chunk | ||
| assert result == data |
| def upload_fileobj(self, fileobj, bucket, key, multipart_chunksize=None, **kwargs): | ||
| """Encrypt and upload a file-like object to S3 via multipart upload. | ||
|
|
||
| Args: | ||
| fileobj: A file-like object with a read() method. | ||
| bucket: Target S3 bucket. | ||
| key: Target S3 object key. | ||
| multipart_chunksize: Size of each part (default 8MB). | ||
| **kwargs: Additional arguments (e.g. EncryptionContext, Metadata). | ||
| """ | ||
| chunksize = multipart_chunksize or _DEFAULT_MULTIPART_CHUNKSIZE | ||
| return self._multipart_upload_from_readable(fileobj, bucket, key, chunksize, **kwargs) |
| ##% Each part MUST be encrypted using the same cipher instance for each part. | ||
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=test | ||
| ##% CompleteMultipartUpload MAY be implemented by the S3EC. |
| Uses a barrier to ensure upload_part calls for different objects are | ||
| interleaved, exercising the per-upload cipher isolation under contention. | ||
| """ | ||
| import threading |
There was a problem hiding this comment.
In-method imports are weird and should probably fail the linter, but moving to the top of file is fine
| ##% If implemented, CreateMultipartUpload MUST initiate a multipart upload. | ||
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=test | ||
| ##% UploadPart MUST encrypt each part. |
There was a problem hiding this comment.
Some of these annotations are suspect -- for example I am pretty sure I could sub in a plaintext S3 client to this test and it would pass, so this test doesn't implicitly validate that each part is encrypted
| ##= specification/s3-encryption/client.md#optional-api-operations | ||
| ##= type=implementation | ||
| ##% If implemented, CreateMultipartUpload MUST initiate a multipart upload. |
There was a problem hiding this comment.
| ##= specification/s3-encryption/client.md#optional-api-operations | |
| ##= type=implementation | |
| ##% If implemented, CreateMultipartUpload MUST initiate a multipart upload. | |
| ##= specification/s3-encryption/client.md#optional-api-operations | |
| ##% If implemented, CreateMultipartUpload MUST initiate a multipart upload. |
nit, I prefer less content if possible. Implementation is the default annotation type so specifying implementation isn't necessary