feat(decryption): streaming decryption with cipher-agnostic BufferedDecryptingStream and DelayedAuthDecryptingStream#150
Conversation
Introduce BufferedDecryptingStream that wraps the S3 StreamingBody and decrypts lazily on first read. No plaintext is released until the entire ciphertext is read and the GCM auth tag is verified, matching the Java S3EC's BufferedCipherSubscriber behavior. - Add stream.py with BufferedDecryptingStream (read, iter_chunks, close) - Pipeline returns BufferedDecryptingStream instead of decrypted bytes - Event handler passes stream directly as parsed["Body"]
…ations Add enable_delayed_authentication field to S3EncryptionClientConfig, defaulting to False. Includes duvet specification citations from client.md#enable-delayed-authentication.
…eaming Add DelayedAuthDecryptingStream that releases plaintext incrementally via AES-GCM cipher.update() before tag verification. The GCM tag (last 16 bytes) is held back and verified on stream exhaustion via finalize_with_tag(). Matches Java S3EC CipherSubscriber pattern.
… handler - Pipeline.decrypt() accepts enable_delayed_authentication param and returns DelayedAuthDecryptingStream when True, BufferedDecryptingStream when False. Raises error if param is None (must be explicitly set). - Event handler passes config flag to pipeline. - Remove duplicated defaults from pipeline params — config is single source of truth. - Update unit tests to pass instruction_suffix explicitly.
…delayed auth Replace individual delayed-auth tests with pytest.mark.parametrize covering both buffered and delayed-auth modes across ascii, empty, unicode, utf-8, latin-1, binary data, and no-body cases.
0809734 to
257fda6
Compare
…n citations Add unit tests that verify the behavioral contract of both stream modes: - DelayedAuthDecryptingStream releases plaintext before GCM tag verification - BufferedDecryptingStream withholds all plaintext until tag is verified Includes duvet type=test citations for enable-delayed-authentication spec.
257fda6 to
622b5a1
Compare
…nd delayed-auth modes
…layed-auth streaming - Add 50 MB V2 delayed-auth streaming decryption test against static object - Add 50 MB V3 test (skipped, V3 not yet implemented) - Add 61 GiB V2/V3 placeholder tests marked @pytest.mark.large (skipped, static objects not yet created) - Parametrize existing instruction file tests with buffered/delayed-auth modes
…h plaintext - Fix pipeline decrypt() docstring to reflect both return types - Add assertion that full delayed-auth stream output matches expected plaintext
- Remove 61 GiB V2/V3 placeholder tests (static objects not yet created) - Remove large pytest mark registration (no longer used)
BufferedDecryptingStream and DelayedAuthDecryptingStream now take a decryptor object and tag_length instead of raw key/nonce. This makes them reusable across algorithm suites (GCM, key-committing GCM, CBC).
… algorithm suite dispatch - Merge staging's key commitment, commitment policy, algorithm suite config, CBC decryption, and V3 decryption into the streaming branch - Algorithm suite dispatch now returns streaming decryptors for all paths (GCM, key-committing GCM, CBC) instead of eager in-memory decryption - Add unpadder support to streams for CBC PKCS7 padding removal - Update all tests to pass enable_delayed_authentication and use .read() on stream results
- Add cipher_tag_length_bytes and cipher_block_size_bytes properties to AlgorithmSuite - Replace hardcoded GCM_TAG_LENGTH and PKCS7(128) with algorithm suite properties - Remove dead code: _decrypt_cbc_content() - Make _make_decrypting_stream and _decrypt_kc_gcm_content static methods - Remove GCM_TAG_LENGTH constant from stream.py - Make tag_length required (no default) on DelayedAuthDecryptingStream
kessplas
left a comment
There was a problem hiding this comment.
Blocking changes for the dead method/ghost Duvet annotations.
- Make instruction_suffix and enable_delayed_authentication positional args - Move duvet annotation to BufferedDecryptingStream return - Hardcode CBC to always stream (no auth tag, matches Java behavior) - Move duvet annotations from _decrypt_kc_gcm_content to _decrypt_kc_gcm_streaming - Remove unused _decrypt_kc_gcm_content method - Fix DelayedAuthDecryptingStream CBC unpadding (peek + incremental unpadder) - Add CBC unit tests for both stream types (roundtrip, chunked, finalize, padding) - Add delayed authentication mode integration test with duvet citation
Split DelayedAuthDecryptingStream into DelayedAuthCBCDecryptingStream and DelayedAuthGCMDecryptingStream. CBC and GCM are mutually exclusive paths — CBC uses an unpadder with no auth tag, GCM uses a rolling tag buffer with no padding — so the single-class design carried impossible field combinations and conditional branching in read(). All three stream classes (BufferedDecryptingStream and the two new delayed-auth classes) now extend botocore's StreamingBody with @define(slots=False), inheriting iter_chunks, iter_lines, __iter__, and __next__ for free. Updated _make_decrypting_stream dispatch in pipelines.py and test constructors in test_stream.py.
…eam and simplify dispatch
# Delayed-Auth Streams: Empty Read Behavior ## Problem DelayedAuthGCMDecryptingStream.read(amt) can return b"" mid-stream before the stream is exhausted. This happens when the read size is small relative to the GCM tag length (16 bytes) — the stream can't distinguish ciphertext from the trailing auth tag until it has accumulated more than tag_length bytes. Example with 20 bytes of ciphertext+tag, read(7): 1. read(7) → 7 bytes buffered, <= 16 → returns b"" 2. read(7) → 14 bytes buffered, <= 16 → returns b"" 3. read(7) → 20 bytes total, splits ciphertext/tag → returns plaintext In Python, read() returning b"" conventionally signals EOF. This breaks common patterns like: while chunk := stream.read(7) DelayedAuthCBCDecryptingStream does not have this issue — CBC cipher.update() always produces output when given input. ## Java Behavior Java's CipherSubscriber (the delayed-auth equivalent) does the same thing. When cipher.update() returns null/empty, it explicitly sends an empty ByteBuffer downstream. This is fine in Java's reactive streams model where empty emissions are normal signaling. In Python's read() API, it's surprising. ## Options Considered 1. Keep as-is, document it — match Java semantics. 2. Loop internally in read() — more Pythonic, but violates io.py Reader.read contract: "If size is specified, at most size items will be read." 3. Require minimum read size (chosen) — raise if amt < tag_length + 1. ## ESDK-Python Comparison ESDK-Python's StreamDecryptor never has this problem because it decrypts at the frame level. Each frame has its own IV and tag, so authentication is per-frame. S3EC operates on a single non-framed GCM ciphertext where the tag is simply appended — the stream must separate tag from ciphertext on the fly.
kessplas
left a comment
There was a problem hiding this comment.
Per offline discussion,
we can simplify the stream implementation quite a bit,
because we know the length of the object from the GetObject response.
Then we don't need to do the lookahead stuff.
- Refactor DelayedAuthGCMDecryptingStream to use content_length instead of rolling tag buffer and peek-ahead - Add ContentLength validation in on_get_object_after_call - Pass content_length through pipeline to all stream constructors - Rename stream classes: BufferedDecryptingGCMStream → GCMBufferedDecryptingStream, DelayedAuthCBCDecryptingStream → CBCDecryptingStream, DelayedAuthGCMDecryptingStream → GCMDelayedAuthDecryptingStream - Track _amount_read for progress in all three streams - Remove minimum read size restriction
- GCMBufferedDecryptingStream.__enter__ returns self for consistent context manager behavior across all stream classes - GCMDelayedAuthDecryptingStream raises on content_length < tag_length - Clarify content_length comment as ciphertext content length
…ered-decryption-aes-gcm
There was a problem hiding this comment.
We shouldn't need to do this when we know the content length.
| self._decrypt() | ||
| return self._raw_stream.tell() | ||
|
|
||
| def __enter__(self): # noqa: D105 |
There was a problem hiding this comment.
Since this relates to using the stream in a with block we should test that too.
Ideally, both a unit test that can assert close is called upon exit,
and an integration test that shows this pattern is functional.
| def __attrs_post_init__(self): # noqa: D105 | ||
| # By passing in content_length, and updating _amount_read in read(), | ||
| # we support the super's normal progression. | ||
| # However, we do not support the super's _verify_content_length. |
There was a problem hiding this comment.
Why do we not support calling _verify_content_length?
This is a problem as we can't just assume this won't get called.
We need to implement this correctly.
| _tag_length: int = field() | ||
| # _content_length intentionally collides with super's _content_length | ||
| _content_length: int = field() | ||
| _ciphertext_remaining: int = field(init=False) |
There was a problem hiding this comment.
We need to be clear if this means ciphertext as in "the ciphertext object" (meaning ciphertext + tag) or ciphertext as in "the ciphertext" (meaning just ciphertext and no tag) since it is ambiguous and critically important. Ideally change the variable name or at least leave a comment.
Personally especially in terms of lengths I interpret "ciphertext" as ciphertext + tag, and "plaintext" as the original content length (which is also the length of the ciphertext without the tag).
This is particularly relevant for eg implementing _verify_content_length and whether or not we overwrite the content length of the plaintext stream returned from GetObject.
Let's discuss further offline.
| return b"" | ||
| self._finalized = True | ||
| try: | ||
| tag = self._body.read(self._tag_length) |
There was a problem hiding this comment.
Related to the theme of ciphertext vs plaintext length, we need to update self._amount_read here as well since that is tracking ciphertext + tag length.
- Add decryptor.py with AesCbcDecryptor and AesGcmDecryptor - AesGcmDecryptor uses rolling tail buffer to hold back GCM tag bytes - Rename streams to BufferedDecryptingStream and DecryptingStream - Remove CBCDecryptingStream (CBC now uses DecryptingStream) - Streams are cipher-agnostic, delegating all crypto to Decryptor - DecryptingStream loops on empty update() results to avoid returning spurious empty bytes mid-stream - Override all StreamingBody methods explicitly on both stream classes
… iterator tests - Add buffered_decrypt.py with one_shot_decrypt() returning plain StreamingBody - Remove BufferedDecryptingStream class from stream.py - Add docstrings to DecryptingStream iterator methods - Wire _verify_content_length into _finalize to catch truncated HTTP responses - Add unit tests for iter_chunks, iter_lines, __iter__, __next__, readinto, readlines
- raise S3EncryptionClientSecurityError for CBC/GCM decryption failures - remove magic number tolerance in _verify_content_length - fix type annotations (object -> StreamingBody) - fix __iter__/__next__ inconsistency in DecryptingStream - Remove unnecessary __del__ and set_socket_timeout overrides - Update test assertions to expect SecurityError
Changes
Streaming decryptors accept cipher objects
BufferedDecryptingStreamandDelayedAuthDecryptingStreamnow take a decryptor object andtag_lengthinstead of raw key/nonceunpadderparameter for CBC PKCS7 padding removalAlgorithm suite dispatch returns streams
GetEncryptedObjectPipeline.decrypt()returns a streaming decryptor for all algorithm suites instead of eagerly decrypting in memoryMerged with staging
enable_delayed_authenticationand call.read()on stream resultsUse AlgorithmSuite properties for tag length and block size
cipher_tag_length_bytesandcipher_block_size_bytesproperties toAlgorithmSuiteGCM_TAG_LENGTHconstant andPKCS7(128)with algorithm suite propertiestag_lengthis now required (no default) onDelayedAuthDecryptingStream_decrypt_cbc_content()_make_decrypting_streamand_decrypt_kc_gcm_contentstatic methodsUpdate: address kessplas review comments (a6fb136)
instruction_suffixandenable_delayed_authenticationpositional (required) args ondecrypt()enable_delayed_authenticationNone-check toBufferedDecryptingStreamreturnenable_delayed_authentication=True) — no auth tag, matches Java behavior_decrypt_kc_gcm_contentto_decrypt_kc_gcm_streaming_decrypt_kc_gcm_contentmethodDelayedAuthDecryptingStreamCBC unpadding: peek to detect stream exhaustion + incremental unpadderUpdate: stream class refactor + comprehensive tests + minimum read size
Split stream classes by cipher (8d4dbdc, 4991e1b)
DelayedAuthDecryptingStreamintoDelayedAuthCBCDecryptingStreamandDelayedAuthGCMDecryptingStreamBufferedDecryptingStream→BufferedDecryptingGCMStream(GCM does not need unpadder)DelayedAuthCBCDecryptingStream(no longer routed through GCM factory)_make_decrypting_stream→_make_decrypting_gcm_streamEnforce minimum read size on DelayedAuthGCMDecryptingStream (dad00c2)
read(amt)raisesS3EncryptionClientErrorif0 < amt < tag_length + 1ByteBufferdownstream) but adapted for Python'sread()contractstreaming.mdComprehensive unit tests (07dbee4, 4a35e88)
TestDelayedAuthCBCDecryption: read-after-finalized, readable, close, context manager, wrong key, empty ciphertextTestBufferedDecryptingGCMStream: full/partial reads, tell, readable, readinto, close, wrong key, tampered ciphertext, idempotent decryptTestDelayedAuthGCMDecryption: full/chunked reads, finalized, readable, close, wrong key, tampered tag, small/large dataTestEdgeCasePlaintextLengths: parameterized across [0, 1, 8, 15, 16, 17, 31, 32, 33, 47, 48, 49] for all stream typestest_read_too_small_raises_errorfor minimum read size guardStreaming integration tests (07dbee4)
test_i_s3_encryption_streaming.pywith 17 testsTest results