|
2 | 2 | from unittest.mock import patch |
3 | 3 |
|
4 | 4 | from test_utils import * |
5 | | -from uid2_client import BidStreamClient, EncryptionError |
| 5 | +from uid2_client import BidStreamClient, EncryptionError, Uid2Base64UrlCoder |
6 | 6 |
|
7 | 7 |
|
8 | 8 | @patch('uid2_client.bid_stream_client.refresh_bidstream_keys') |
@@ -89,27 +89,68 @@ def test_empty_keys(self, mock_refresh_bidstream_keys): # EmptyKeyContainer |
89 | 89 | self._client.decrypt_ad_token_into_raw_uid(token, None) |
90 | 90 | self.assertEqual('keys not initialized', str(context.exception)) |
91 | 91 |
|
92 | | - def test_master_key_expired(self, mock_refresh_keys_util): #ExpiredKeyContainer |
| 92 | + def test_master_key_expired(self, mock_refresh_bidstream_keys): #ExpiredKeyContainer |
93 | 93 | def get_post_refresh_keys_response_with_key_expired(): |
94 | 94 | master_key_expired = EncryptionKey(master_key_id, -1, created=now, activates=now - dt.timedelta(hours=2), expires=now - dt.timedelta(hours=1), secret=master_secret, |
95 | 95 | keyset_id=99999) |
96 | 96 | site_key_expired = EncryptionKey(site_key_id, site_id, created=now, activates=now - dt.timedelta(hours=2), expires=now - dt.timedelta(hours=1), secret=site_secret, |
97 | 97 | keyset_id=99999) |
98 | 98 | return create_default_key_collection([master_key_expired, site_key_expired]) |
99 | 99 |
|
100 | | - mock_refresh_keys_util.return_value = get_post_refresh_keys_response_with_key_expired() |
| 100 | + mock_refresh_bidstream_keys.return_value = get_post_refresh_keys_response_with_key_expired() |
101 | 101 | self._client.refresh_keys() |
102 | 102 |
|
103 | 103 | with self.assertRaises(EncryptionError) as context: |
104 | 104 | self._client.decrypt_ad_token_into_raw_uid(example_uid, None) |
105 | 105 |
|
106 | 106 | self.assertEqual('no keys available or all keys have expired; refresh the latest keys from UID2 service', str(context.exception)) |
107 | 107 |
|
| 108 | + def test_not_authorized_for_master_key(self, mock_refresh_bidstream_keys): #NotAuthorizedForMasterKey |
| 109 | + def get_post_refresh_keys_response_with_key_expired(): |
| 110 | + another_master_key = EncryptionKey(master_key_id + site_key_id + 1, -1, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=master_secret) |
| 111 | + another_site_key = EncryptionKey(master_key_id + site_key_id + 2, site_id, created=now, activates=now, expires=now + dt.timedelta(hours=1), secret=site_secret) |
| 112 | + return create_default_key_collection([another_master_key, another_site_key]) |
| 113 | + |
| 114 | + mock_refresh_bidstream_keys.return_value = get_post_refresh_keys_response_with_key_expired() |
| 115 | + self._client.refresh_keys() |
| 116 | + token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) |
| 117 | + |
| 118 | + with self.assertRaises(EncryptionError) as context: |
| 119 | + self._client.decrypt_ad_token_into_raw_uid(token, None) |
| 120 | + |
| 121 | + self.assertEqual('not authorized for master key', str(context.exception)) |
| 122 | + |
| 123 | + def test_invalid_payload(self, mock_refresh_bidstream_keys): #InvalidPayload |
| 124 | + mock_refresh_bidstream_keys.return_value = create_default_key_collection([master_key, site_key]) |
| 125 | + self._client.refresh_keys() |
| 126 | + token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) |
| 127 | + payload = Uid2Base64UrlCoder.decode(token) |
| 128 | + bad_token = base64.urlsafe_b64encode(payload[:0]) |
| 129 | + |
| 130 | + with self.assertRaises(EncryptionError) as context: |
| 131 | + self._client.decrypt_ad_token_into_raw_uid(bad_token, None) |
| 132 | + self.assertEqual('invalid payload', str(context.exception)) |
| 133 | + |
| 134 | + def test_token_expiry_custom_decryption_time(self, mock_refresh_bidstream_keys): |
| 135 | + mock_refresh_bidstream_keys.return_value = create_default_key_collection([master_key, site_key]) |
| 136 | + self._client.refresh_keys() |
| 137 | + |
| 138 | + expires_at = now - dt.timedelta(days=60) |
| 139 | + created_at = expires_at - dt.timedelta(minutes=1) |
| 140 | + token = generate_uid_token(IdentityScope.UID2, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4, |
| 141 | + created_at=created_at, expires_at=expires_at) |
| 142 | + with self.assertRaises(EncryptionError) as context: |
| 143 | + self._client.decrypt_ad_token_into_raw_uid(token, None, expires_at + dt.timedelta(seconds=1)) |
| 144 | + self.assertEqual('token expired', str(context.exception)) |
| 145 | + |
| 146 | + result = self._client.decrypt_ad_token_into_raw_uid(token, None, expires_at - dt.timedelta(seconds=1)) |
| 147 | + self.assertIsNotNone(result) |
| 148 | + self.assertEqual(result.identity_scope, IdentityScope.UID2) |
| 149 | + self.assertEqual(result.advertising_token_version, AdvertisingTokenVersion.ADVERTISING_TOKEN_V4) |
| 150 | + |
108 | 151 | def test_refresh_keys(self, mock_refresh_bidstream_keys): |
109 | | - key_collection = create_default_key_collection([master_key]) |
110 | | - mock_refresh_bidstream_keys.return_value = key_collection |
111 | | - client = BidStreamClient(self._CONST_BASE_URL, self._CONST_API_KEY, client_secret) |
112 | | - client.refresh_keys() |
| 152 | + mock_refresh_bidstream_keys.return_value = create_default_key_collection([master_key]) |
| 153 | + self._client.refresh_keys() |
113 | 154 | mock_refresh_bidstream_keys.assert_called_once_with(self._CONST_BASE_URL, self._CONST_API_KEY, |
114 | 155 | client_secret_bytes) |
115 | 156 |
|
|
0 commit comments