-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfeatureExtractor.py
More file actions
114 lines (97 loc) · 5.31 KB
/
featureExtractor.py
File metadata and controls
114 lines (97 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from rawFeatureHandler.rawFeatureHandler import rawHandler
from rawFeatureExtractor.runIDAPro import extractByIDAPro, MultiThreadingExtractByIDAPro
import threading
import os, shutil
# Root of the on-disk resource tree; the three pipeline directories below all
# live directly under it. Each path keeps its trailing '/'.
_feature_resource_root = '/Users/Max/Documents/capstone/FeatureEngineering/FeatureResource/'
# Input binaries that are scanned for '.o' / '.so' files.
binary_dir_path = _feature_resource_root + 'binary/'
# Location that is checked for (and walked to read) the per-binary '.ida' files.
rawFeatures_path = _feature_resource_root + 'rawFeatures/'
# Destination directory of the generated '<name>.json' feature files.
final_feature_path = _feature_resource_root + 'features/'
class featureGenerator():
    """Two-stage feature pipeline: binaries -> IDA files -> feature files.

    Stage 1 (``extractIDAFilefromBinary`` / ``multithreadingIDA``) hands every
    '.o' / '.so' file found under ``binary_dir`` to the IDA Pro extractor,
    skipping binaries whose '<relative path>.ida' already exists under
    ``rawFeatures_dir``.

    Stage 2 (``idaFileAnalyzer``) runs ``rawHandler`` over every file under
    ``rawFeatures_dir`` and appends the results to JSON files inside the
    module-level ``final_feature_path`` directory.

    All relative paths are computed from the directories passed to the
    constructor, so they may be given with or without a trailing separator.
    """

    # Number of worker threads started by multithreadingIDA.
    THREAD_COUNT = 6
    # With this many binaries or fewer, extraction runs on the calling thread.
    SINGLE_THREAD_LIMIT = 11

    def __init__(self, binary_dir, rawFeatures_dir):
        """Remember the binary input directory and the raw-feature directory."""
        self.binary_dir = binary_dir
        self.rawFeature_dir = rawFeatures_dir

    def _pendingBinaries(self):
        """Yield the path of every binary that has no '.ida' file yet.

        Hidden files and files not ending in '.o' or '.so' are ignored. A
        binary is skipped (with a log line) when '<relative path>.ida' exists
        under the raw-feature directory.
        """
        for root, _dirs, files in os.walk(self.binary_dir):
            for file_name in files:
                if file_name.startswith('.') or not file_name.endswith(('.o', '.so')):
                    continue
                binary_file_path = os.path.join(root, file_name)
                # relpath (instead of slicing off a prefix) stays correct
                # whether or not binary_dir ends with a path separator.
                bin_file_relative_path = os.path.relpath(binary_file_path, self.binary_dir)
                ida_file_relative_path = bin_file_relative_path + '.ida'
                if os.path.exists(os.path.join(self.rawFeature_dir, ida_file_relative_path)):
                    print("[INFO] Skiping {0}. Found {1}".format(bin_file_relative_path, ida_file_relative_path))
                    continue
                yield binary_file_path

    def extractIDAFilefromBinary(self):
        """Run the IDA Pro extractor sequentially on every pending binary."""
        print('[INFO] Generating IDA File for Binary by IDA Pro')
        for binary_file_path in self._pendingBinaries():
            # The extractor's return value is not used: a failed extraction
            # simply moves on to the next binary.
            extractByIDAPro(binary_file_path)

    def collectBinary(self):
        """Return the list of binary paths that still need an '.ida' file."""
        print('[INFO] Generating IDA File for Binary by IDA Pro')
        return list(self._pendingBinaries())

    @staticmethod
    def _splitIntoBlocks(items, block_count):
        """Split ``items`` into ``block_count`` consecutive slices.

        The first ``block_count - 1`` slices each hold
        ``len(items) // block_count`` elements; the last slice takes whatever
        remains. Floor division keeps the slice bounds integral on Python 3.
        """
        block_len = len(items) // block_count
        blocks = [items[index * block_len:(index + 1) * block_len]
                  for index in range(block_count - 1)]
        blocks.append(items[(block_count - 1) * block_len:])
        return blocks

    def multithreadingIDA(self):
        """Extract all pending binaries, using worker threads for large batches.

        More than ``SINGLE_THREAD_LIMIT`` binaries are split into
        ``THREAD_COUNT`` blocks, each handed to ``MultiThreadingExtractByIDAPro``
        on its own thread; the call returns once every thread has finished.
        Smaller batches are processed directly on the calling thread.
        """
        bin_files = self.collectBinary()
        if len(bin_files) <= self.SINGLE_THREAD_LIMIT:
            MultiThreadingExtractByIDAPro(bin_files)
            return
        workers = [
            threading.Thread(target=MultiThreadingExtractByIDAPro, args=(block,))
            for block in self._splitIntoBlocks(bin_files, self.THREAD_COUNT)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def _featureFilePath(self, root):
        """Return the feature file that results found under ``root`` go to.

        The file is named after the first path component of ``root`` relative
        to the raw-feature directory, so everything below one top-level
        sub-directory ends up in the same '<name>.json'.
        """
        relative_root = os.path.relpath(root, self.rawFeature_dir)
        if relative_root == os.curdir:
            # Files lying directly in the raw-feature directory have no
            # top-level component; they are written to '.json'.
            top_level = ''
        else:
            top_level = relative_root.split(os.sep)[0]
        return os.path.join(final_feature_path, top_level + '.json')

    def idaFileAnalyzer(self):
        """Extract features from every raw-feature file and write them out.

        Each non-hidden file under the raw-feature directory is passed to
        ``rawHandler(...).extractFromGraphs()``; a ``None`` result is skipped.
        Every returned entry is appended as one line to the matching feature
        file. Messages collected in the error list handed to ``rawHandler``
        are printed and written to 'ErrorExtractIDA.log', one per line.
        """
        print('[INFO] Extracting Features from IDA File')
        errorlog = []
        for root, _dirs, files in os.walk(self.rawFeature_dir):
            for file_name in files:
                # Also covers '.DS_Store' and other hidden files.
                if file_name.startswith('.'):
                    continue
                ida_file_path = os.path.join(root, file_name)
                feature_results = rawHandler(ida_file=ida_file_path, error=errorlog).extractFromGraphs()
                if feature_results is None:
                    continue
                feature_file_path = self._featureFilePath(root)
                print(' Writing Features into {}'.format(feature_file_path))
                # Append mode: several raw files share one feature file.
                with open(feature_file_path, 'a+') as f:
                    for func in feature_results:
                        f.write(func)
                        f.write('\n')
        if errorlog:
            print('[INFO] Error When Extracting: ')
            with open('ErrorExtractIDA.log', 'w') as f:
                for error in errorlog:
                    e = '[-] ' + error
                    print(e)
                    f.write(e + '\n')
if __name__ == '__main__':
    # Script entry point: produce the IDA files first, then turn them into
    # feature files. Both stages share one generator configured with the
    # module-level directories.
    generator = featureGenerator(binary_dir_path, rawFeatures_path)
    # Single-threaded alternative for the first stage:
    #generator.extractIDAFilefromBinary()
    generator.multithreadingIDA()
    generator.idaFileAnalyzer()