-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmalware_detect.py
More file actions
91 lines (74 loc) · 3.02 KB
/
malware_detect.py
File metadata and controls
91 lines (74 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import json
import datetime
import base64
import uuid
import shutil
from mimetypes import MimeTypes
import requests
from flask import make_response, request
from flask_restful import Resource
from clustering import clustering
# Scratch directory (relative to the working directory) that MalwareDetect.post
# writes decoded uploads into and removes with shutil.rmtree before it returns
# its final response.
UPLOAD_DIR = 'uploads'
class CustomException(Exception):
    """Application-specific error type.

    Adds nothing to ``Exception``; it only provides a distinct type to
    raise and catch.
    """
class StillProcessing(Exception):
    """Distinct exception type with no behavior beyond ``Exception``.

    NOTE(review): the name suggests it signals work that has not finished
    yet -- confirm against the code that raises it.
    """
class MalwareDetect(Resource):
    """REST resource that stores base64-encoded uploads and clusters them.

    ``post`` expects a JSON body with two parallel lists: ``file`` (base64
    strings, optionally prefixed with a ``data:...;base64,`` header) and
    ``names`` (the file name to store each entry under).
    """

    def __init__(self):
        """Initialise per-instance HTTP session, identifiers and defaults."""
        super().__init__()
        self.session = requests.Session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/74.0.3729.169 Safari/537.36',
        }
        self.user_data = None
        self.device_id = uuid.uuid4().hex
        self.uuid = uuid.uuid4().hex
        self.mime = MimeTypes()
        # Use an aware UTC datetime: calling .timestamp() on the naive value
        # returned by utcnow() interprets it as local time and skews the
        # epoch by the host's UTC offset.
        current_time = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
        self.upload_filename = f'{current_time}.json'
        print(self.upload_filename)
        self.response = {'malware': '', 'confidence': 0.0}

    @staticmethod
    def _is_safe_filename(name):
        """Return True when *name* is a plain file name with no directory part."""
        if not isinstance(name, str) or name in ('', '.', '..'):
            return False
        # basename() strips any directory (and, on Windows, drive) component,
        # so equality means the name cannot escape the target directory.
        return os.path.basename(name) == name

    @staticmethod
    def _save_uploads(file_list, filename_list):
        """Decode each upload into UPLOAD_DIR and copy it into data/json."""
        for i, encoded in enumerate(file_list):
            filename = os.path.join(UPLOAD_DIR, filename_list[i])
            # Drop an optional "data:<mime>;base64," prefix before decoding.
            payload = base64.b64decode(encoded.split(',')[-1])
            with open(filename, 'wb') as f:
                f.write(payload)
            shutil.copy(filename, os.path.join('data', 'json', os.path.basename(filename)))

    def post(self):
        """Store the uploaded files and return the clustering result.

        Returns:
            A Flask response whose body is a JSON object with ``status``
            (``'success'`` or ``'fail'``) and ``message``. Status codes:
            404 when the body is not JSON or cannot be read, 400 when a
            file name contains a directory component, 503 when no file was
            sent or saving failed, and 200 once clustering has been
            attempted (``status`` stays ``'fail'`` if clustering raised).
        """
        response_object = {
            'status': 'fail',
            'message': 'Invalid payload.',
        }

        if not request.is_json:
            response_object['message'] = 'Arguments parsing is failed.'
            return make_response(json.dumps(response_object), 404)

        # receive data
        try:
            content = request.get_json()
            file_list = content['file']
            filename_list = content['names']
            if len(file_list) == 0:
                response_object['message'] = 'Not file selected.'
                return make_response(json.dumps(response_object), 503)
            # The names come from the client and are used to build paths on
            # disk, so refuse anything that could point outside the target
            # directories (e.g. "../../etc/passwd" or an absolute path).
            unsafe_names = [name for name in filename_list
                            if not self._is_safe_filename(name)]
        except Exception as error:
            response_object['message'] = f'Failed to read data because {error!r}'
            return make_response(json.dumps(response_object), 404)

        if unsafe_names:
            response_object['message'] = f'Invalid file name: {unsafe_names[0]!r}'
            return make_response(json.dumps(response_object), 400)

        # saving data
        os.makedirs(UPLOAD_DIR, exist_ok=True)
        os.makedirs('data/json', exist_ok=True)
        try:
            self._save_uploads(file_list, filename_list)
        except Exception as error:
            # Do not leave partially written uploads behind on failure.
            shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
            response_object['message'] = f'Failed to save data because {error!r}'
            return make_response(json.dumps(response_object), 503)

        # call clustering module
        try:
            result = clustering(new_file_list=filename_list)
            response_object['status'] = 'success'
            # Each result item is rendered as "<item[0]>: <item[1]>".
            response_object['message'] = '<br>'.join(f'{x[0]}: {x[1]}' for x in result)
        except Exception as error:
            # Clustering errors are reported in the body; the HTTP status
            # stays 200 as before so existing clients keep working.
            response_object['message'] = repr(error)

        shutil.rmtree(UPLOAD_DIR, ignore_errors=True)
        return make_response(json.dumps(response_object, indent=2), 200)