-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
147 lines (122 loc) · 5.37 KB
/
client.py
File metadata and controls
147 lines (122 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from Gamal import *
from cert_utils import *
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives import serialization
from flask import Flask, redirect, url_for, request
import threading
import os
import base64
app = Flask(__name__)
import requests
import argparse
from base64 import b64encode, b64decode
@app.route('/msg', methods=['POST'])
def receive_message():
input_json = request.get_json(force=True)
encrypted_message = input_json["msg"]
sig_r = input_json["sig_r"]
sig_s = input_json["sig_s"]
sender_id = input_json["id"]
message = private_key_decrypt(encrypted_message, client_private_key) # Decrypt the message.
print(" Requesting certificates for user: ", sender_id)
client_data = {'id': client_id,
'receiver': sender_id}
res = requests.post(ca_url + "/get_client_key_cert", json=client_data)
receiver_cert = cert_from_bytes(str.encode(res.text))
valid_cert = cert_validate_signature(receiver_cert, ca_public_key) # Validate cert.
if valid_cert:
print("RSA public key cert received from CA for receiver: ", client_data["receiver"],
" is valid")
else:
print("RSA public key cert received from CA for receiver: ", client_data["receiver"],
" is invalid")
return
res = requests.post(ca_url + "/get_client_gammal_cert", json=client_data)
receiver_cert = cert_from_bytes(str.encode(res.text))
valid_cert = cert_validate_signature(receiver_cert, ca_public_key) # Validate cert.
if valid_cert:
print("Gammal public key cert received from CA for receiver: ", client_data["receiver"],
" is valid")
else:
print("Gammal public key cert received from CA for receiver: ", client_data["receiver"],
" is invalid")
return
gammal_fake_pub_key = cert_get_pub_key(receiver_cert)
gammal_pub_key = get_fake_key_val(gammal_fake_pub_key)
valid_signature = verifyElGamalSignature(gammal_pub_key, sig_r, sig_s, message)
print("Message received from: ", sender_id)
if valid_signature:
print("Valid signature of: ", sender_id)
else:
print("Invalid signature")
return "no"
if valid_signature:
print("Message:", message)
return "ok"
def send_message(message, url):
"""
Get receiver certificate, validate it, encrypts the message with it and then send it.
"""
client_data = {'id': client_id,
'receiver': other_client_id}
res = requests.post(ca_url + "/get_client_key_cert", json=client_data)
receiver_cert = cert_from_bytes(str.encode(res.text))
valid_cert = cert_validate_signature(receiver_cert, ca_public_key) # Validate cert.
if valid_cert:
print("RSA public key cert received from CA for receiver: ", client_data["receiver"],
" is valid")
else:
print("RSA public key cert received from CA for receiver: ", client_data["receiver"],
" is invalid")
return
receiver_public_key = cert_get_pub_key(receiver_cert)
encrypted_message = public_key_encrypt(message, receiver_public_key)
rr, ss = generateElGamalSignature(sig_priv, message) # signature.
client_data = {'id': client_id,
'msg': encrypted_message,
"sig_r": rr,
"sig_s": ss
}
requests.post(url + "/msg", json=client_data)
def publish_client_data_to_ca(ca_url):
# Generating certificate for rsa public key.
client_data = {'id': client_id,
'public_key': client_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)}
requests.post(ca_url + "/generate_key_cert", json=client_data)
# Generating certificate for el gammal public key.
client_data = {'id': client_id,
'public_key': sig_pub_key}
requests.post(ca_url + "/generate_gammal_cert", json=client_data)
def send_loop():
while True:
message = input(" write message to send to client: " + other_client_url +
" | write exit to exit\n") # Message to send.
if message == "exit":
return
send_message(message, other_client_url)
print(" Message sent")
if __name__ == "__main__":
sig_priv, sig_pub_key = generateElGamalKey()
curr_dir_path = os.path.dirname(os.path.realpath(__file__)) # Get current directory
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="your port")
parser.add_argument("-op", "--otherport", help="receiver port")
args = parser.parse_args()
ca_public_key = public_key_load(curr_dir_path + "/ca/ca_public_key.pem")
other_client_id = args.otherport
other_client_url = 'http://127.0.0.1:' + args.otherport
client_id = args.port
client_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()) # Generate private key for client.
client_public_key = client_private_key.public_key() # Generate public key for client.
ca_url = 'http://127.0.0.1:5000'
print(" Sending ", client_id, " data to CA")
publish_client_data_to_ca(ca_url)
sending_thread = threading.Thread(target=send_loop)
sending_thread.start()
app.run(port=args.port)