-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpython code
More file actions
322 lines (257 loc) · 12.7 KB
/
python code
File metadata and controls
322 lines (257 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
import os
import base64
import hashlib
import time
class AESCipher:
    """Password-based AES-256 encryption and decryption in CBC or GCM mode.

    The 32-byte key is derived once, at construction time, from the password
    and salt using PBKDF2-HMAC-SHA256. Messages are exchanged as base64 text:

    * CBC: ``base64(iv[16 bytes] + ciphertext)``, plaintext PKCS7-padded.
    * GCM: ``base64(iv[12 bytes] + tag[16 bytes] + ciphertext)``.
    """

    _SUPPORTED_MODES = ('CBC', 'GCM')
    _CBC_IV_SIZE = 16   # bytes; also the AES block size
    _GCM_IV_SIZE = 12   # bytes
    _GCM_TAG_SIZE = 16  # bytes; decrypt_gcm reads the tag from bytes 12..28

    def __init__(self, password: str, salt: bytes, mode: str = 'CBC', print_inner_data: bool = False):
        """
        AES Cipher class to handle encryption and decryption.
        :param password: Password for key generation.
        :param salt: Salt for key generation.
        :param mode: Encryption mode ('CBC' or 'GCM'). Defaults to 'CBC'.
        :param print_inner_data: When True, every encryption prints the
            password, salt and IV in clear text. Debug/demo use only.
        :raises ValueError: If ``mode`` is not 'CBC' or 'GCM'.
        """
        # Validate with an explicit raise: an `assert` disappears under
        # `python -O`. Checked first so a bad mode never pays for the KDF.
        if mode not in self._SUPPORTED_MODES:
            raise ValueError(
                f"Unsupported mode {mode!r}; expected one of {self._SUPPORTED_MODES}"
            )
        self.password = password
        self.salt = salt
        self.mode = mode
        self.key = self.generate_key(password, salt)
        self.print_inner_data = print_inner_data

    def generate_key(self, password: str, salt: bytes) -> bytes:
        """
        Derives a cryptographic key from the password and salt using PBKDF2-HMAC.
        :param password: Password to derive the key from.
        :param salt: Salt to add entropy to the key derivation process.
        :return: Derived key (32 bytes).
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,  # AES-256
            salt=salt,
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(password.encode())

    def encrypt(self, message: str) -> str:
        """
        Encrypts a message using AES in either CBC or GCM mode.
        :param message: The plaintext message to encrypt.
        :return: Base64-encoded ciphertext.
        :raises ValueError: If ``self.mode`` was changed to an unsupported value.
        """
        if self.mode == 'CBC':
            return self.encrypt_cbc(message)
        if self.mode == 'GCM':
            return self.encrypt_gcm(message)
        # Only reachable if `mode` was reassigned after construction; fail
        # loudly instead of silently returning None.
        raise ValueError(f"Unsupported mode {self.mode!r}")

    def decrypt(self, encrypted_message: str) -> str:
        """
        Decrypts a base64-encoded ciphertext using AES in either CBC or GCM mode.
        :param encrypted_message: The base64-encoded ciphertext to decrypt.
        :return: The decrypted plaintext message.
        :raises ValueError: If ``self.mode`` was changed to an unsupported
            value, or the payload is malformed.
        """
        if self.mode == 'CBC':
            return self.decrypt_cbc(encrypted_message)
        if self.mode == 'GCM':
            return self.decrypt_gcm(encrypted_message)
        raise ValueError(f"Unsupported mode {self.mode!r}")

    def _print_inner_data(self, iv: bytes) -> None:
        """Print the instance summary, password, salt and IV (debug aid).

        WARNING: this writes the clear-text password to stdout; it is only
        called when ``print_inner_data`` is True.
        """
        print(f"{str(self)}\npassword: {self.password}, salt: {self.salt}, iv: {iv}", end="\n\n")

    def encrypt_cbc(self, message: str) -> str:
        """
        Encrypts a message using AES in CBC mode with PKCS7 padding.
        :param message: The plaintext message to encrypt.
        :return: Base64-encoded ciphertext (IV followed by the encrypted data).
        """
        iv = os.urandom(self._CBC_IV_SIZE)  # Fresh random IV for every message
        cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        if self.print_inner_data:
            self._print_inner_data(iv)
        # Apply PKCS7 padding so the plaintext fills whole AES blocks
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_message = padder.update(message.encode()) + padder.finalize()
        # Encrypt the padded message
        encrypted_message = encryptor.update(padded_message) + encryptor.finalize()
        # Combine IV and ciphertext, then base64 encode
        return base64.b64encode(iv + encrypted_message).decode()

    def decrypt_cbc(self, encrypted_message: str) -> str:
        """
        Decrypts a base64-encoded ciphertext using AES in CBC mode.
        :param encrypted_message: The base64-encoded ciphertext to decrypt.
        :return: The decrypted plaintext message.
        :raises ValueError: If the decoded payload is not a 16-byte IV followed
            by at least one whole 16-byte block, or the padding is invalid.
        """
        raw = base64.b64decode(encrypted_message.encode())
        block = self._CBC_IV_SIZE
        # PKCS7 always emits at least one block, so a valid payload is the IV
        # plus a non-empty, block-aligned ciphertext.
        if len(raw) < 2 * block or len(raw) % block:
            raise ValueError(
                "CBC payload must be a 16-byte IV followed by one or more 16-byte blocks"
            )
        iv, body = raw[:block], raw[block:]
        cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        # Decrypt the message
        decrypted_message = decryptor.update(body) + decryptor.finalize()
        # Remove PKCS7 padding
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        unpadded_message = unpadder.update(decrypted_message) + unpadder.finalize()
        return unpadded_message.decode()

    def encrypt_gcm(self, message: str) -> str:
        """
        Encrypts a message using AES in GCM mode.
        :param message: The plaintext message to encrypt.
        :return: Base64-encoded ciphertext with IV and authentication tag.
        """
        iv = os.urandom(self._GCM_IV_SIZE)  # Fresh random 12-byte IV for AES-GCM
        cipher = Cipher(algorithms.AES(self.key), modes.GCM(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        if self.print_inner_data:
            self._print_inner_data(iv)
        # Encrypt the message; the tag is only available after finalize()
        ciphertext = encryptor.update(message.encode()) + encryptor.finalize()
        # Return IV + authentication tag + ciphertext as base64
        return base64.b64encode(iv + encryptor.tag + ciphertext).decode()

    def decrypt_gcm(self, encrypted_message: str) -> str:
        """
        Decrypts a base64-encoded ciphertext using AES in GCM mode.
        :param encrypted_message: The base64-encoded ciphertext to decrypt.
        :return: The decrypted plaintext message.
        :raises ValueError: If the decoded payload is shorter than the 12-byte
            IV plus the 16-byte authentication tag.
        """
        raw = base64.b64decode(encrypted_message)
        header = self._GCM_IV_SIZE + self._GCM_TAG_SIZE
        # Reject truncated input up front rather than slicing a short IV/tag.
        if len(raw) < header:
            raise ValueError(
                "GCM payload must contain a 12-byte IV and a 16-byte tag"
            )
        iv = raw[:self._GCM_IV_SIZE]        # 12-byte IV
        tag = raw[self._GCM_IV_SIZE:header]  # 16-byte authentication tag
        ciphertext = raw[header:]
        cipher = Cipher(algorithms.AES(self.key), modes.GCM(iv, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        # Decrypt the ciphertext
        return (decryptor.update(ciphertext) + decryptor.finalize()).decode('utf-8')

    def __str__(self):
        """
        Return a string representation of the AESCipher instance, including
        the mode of encryption and short hashes of the password and salt.
        This helps to identify the instance without printing the raw values.
        """
        # First 6 hex digits of SHA-256 are used purely as a display identifier
        password_hash = hashlib.sha256(self.password.encode()).hexdigest()[:6]
        salt_hash = hashlib.sha256(self.salt).hexdigest()[:6]
        return f"AESCipher(mode={self.mode}, password_hash={password_hash}, salt_hash={salt_hash})"
from memory_profiler import memory_usage
def run_aes_operations(mode, iteration_count, custom_message=None, message_print_limit=None):
    """
    Run timed AES encrypt/decrypt round trips and print the results.

    Each iteration builds a fresh AESCipher (new password and random salt),
    encrypts and decrypts one message, and prints the message previews, the
    memory-usage difference around each operation, and the elapsed time.

    Args:
    - mode (str): The AES mode to use ('CBC' or 'GCM').
    - iteration_count (int): How many encrypt/decrypt cycles to run.
    - custom_message (str): Message to use instead of the generated one.
    - message_print_limit (int): Maximum number of characters of each message
      to print (useful for large inputs); None prints everything.
    """
    # A slice with a stop of None selects the whole string, so the limit can
    # be applied uniformly whether or not it was supplied.
    preview = slice(message_print_limit)

    for round_no in range(iteration_count):
        # Timing covers key derivation, both operations and the printing
        started = time.perf_counter()

        # Per-iteration password and a fresh random salt for the key derivation
        password = f"{mode.lower()}password{round_no}"
        salt = os.urandom(16)
        cipher = AESCipher(password, salt, mode=mode, print_inner_data=True)

        if custom_message is None:
            message = f"{mode} secret message {round_no}"
        else:
            message = custom_message
        print(f"Original Message ({mode}): {message[preview]}")

        # Encrypt, sampling process memory immediately before and after
        baseline = memory_usage()[0]
        encrypted_message = cipher.encrypt(message)
        encrypt_delta = memory_usage()[0] - baseline
        print(f"Encrypted ({mode}): {encrypted_message[preview]}, memory usage: {encrypt_delta}Mb")

        # Decrypt again so the output can be compared with the original
        baseline = memory_usage()[0]
        decrypted_message = cipher.decrypt(encrypted_message)
        decrypt_delta = memory_usage()[0] - baseline
        print(f"Decrypted ({mode}): {decrypted_message[preview]}, memory usage: {decrypt_delta}Mb")

        # Elapsed wall-clock time (seconds) for this iteration
        elapsed = time.perf_counter() - started
        print(f"Elapsed Time ({mode}): {elapsed:.6f} seconds", end="\n\n\n")
# Run two short encrypt/decrypt iterations in each supported mode (CBC first, then GCM)
for _demo_mode in ('CBC', 'GCM'):
    run_aes_operations(mode=_demo_mode, iteration_count=2)
import random
import string
# Build a 50-million-character random message to exercise both modes on large input
length = 50_000_000
characters = ''.join((string.ascii_letters, string.digits, string.punctuation))  # Available characters
random_large_string = ''.join(random.choices(characters, k=length))
random_large_string[:10]  # notebook-style preview; has no visible effect when run as a script
# Run one iteration per mode (CBC first, then GCM), printing only the first 10 characters
for _large_mode in ('CBC', 'GCM'):
    run_aes_operations(mode=_large_mode, iteration_count=1, custom_message=random_large_string, message_print_limit=10)
import requests
import json
from base64 import b64decode, b64encode
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Util.Padding import pad
# Base URL of the encryption API (AWS instance IP). It deliberately has no
# trailing slash: every request appends a path that starts with "/v1/...",
# so a trailing slash here would produce "//v1/..." request paths.
API_URL = "http://54.206.11.77:8000"
# The message we want to encrypt and decrypt
message = "Hello, this is a secret message!"
# Function to get the public key from API
def get_public_key():
    """Fetch the RSA public key from the API and return it base64-decoded.

    Sends a GET request to the ``/v1/encryption/get-rsa-key`` endpoint and
    decodes the base64 ``publicKey`` field of the JSON response.

    Returns:
        The decoded key as bytes, or None (after printing an error) when the
        request fails, the status code is not 200, or the response does not
        contain a decodable ``publicKey``.
    """
    try:
        # A timeout keeps the script from hanging forever on an unreachable host
        response = requests.get(f"{API_URL}/v1/encryption/get-rsa-key", timeout=10)
    except requests.RequestException as exc:
        print(f"Error fetching public key: {exc}")
        return None

    if response.status_code != 200:
        print("Error fetching public key")
        return None

    try:
        public_key_base64 = response.json().get('publicKey')
        if not public_key_base64:
            # b64decode(None) would raise TypeError; report it like any other failure
            print("Error fetching public key")
            return None
        # Decode key from base64
        return b64decode(public_key_base64)
    except ValueError as exc:
        # Covers both an invalid JSON body and invalid base64 data
        print(f"Error fetching public key: {exc}")
        return None
print("Original message: ", message)
# Fetch the public key from the API once. get_public_key() returns None on
# failure, so the key must be checked before it is re-encoded for the request.
public_key_pem = get_public_key()
encrypted_message = None
if public_key_pem:
    # The API expects the key base64-encoded inside the JSON payload
    public_key_encoded = b64encode(public_key_pem).decode('utf-8')
    response = requests.post(
        f"{API_URL}/v1/encryption/encrypt-rsa",
        json={"message": message, "publicKey": public_key_encoded},
        timeout=10,
    )
    if response.status_code == 200:
        encrypted_message = response.json().get("encryptedMessage")
        print("Encrypted message: ", encrypted_message)
    else:
        print(f"Failed to send message: {response.status_code} - {response.text}")

    # Only ask the server to decrypt when encryption actually produced a ciphertext
    if encrypted_message is not None:
        response = requests.post(
            f"{API_URL}/v1/encryption/decrypt-rsa",
            json={"encryptedMessage": encrypted_message},
            timeout=10,
        )
        if response.status_code == 200:
            print("Decrypted message: ", response.json().get("decryptedMessage"))
        else:
            print(f"Failed to send message: {response.status_code} - {response.text}")
else:
    print("Public key not available, unable to encrypt the message.")
# AES round trip through the API: encrypt, decrypt with the correct password,
# then attempt a decrypt with a wrong password to show the server-side error.
password = "cbcpassword"
message = "CBC secret message"
encrypted_message, iv, salt = None, None, None
response = requests.post(
    f"{API_URL}/v1/encryption/encrypt-aes",
    json={"message": message, "password": password},
    timeout=10,
)
if response.status_code == 200:
    response_data = response.json()
    encrypted_message = response_data.get('encryptedMessage')
    iv = response_data.get('iv')
    salt = response_data.get('salt')
    print(f"Encrypted message: {encrypted_message}")
    print(f"IV: {iv}")
    print(f"Salt: {salt}")
else:
    print(f"Failed to send message: {response.status_code} - {response.text}")

# Without a ciphertext there is nothing meaningful to decrypt, so skip both
# decrypt requests instead of posting None values to the server.
if encrypted_message is not None:
    # Decrypt with the correct password
    response = requests.post(
        f"{API_URL}/v1/encryption/decrypt-aes",
        json={
            "encryptedMessage": encrypted_message,
            "password": password,
            "iv": iv,
            "salt": salt,
        },
        timeout=10,
    )
    if response.status_code == 200:
        decrypted_message = response.json().get("decryptedMessage")
        print("Decrypted message: ", decrypted_message)
    else:
        print(f"Failed to send message: {response.status_code} - {response.text}")

    # Decrypt with a wrong password; the script prints the "error" field of a 500 response
    response = requests.post(
        f"{API_URL}/v1/encryption/decrypt-aes",
        json={
            "encryptedMessage": encrypted_message,
            "password": "wrongpassword",
            "iv": iv,
            "salt": salt,
        },
        timeout=10,
    )
    if response.status_code == 200:
        decrypted_message = response.json().get("decryptedMessage")
        print("Decrypted message: ", decrypted_message)
    elif response.status_code == 500:
        error_msg = response.json().get("error")
        print(error_msg)