|
| 1 | +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | +"""Unit tests for streaming decryption behavior.""" |
| 4 | + |
| 5 | +import os |
| 6 | +from io import BytesIO |
| 7 | +from unittest.mock import Mock |
| 8 | + |
| 9 | +from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
| 10 | + |
| 11 | +from s3_encryption.stream import BufferedDecryptingStream, DelayedAuthDecryptingStream |
| 12 | + |
| 13 | + |
| 14 | +def _encrypt(plaintext: bytes): |
| 15 | + """Encrypt plaintext with AES-GCM, return (ciphertext_with_tag, key, nonce).""" |
| 16 | + key = os.urandom(32) |
| 17 | + nonce = os.urandom(12) |
| 18 | + ciphertext_with_tag = AESGCM(key).encrypt(nonce, plaintext, None) |
| 19 | + return ciphertext_with_tag, key, nonce |
| 20 | + |
| 21 | + |
| 22 | +def _make_streaming_body(data: bytes): |
| 23 | + """Create a mock StreamingBody wrapping data.""" |
| 24 | + body = Mock() |
| 25 | + stream = BytesIO(data) |
| 26 | + body.read = stream.read |
| 27 | + body.close = Mock() |
| 28 | + body._stream = stream |
| 29 | + return body |
| 30 | + |
| 31 | + |
| 32 | +class TestDelayedAuthReleasesBeforeVerification: |
| 33 | + """Delayed auth releases plaintext before the GCM tag is verified.""" |
| 34 | + |
| 35 | + ##= specification/s3-encryption/client.md#enable-delayed-authentication |
| 36 | + ##= type=test |
| 37 | + ##% When enabled, the S3EC MAY release plaintext from a stream which has not been authenticated. |
| 38 | + def test_delayed_auth_releases_plaintext_before_tag_verification(self): |
| 39 | + plaintext = os.urandom(4096) |
| 40 | + ciphertext_with_tag, key, nonce = _encrypt(plaintext) |
| 41 | + body = _make_streaming_body(ciphertext_with_tag) |
| 42 | + |
| 43 | + stream = DelayedAuthDecryptingStream(body, key, nonce) |
| 44 | + # read(256) decrypts a partial chunk via cipher.update(), releasing |
| 45 | + # plaintext without consuming the full ciphertext stream. The GCM tag |
| 46 | + # at the end of the stream has not been reached yet. |
| 47 | + chunk = stream.read(256) |
| 48 | + |
| 49 | + # Plaintext was returned before the stream was fully consumed |
| 50 | + assert len(chunk) > 0 |
| 51 | + # _finalized is False: the GCM tag has NOT been verified yet |
| 52 | + assert not stream._finalized |
| 53 | + # Ciphertext remains unread in the underlying stream |
| 54 | + assert body._stream.tell() < len(ciphertext_with_tag) |
| 55 | + |
| 56 | + |
| 57 | +class TestBufferedWithholdsUntilVerification: |
| 58 | + """Buffered mode does not release plaintext until the GCM tag is verified.""" |
| 59 | + |
| 60 | + ##= specification/s3-encryption/client.md#enable-delayed-authentication |
| 61 | + ##= type=test |
| 62 | + ##% When disabled the S3EC MUST NOT release plaintext from a stream which has not been authenticated. |
| 63 | + def test_buffered_verifies_tag_before_releasing_any_plaintext(self): |
| 64 | + plaintext = os.urandom(4096) |
| 65 | + ciphertext_with_tag, key, nonce = _encrypt(plaintext) |
| 66 | + body = _make_streaming_body(ciphertext_with_tag) |
| 67 | + |
| 68 | + stream = BufferedDecryptingStream(body, key, nonce) |
| 69 | + # read(1) triggers _decrypt(), which calls self._body.read() with no amt, |
| 70 | + # consuming the entire ciphertext and verifying the GCM tag before |
| 71 | + # returning even 1 byte of plaintext. |
| 72 | + chunk = stream.read(1) |
| 73 | + |
| 74 | + assert chunk == plaintext[:1] |
| 75 | + # _plaintext being set confirms full decrypt+verify already happened |
| 76 | + assert stream._plaintext is not None |
0 commit comments