11import unittest
22from unittest .mock import patch
33
4- from uid2_client import SharingClient , ClientType
4+ from tests .uid2_token_generator import UID2TokenGenerator
5+ from uid2_client import SharingClient , ClientType , EncryptionError , encryption , AdvertisingTokenVersion
56from test_utils import *
67
78
9+ def _create_key_collection (identity_scope ):
10+ key_set = [master_key , site_key ]
11+ return EncryptionKeysCollection (key_set , identity_scope , site_id , 1 ,
12+ 99999 , 86400 )
13+
14+
15+ def _generate_uid_token (identity_scope , version , created_at = None , expires_at = None ):
16+ return UID2TokenGenerator .generate_uid_token (example_id , master_key , site_id , site_key ,
17+ identity_scope , version , created_at , expires_at )
18+
19+
20+ @patch ('uid2_client.sharing_client.refresh_sharing_keys' )
821class TestSharingClient (unittest .TestCase ):
922 _CONST_BASE_URL = 'base_url'
1023 _CONST_API_KEY = 'api_key'
1124
12- @patch ('uid2_client.sharing_client.refresh_sharing_keys' )
13- @patch ('uid2_client.sharing_client.encrypt' )
14- def test_encrypt_raw_uid_into_sharing_token (self , mock_encrypt , mock_refresh_sharing_keys ):
15- key_collection = create_default_key_collection ([master_key ])
25+ def setUp (self ):
26+ self ._key_collection = _create_key_collection (IdentityScope .UID2 )
27+ self ._test_cases = [
28+ [IdentityScope .UID2 , AdvertisingTokenVersion .ADVERTISING_TOKEN_V2 ],
29+ [IdentityScope .UID2 , AdvertisingTokenVersion .ADVERTISING_TOKEN_V3 ],
30+ [IdentityScope .UID2 , AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 ],
31+ [IdentityScope .EUID , AdvertisingTokenVersion .ADVERTISING_TOKEN_V2 ],
32+ [IdentityScope .EUID , AdvertisingTokenVersion .ADVERTISING_TOKEN_V3 ],
33+ [IdentityScope .EUID , AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 ]
34+ ]
35+
36+ def test_encrypt_decrypt (self , mock_refresh_sharing_keys ): #CanEncryptAndDecryptForSharing
37+ key_collection = self ._key_collection
1638 mock_refresh_sharing_keys .return_value = key_collection
17- mock_encrypt .return_value = 'encrypted_token'
1839 client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
1940 client .refresh_keys ()
20- encrypted_token = client .encrypt_raw_uid_into_sharing_token (example_uid , key_collection .get_default_keyset_id ())
21- self .assertEqual (encrypted_token , 'encrypted_token' )
41+ sharing_token = client .encrypt_raw_uid_into_sharing_token (example_uid , key_collection .get_default_keyset_id ())
42+ self .assertIsNotNone (sharing_token )
43+ self .assertIsInstance (sharing_token , str )
44+ result = client .decrypt_sharing_token_into_raw_uid (sharing_token )
45+ self .assertEqual (example_uid , result .uid2 )
2246 mock_refresh_sharing_keys .assert_called_once ()
23- mock_encrypt .assert_called_once_with (example_uid , None , key_collection , key_collection .get_default_keyset_id ())
2447
25- @patch ('uid2_client.sharing_client.refresh_sharing_keys' )
26- @patch ('uid2_client.sharing_client.decrypt_token' )
27- def test_decrypt_sharing_token_into_raw_uid (self , mock_decrypt_token , mock_refresh_sharing_keys ):
28- key_collection = create_default_key_collection ([master_key ])
29- mock_refresh_sharing_keys .return_value = key_collection
30- mock_decrypt_token .return_value = example_uid
48+ def test_can_decrypt_another_clients_encrypted_token (self , mock_refresh_keys_util ): #CanDecryptAnotherClientsEncryptedToken
49+ mock_refresh_keys_util .return_value = self ._key_collection
50+ sending_client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
51+ sending_client .refresh_keys ()
52+
53+ ad_token = sending_client .encrypt_raw_uid_into_sharing_token (example_uid )
54+
55+ receiving_client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
56+ receiving_client .refresh_keys ()
57+
58+ result = receiving_client .decrypt_sharing_token_into_raw_uid (ad_token )
59+ self .assertEqual (example_uid , result .uid2 )
60+
61+ def test_sharing_token_is_v4 (self , mock_refresh_keys_util ):
62+ mock_refresh_keys_util .return_value = self ._key_collection
63+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
64+ client .refresh_keys ()
65+
66+ ad_token = client .encrypt_raw_uid_into_sharing_token (example_uid )
67+ contains_base_64_special_chars = "+" in ad_token or "/" in ad_token or "=" in ad_token
68+ self .assertFalse (contains_base_64_special_chars )
69+
70+ def test_sharing_client_produces_uid2_token (self , mock_refresh_keys_util ): #ClientProducesTokenWithCorrectPrefix
71+ mock_refresh_keys_util .return_value = self ._key_collection
72+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
73+ client .refresh_keys ()
74+
75+ ad_token = client .encrypt_raw_uid_into_sharing_token (example_uid )
76+ self .assertEqual ("A" , ad_token [0 ])
77+
78+ def test_sharing_client_produces_euid_token (self , mock_refresh_keys_util ): #ClientProducesTokenWithCorrectPrefix
79+ mock_refresh_keys_util .return_value = _create_key_collection (IdentityScope .EUID )
80+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
81+ client .refresh_keys ()
82+
83+ ad_token = client .encrypt_raw_uid_into_sharing_token (example_uid )
84+ self .assertEqual ("E" , ad_token [0 ])
85+
86+ def test_raw_uid_produces_correct_identity_type_in_token (self , mock_refresh_keys_util ): #RawUidProducesCorrectIdentityTypeInToken
87+ mock_refresh_keys_util .return_value = self ._key_collection
88+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
89+ client .refresh_keys ()
90+
91+ self .assertEqual (IdentityType .Email , get_identity_type (client .encrypt_raw_uid_into_sharing_token (
92+ "Q4bGug8t1xjsutKLCNjnb5fTlXSvIQukmahYDJeLBtk=" )))
93+ self .assertEqual (IdentityType .Phone , get_identity_type (client .encrypt_raw_uid_into_sharing_token (
94+ "BEOGxroPLdcY7LrSiwjY52+X05V0ryELpJmoWAyXiwbZ" )))
95+ self .assertEqual (IdentityType .Email , get_identity_type (client .encrypt_raw_uid_into_sharing_token (
96+ "oKg0ZY9ieD/CGMEjAA0kcq+8aUbLMBG0MgCT3kWUnJs=" )))
97+ self .assertEqual (IdentityType .Email , get_identity_type (client .encrypt_raw_uid_into_sharing_token (
98+ "AKCoNGWPYng/whjBIwANJHKvvGlGyzARtDIAk95FlJyb" )))
99+ self .assertEqual (IdentityType .Email , get_identity_type (client .encrypt_raw_uid_into_sharing_token (
100+ "EKCoNGWPYng/whjBIwANJHKvvGlGyzARtDIAk95FlJyb" )))
101+
102+ def test_multiple_keys_per_keyset (self , mock_refresh_keys_util ): # MultipleKeysPerKeyset
103+ def get_post_refresh_keys_response_with_multiple_keys ():
104+ return create_default_key_collection ([master_key , site_key , master_key2 , site_key2 ])
105+
106+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_multiple_keys ()
107+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
108+ client .refresh_keys ()
109+
110+ sharing_token = client .encrypt_raw_uid_into_sharing_token (example_uid )
111+
112+ result = client .decrypt_sharing_token_into_raw_uid (sharing_token )
113+ self .assertEqual (example_uid , result .uid2 )
114+
115+ def test_cannot_encrypt_if_no_key_from_default_keyset (self , mock_refresh_keys_util ): #CannotEncryptIfNoKeyFromTheDefaultKeyset
116+ def get_post_refresh_keys_response_with_no_default_keyset_key ():
117+ return create_default_key_collection ([master_key ])
118+
119+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_no_default_keyset_key ()
31120 client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
32121 client .refresh_keys ()
33- raw_uid = client .decrypt_sharing_token_into_raw_uid ('token' )
34- self .assertEqual (raw_uid , example_uid )
35- mock_refresh_sharing_keys .assert_called_once ()
36- mock_decrypt_token .assert_called_once_with ('token' , key_collection , None , ClientType .Sharing )
37122
38- @patch ('uid2_client.sharing_client.refresh_sharing_keys' )
123+ with self .assertRaises (EncryptionError ) as context :
124+ client .encrypt_raw_uid_into_sharing_token (example_uid )
125+ self .assertTrue ('No Site ID in keys' in context .exception )
126+
127+ def test_cannot_encrypt_if_theres_no_default_keyset_header (self , mock_refresh_keys_util ): #CannotEncryptIfTheresNoDefaultKeysetHeader
128+ def get_post_refresh_keys_response_with_no_default_keyset_header ():
129+ key_set = [master_key , site_key ]
130+ return EncryptionKeysCollection (key_set , IdentityScope .UID2 , 9000 , 1 ,
131+ "" , 86400 )
132+
133+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_no_default_keyset_header ()
134+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
135+ client .refresh_keys ()
136+
137+ with self .assertRaises (EncryptionError ) as context :
138+ client .encrypt_raw_uid_into_sharing_token (example_uid )
139+ self .assertTrue ('No Keyset Key Found' in context .exception )
140+
141+ def test_expiry_in_token_matches_expiry_in_response (self , mock_refresh_keys_util ): # ExpiryInTokenMatchesExpiryInResponse
142+ def get_post_refresh_keys_response_with_token_expiry ():
143+ return create_default_key_collection ([master_key , site_key ])
144+
145+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_token_expiry ()
146+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
147+ client .refresh_keys ()
148+
149+ ad_token = client .encrypt_raw_uid_into_sharing_token (example_uid )
150+
151+ result = client .decrypt_sharing_token_into_raw_uid (ad_token )
152+ self .assertEqual (example_uid , result .uid2 )
153+
154+ real_decrypt_v3 = encryption ._decrypt_token_v3
155+
156+ with patch ('uid2_client.encryption._decrypt_token_v3' ) as mock_decrypt :
157+ def decrypt_side_effect (token_bytes , keys , now ):
158+ return real_decrypt_v3 (token_bytes , keys , '' , ClientType .Sharing , now + dt .timedelta (seconds = 3 ), AdvertisingTokenVersion .ADVERTISING_TOKEN_V4 )
159+
160+ mock_decrypt .side_effect = decrypt_side_effect
161+
162+ with self .assertRaises (EncryptionError ) as context :
163+ client .decrypt_sharing_token_into_raw_uid (ad_token )
164+ self .assertTrue ('token expired' in context .exception )
165+
166+ def test_encrypt_key_inactive (self , mock_refresh_keys_util ): #EncryptKeyInactive
167+ def get_post_refresh_keys_response_with_key_inactive ():
168+ inactive_key = EncryptionKey (245 , site_id , now , now + dt .timedelta (days = 1 ), now + dt .timedelta (days = 2 ),
169+ site_secret , keyset_id = 99999 )
170+ return create_default_key_collection ([inactive_key ])
171+
172+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_key_inactive ()
173+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
174+ client .refresh_keys ()
175+
176+ with self .assertRaises (EncryptionError ) as context :
177+ client .encrypt_raw_uid_into_sharing_token (example_uid )
178+ self .assertTrue ('No Keyset Key Found' in context .exception )
179+
180+ def test_encrypt_key_expired (self , mock_refresh_keys_util ): #EncryptKeyExpired
181+ def get_post_refresh_keys_response_with_key_expired ():
182+ expired_key = EncryptionKey (245 , site_id , now , now , now - dt .timedelta (days = 1 ), site_secret ,
183+ keyset_id = 99999 )
184+ return create_default_key_collection ([expired_key ])
185+
186+ mock_refresh_keys_util .return_value = get_post_refresh_keys_response_with_key_expired ()
187+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
188+ client .refresh_keys ()
189+
190+ with self .assertRaises (EncryptionError ) as context :
191+ client .encrypt_raw_uid_into_sharing_token (example_uid )
192+ self .assertTrue ('No Keyset Key Found' in context .exception )
193+
39194 def test_refresh_keys (self , mock_refresh_sharing_keys ):
40195 key_collection = create_default_key_collection ([master_key ])
41196 mock_refresh_sharing_keys .return_value = key_collection
@@ -44,6 +199,58 @@ def test_refresh_keys(self, mock_refresh_sharing_keys):
44199 mock_refresh_sharing_keys .assert_called_once_with (self ._CONST_BASE_URL , self ._CONST_API_KEY ,
45200 client_secret_bytes )
46201
202+ def test_smoke_test (self , mock_refresh_sharing_keys ): # SmokeTest
203+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
204+ for expected_scope , expected_version in self ._test_cases :
205+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
206+ token = _generate_uid_token (expected_scope , expected_version , None )
207+ mock_refresh_sharing_keys .return_value = _create_key_collection (expected_scope )
208+ client .refresh_keys ()
209+ decrypted = client .decrypt_sharing_token_into_raw_uid (token )
210+ self .assertEqual (decrypted .identity_scope , expected_scope )
211+ self .assertEqual (decrypted .advertising_token_version , expected_version )
212+
213+ def test_token_lifetime_too_long_for_sharing (self , mock_refresh_sharing_keys ): # TokenLifetimeTooLongForSharing
214+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
215+ expires_in_sec = dt .datetime .now (tz = timezone .utc ) + dt .timedelta (days = 31 )
216+ max_sharing_lifetime = dt .timedelta (days = 30 ).total_seconds ()
217+ for expected_scope , expected_version in self ._test_cases :
218+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
219+ token = _generate_uid_token (expected_scope , expected_version , expires_in_sec )
220+ mock_refresh_sharing_keys .return_value = EncryptionKeysCollection ([master_key , site_key ],
221+ expected_scope , site_id , 1 ,
222+ 99999 , 86400 , max_sharing_lifetime )
223+ client .refresh_keys ()
224+ with self .assertRaises (EncryptionError ):
225+ client .decrypt_sharing_token_into_raw_uid (token )
226+
227+ def test_token_generated_in_the_future_to_simulate_clock_skew (self , mock_refresh_sharing_keys ): # TokenGeneratedInTheFutureToSimulateClockSkew
228+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
229+ created_at_future = dt .datetime .now (tz = timezone .utc ) + dt .timedelta (minutes = 31 ) #max allowed clock skew is 30m
230+ for expected_scope , expected_version in self ._test_cases :
231+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
232+ token = _generate_uid_token (expected_scope , expected_version , created_at_future )
233+ mock_refresh_sharing_keys .return_value = EncryptionKeysCollection ([master_key , site_key ],
234+ expected_scope , site_id , 1 ,
235+ 99999 , 86400 )
236+ client .refresh_keys ()
237+ with self .assertRaises (EncryptionError ):
238+ client .decrypt_sharing_token_into_raw_uid (token )
239+
240+ def test_token_generated_in_the_future_within_allowed_clock_skew (self , mock_refresh_sharing_keys ): # TokenGeneratedInTheFutureWithinAllowedClockSkew
241+ client = SharingClient (self ._CONST_BASE_URL , self ._CONST_API_KEY , client_secret )
242+ created_at_future = dt .datetime .now (tz = timezone .utc ) + dt .timedelta (minutes = 29 ) #max allowed clock skew is 30m
243+ for expected_scope , expected_version in self ._test_cases :
244+ with self .subTest (expected_scope = expected_scope , expected_version = expected_version ):
245+ token = _generate_uid_token (expected_scope , expected_version , created_at_future )
246+ mock_refresh_sharing_keys .return_value = EncryptionKeysCollection ([master_key , site_key ],
247+ expected_scope , site_id , 1 ,
248+ 99999 , 86400 )
249+ client .refresh_keys ()
250+ result = client .decrypt_sharing_token_into_raw_uid (token )
251+ self .assertIsNotNone (result )
252+
253+
47254
48255if __name__ == '__main__' :
49256 unittest .main ()
0 commit comments