|
| 1 | +import json |
| 2 | +import re |
| 3 | +import requests |
| 4 | +import pgpy |
| 5 | +from pgpy import PGPMessage |
| 6 | +from pgpy.constants import KeyFlags, HashAlgorithm, SymmetricKeyAlgorithm, CompressionAlgorithm |
| 7 | +from sendsafely.Package import Package |
| 8 | +from sendsafely.exceptions import GetPackagesException, GetUserInformationException, TrustedDeviceException, \ |
| 9 | + DeletePackageException, GetPackageInformationFailedException, GetKeycodeFailedException |
| 10 | +from sendsafely.utilities import make_headers, _get_string_from_file, read_key_pair |
| 11 | + |
| 12 | + |
| 13 | +class SendSafely: |
| 14 | + """ |
| 15 | + Class used to setup authentication and interface with the REST API |
| 16 | + Acts as a handler for the specific queries one may perform either on packages, or more generally as a user |
| 17 | + """ |
| 18 | + API_URL = "/api/v2.0" |
| 19 | + BASE_URL = None |
| 20 | + API_KEY = None |
| 21 | + API_SECRET = None |
| 22 | + KEY_PAIR = None |
| 23 | + KEY_ID = None |
| 24 | + |
| 25 | + def __init__(self, url, api_key, api_secret): |
| 26 | + super().__init__() |
| 27 | + self.BASE_URL = url + self.API_URL |
| 28 | + self.API_KEY = api_key |
| 29 | + self.API_SECRET = api_secret |
| 30 | + |
| 31 | + def load_package_from_link(self, link): |
| 32 | + """ |
| 33 | + Creates a package object from a secure link |
| 34 | + :param link: The link |
| 35 | + :return: The Package associated with that link. |
| 36 | + """ |
| 37 | + tokens = re.split('[?&#]', link) |
| 38 | + package_code = [item for item in tokens if item.startswith("packageCode=")][0].split("packageCode=")[-1] |
| 39 | + try: |
| 40 | + client_secret = [item for item in tokens if item.startswith("keyCode=")][0].split("keyCode=")[-1] |
| 41 | + except IndexError: |
| 42 | + client_secret = [item for item in tokens if item.startswith("keycode=")][0].split("keycode=")[-1] |
| 43 | + if "#" in package_code: |
| 44 | + package_code = re.split('#', package_code)[0] |
| 45 | + package_information = self.get_package_information(package_code) |
| 46 | + package_id = package_information["packageId"] |
| 47 | + return self.load_package(package_id=package_id, key_code=client_secret) |
| 48 | + |
| 49 | + def get_user_information(self): |
| 50 | + endpoint = "/user" |
| 51 | + url = self.BASE_URL + endpoint |
| 52 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 53 | + try: |
| 54 | + response = requests.get(url, headers=headers).json() |
| 55 | + except Exception as e: |
| 56 | + raise GetUserInformationException(details=str(e)) |
| 57 | + if response["response"] != "SUCCESS": |
| 58 | + raise GetUserInformationException(details=response["message"]) |
| 59 | + return response |
| 60 | + |
| 61 | + def generate_trusted_device_key_pair(self, description): |
| 62 | + """ |
| 63 | + Adds a public key to this user |
| 64 | + :param description: A description of this public key to submit to SendSafely |
| 65 | + :return: The response, including key pair |
| 66 | + """ |
| 67 | + key = pgpy.PGPKey.new(pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048) |
| 68 | + email = self.get_user_information()["email"] |
| 69 | + uid = pgpy.PGPUID.new('Trusted Browser', email=email) |
| 70 | + key.add_uid(uid=uid, usage={KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage}, |
| 71 | + hashes=[HashAlgorithm.SHA256], |
| 72 | + ciphers=[SymmetricKeyAlgorithm.AES256], |
| 73 | + compression=[CompressionAlgorithm.Uncompressed]) |
| 74 | + public_key = str(key.pubkey) |
| 75 | + endpoint = "/public-key" |
| 76 | + url = self.BASE_URL + endpoint |
| 77 | + body = { |
| 78 | + "publicKey": public_key, |
| 79 | + "description": description |
| 80 | + } |
| 81 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint, request_body=json.dumps(body)) |
| 82 | + response = requests.put(url, headers=headers, json=body).json() |
| 83 | + if response["response"] != "SUCCESS": |
| 84 | + raise TrustedDeviceException(details=response["message"]) |
| 85 | + self.KEY_ID = response["id"] |
| 86 | + self.KEY_PAIR = key |
| 87 | + result = {"response": response, "privateKey": str(key), "publicKey": public_key} |
| 88 | + return result |
| 89 | + |
| 90 | + def revoke_trusted_device_key(self, public_key_id): |
| 91 | + """ |
| 92 | + Removes the public key with public_key_id from this account. |
| 93 | + :param public_key_id: The public key ID |
| 94 | + :return: the JSON response |
| 95 | + """ |
| 96 | + endpoint = "/public-key/" + public_key_id |
| 97 | + url = self.BASE_URL + endpoint |
| 98 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 99 | + try: |
| 100 | + response = requests.delete(url, headers=headers).json() |
| 101 | + except Exception as e: |
| 102 | + raise TrustedDeviceException(details=str(e)) |
| 103 | + if response["response"] != "SUCCESS": |
| 104 | + raise TrustedDeviceException(details=response["message"]) |
| 105 | + return response |
| 106 | + |
| 107 | + def get_package_keycode(self, package_id, public_key_id=None, private_key=None): |
| 108 | + """ |
| 109 | + Gets the decrypted package keycode using trusted device keys. |
| 110 | + Trusted device must have been assigned prior to the package being uploaded. |
| 111 | + :param package_id: The package Id |
| 112 | + :param public_key_id: The public key id for the trusted device |
| 113 | + :param private_key: The private trusted device key |
| 114 | + :return: |
| 115 | + """ |
| 116 | + #if path_to_keys: |
| 117 | + # data = read_key_pair(path_to_keys) |
| 118 | + # public_key_id = data["publicKeyId"] |
| 119 | + # private_key = data["privateKey"] |
| 120 | + if public_key_id is None or private_key is None: |
| 121 | + public_key_id = self.KEY_ID |
| 122 | + private_key = self.KEY_PAIR |
| 123 | + endpoint = '/package/' + package_id + '/link/' + public_key_id |
| 124 | + url = self.BASE_URL + endpoint |
| 125 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 126 | + try: |
| 127 | + keycode = requests.get(url, headers=headers).json()["message"] |
| 128 | + key_pair = pgpy.PGPKey.from_blob(str(private_key))[0] |
| 129 | + keycode_message = PGPMessage.from_blob(keycode) |
| 130 | + decrypted_keycode = key_pair.decrypt(keycode_message).message |
| 131 | + return {"keyCode": decrypted_keycode} |
| 132 | + except Exception as e: |
| 133 | + raise GetKeycodeFailedException(details=str(e)) |
| 134 | + |
| 135 | + def load_package(self, package_id, key_code=None): |
| 136 | + """ |
| 137 | + Builds a Package object from information about that package |
| 138 | + :param package_id: The Package ID |
| 139 | + :param key_code: The client secret/keycode for this package (optional) |
| 140 | + :return: The Package Object. |
| 141 | + """ |
| 142 | + package_information = self.get_package_information(package_id) |
| 143 | + server_secret = package_information["serverSecret"] |
| 144 | + package_code = package_information["packageCode"] |
| 145 | + if key_code: |
| 146 | + key_code = _get_string_from_file(key_code) |
| 147 | + package_variables = { |
| 148 | + "packageId": package_id, |
| 149 | + "serverSecret": server_secret, |
| 150 | + "packageCode": package_code, |
| 151 | + "clientSecret": key_code |
| 152 | + } |
| 153 | + return Package(self, package_variables=package_variables) |
| 154 | + |
| 155 | + def delete_package(self, package_id): |
| 156 | + """ |
| 157 | + Deletes a given package. |
| 158 | + :param package_id: the package you desire to delete. |
| 159 | + :return: the JSON response. |
| 160 | + """ |
| 161 | + endpoint = "/package/" + package_id |
| 162 | + url = self.BASE_URL + endpoint |
| 163 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 164 | + try: |
| 165 | + response = requests.delete(url, headers=headers).json() |
| 166 | + except Exception as e: |
| 167 | + raise DeletePackageException(details=str(e)) |
| 168 | + if response["response"] != "SUCCESS": |
| 169 | + raise DeletePackageException(details=response["message"]) |
| 170 | + return response |
| 171 | + |
| 172 | + def get_package_information(self, package_id): |
| 173 | + """ |
| 174 | + Get a detailed status of a given package |
| 175 | + :param package_id: The package you desire to inquire about. |
| 176 | + :return: The detailed status as a JSON response. |
| 177 | + """ |
| 178 | + endpoint = "/package/" + package_id |
| 179 | + url = self.BASE_URL + endpoint |
| 180 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 181 | + try: |
| 182 | + response = requests.get(url, headers=headers).json() |
| 183 | + except Exception as e: |
| 184 | + raise GetPackageInformationFailedException(details=str(e)) |
| 185 | + if response["response"] != "SUCCESS": |
| 186 | + raise GetPackageInformationFailedException(details=response["message"]) |
| 187 | + return response |
| 188 | + |
| 189 | + def get_package_information_from_link(self, link): |
| 190 | + """ |
| 191 | + Get a detailed status of a given package given a secure link |
| 192 | + :param link: The secure link. |
| 193 | + :return: The detailed status as a JSON response. |
| 194 | + """ |
| 195 | + tokens = re.split('[?&#]', link) |
| 196 | + package_code = [item for item in tokens if item.startswith("packageCode=")][0].split("packageCode=")[-1] |
| 197 | + return self.get_package_information(package_code) |
| 198 | + |
| 199 | + def get_received_packages(self, row_index=0, page_size=100): |
| 200 | + """ |
| 201 | + Get all packages received by this user. |
| 202 | + :param row_index: The row to start at |
| 203 | + :param page_size: The number of pages to fetch at a time |
| 204 | + :return: The JSON response as a list of packages |
| 205 | + """ |
| 206 | + endpoint = "/package/received" |
| 207 | + url = self.BASE_URL + endpoint |
| 208 | + all_packages = [] |
| 209 | + pagination_data = [] |
| 210 | + try: |
| 211 | + while True: |
| 212 | + params = { |
| 213 | + "rowIndex": row_index, |
| 214 | + "pageSize": page_size |
| 215 | + } |
| 216 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 217 | + response = requests.get(url, headers=headers, params=params).json() |
| 218 | + pagination = response["pagination"] |
| 219 | + pagination_data.append(pagination) |
| 220 | + packages = response["packages"] |
| 221 | + all_packages.extend(packages) |
| 222 | + if len(packages) < page_size: |
| 223 | + return {"packages": all_packages, "pagination": pagination_data} |
| 224 | + row_index += page_size |
| 225 | + except Exception as e: |
| 226 | + raise GetPackagesException(details=str(e)) |
| 227 | + |
| 228 | + def get_sent_packages(self, row_index=0, page_size=100): |
| 229 | + """ |
| 230 | + Get all packages sent by this user |
| 231 | + :param row_index: The row to start at |
| 232 | + :param page_size: The number of pages to fetch at a time |
| 233 | + :return: The JSON response as a list of packages |
| 234 | + """ |
| 235 | + endpoint = "/package" |
| 236 | + url = self.BASE_URL + endpoint |
| 237 | + all_packages = [] |
| 238 | + pagination_data = [] |
| 239 | + try: |
| 240 | + while True: |
| 241 | + params = { |
| 242 | + "rowIndex": row_index, |
| 243 | + "pageSize": page_size |
| 244 | + } |
| 245 | + headers = make_headers(self.API_SECRET, self.API_KEY, endpoint) |
| 246 | + response = requests.get(url, headers=headers, params=params).json() |
| 247 | + pagination = response["pagination"] |
| 248 | + pagination_data.append(pagination) |
| 249 | + packages = response["packages"] |
| 250 | + all_packages.extend(packages) |
| 251 | + if len(packages) < page_size: |
| 252 | + return {"packages": all_packages, "pagination": pagination_data} |
| 253 | + row_index += page_size |
| 254 | + except Exception as e: |
| 255 | + GetPackagesException(details=str(e)) |
0 commit comments