-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmongo_field_level_encryption.py
More file actions
76 lines (62 loc) · 2.69 KB
/
mongo_field_level_encryption.py
File metadata and controls
76 lines (62 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import base64

from cryptography.fernet import Fernet, InvalidToken
from mongoengine import Document, StringField, connect
class EncryptFieldMixin:
    """Fernet-based helpers for encrypting and decrypting string values.

    The mixin expects the host object to expose the secret through a ``key``
    attribute; see ``_generate_encryption_key`` for how that secret is turned
    into a Fernet key.
    """

    @staticmethod
    def _encrypt(value, encryption_key):
        """Encrypt *value* and return the result as base64 text.

        Args:
            value: Plaintext string to encrypt.
            encryption_key: Fernet key, as produced by
                ``_generate_encryption_key``.

        Returns:
            The Fernet token, base64-encoded and decoded to a UTF-8 string.
        """
        token = Fernet(encryption_key).encrypt(value.encode())
        # The token is wrapped in one more base64 layer; ``_decrypt`` undoes
        # it, so the two must stay in sync for stored values to be readable.
        return base64.b64encode(token).decode("utf-8")

    @staticmethod
    def _decrypt(value, encryption_key):
        """Decrypt a string produced by ``_encrypt``.

        Args:
            value: Base64 text as returned by ``_encrypt``.
            encryption_key: Fernet key used when the value was encrypted.

        Returns:
            The decrypted plaintext string, or ``None`` when *value* cannot be
            decoded or decrypted (malformed base64, wrong key, tampered token,
            or a value that was never encrypted).
        """
        try:
            token = base64.b64decode(value.encode("utf-8"))
            return Fernet(encryption_key).decrypt(token).decode()
        except ValueError:
            # Covers malformed base64 (binascii.Error is a ValueError
            # subclass), a key Fernet rejects, and plaintext that is not
            # valid UTF-8.
            return None
        except InvalidToken:
            # Fernet signals a wrong key or a corrupted / non-token payload
            # with InvalidToken, which is NOT a ValueError; without this
            # clause such input raised instead of yielding None.
            return None

    def _generate_encryption_key(self):
        """Build the Fernet key from this object's ``key`` attribute.

        The ``key`` string (empty when the attribute is absent) is UTF-8
        encoded, right-padded with spaces to 32 bytes or truncated to 32
        bytes, and then url-safe base64 encoded.

        Returns:
            The url-safe base64 encoding of those 32 bytes, as ``bytes``.
        """
        # NOTE(review): no hashing/KDF is applied, and a missing ``key``
        # silently yields a key made of 32 spaces. Changing the derivation
        # would make previously stored values undecryptable, so it is kept.
        secret = getattr(self, "key", "")
        raw = secret.encode("utf-8")
        return base64.urlsafe_b64encode(raw.ljust(32)[:32])
class EncryptedStringField(StringField, EncryptFieldMixin):
    """String field that encrypts on ``to_mongo`` and decrypts on ``to_python``.

    The encryption key is obtained from ``_generate_encryption_key``, which
    reads this field's ``key`` attribute.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``StringField`` and record the field name."""
        super().__init__(*args, **kwargs)
        # NOTE(review): ``self.name`` is read at construction time; confirm it
        # has already been assigned by then.
        self.encrypted_fields = [self.name]

    def to_mongo(self, value):
        """Return the encrypted form of *value*; ``None`` passes through."""
        if value is None:
            return None
        return self._encrypt(value, self._generate_encryption_key())

    def to_python(self, value):
        """Return the decrypted form of *value*; ``None`` passes through.

        The result is also ``None`` when ``_decrypt`` reports a failure.
        """
        if value is None:
            return None
        return self._decrypt(value, self._generate_encryption_key())
class MyDocument(Document, EncryptFieldMixin):
    """Example document with an encrypted ``ssn`` and a plain ``name``."""

    # Secret handed to the encrypted field below via its ``key`` argument.
    key = "aTtGg3I8CVj3nQ0nKwlSENnxO_Dl-oz8hdyArVeGvOw="
    ssn = EncryptedStringField(required=True, key=key)
    name = StringField(required=True)

    def __init__(self, *args, **kwargs):
        """Initialise the document, then re-apply an explicitly passed ``ssn``."""
        super().__init__(*args, **kwargs)
        try:
            supplied_ssn = kwargs["ssn"]
        except KeyError:
            return
        # Assign the caller's value again after base initialisation.
        # NOTE(review): presumably needed because base init routes the
        # plaintext through ``to_python`` — confirm against mongoengine.
        self.ssn = supplied_ssn
if __name__ == "__main__":
    # Demo: connect, store one document with an encrypted ssn, then list
    # every stored document.
    connect(host="mongodb://localhost:27017/test-db")

    sample_ssn = "12333-12323-1212"
    # The ssn is supplied as plaintext at construction time.
    document = MyDocument(ssn=sample_ssn, name="John Doe")

    try:
        document.save()
        print("Object saved")
    except Exception as exc:
        # Top-level boundary of the demo: report the failure and carry on.
        print("Error: " + str(exc))

    for stored in MyDocument.objects:
        print(f"ssn:{stored.ssn} and name:{stored.name}")