-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.py
More file actions
82 lines (71 loc) · 2.46 KB
/
executor.py
File metadata and controls
82 lines (71 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import numpy
import array
import sys
import os
import enum
import subprocess
import pexpect
from multiprocessing.managers import SharedMemoryManager
from multiprocessing import shared_memory
# Size in bytes (0x100000 = 1 MiB) of the shared-memory block that Executor
# requests from its SharedMemoryManager.
MEM_SIZE = 0x100000
class Executor():
def __init__(self, engine):
self.prog_argv = []
self.engine = engine
self.prog_argv.append(engine)
self.smm = SharedMemoryManager()
self.smm.start() # Start the process that manages the shared memory blocks
self.raw_shm = self.smm.SharedMemory(size=MEM_SIZE)
os.environ["SHM_ID"] = self.raw_shm.name
#with open("blank.php", "w") as f:
# f.write("<?php\n?>")
#self.execute_prog("blank.php")
self.ret_code = None
os.remove("blank.php")
def __del__(self):
self.smm.shutdown(self)
def execute_prog(self, script):
command = self.prog_argv.copy()
command.append(script)
try:
child = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
stdout, stderr = child.communicate(timeout=40) #timeout after 40 seconds
self.ret_code = child.returncode
child.kill()
except Exception as e:
return -1
if self.ret_code == 1:
return 'bad'
else:
return stdout
def adjust_coverage_with_dummy_executions(self):
child = pexpect.spawnu(self.engine, ['-a'])
child.expect('php >')
cmd_input = "<?php\n?>"
for i in range(150):
child.write(cmd_input)
def save_global_coverage_map_in_file(self, file_name):
with open(file_name,'wb') as f:
f.write(self.raw_shm.buf[:])
def load_global_coverage_map_from_file(self, file_name):
with open(file_name,'rb') as f:
data = f.read()
self.raw_shm.buf[:] = data[:]
def read(self):
edge_count = 0
arr = array.array('B', self.raw_shm.buf[:])
for i in arr:
if i != 0:
edge_count += i.bit_count()
return edge_count
def make_base_map(self, filename):
self.adjust_coverage_with_dummy_executions()
self.save_global_coverage_map_in_file(filename)
def main():
    """Build the base coverage map for an engine and save it to a file.

    The engine is taken from the first command-line argument, else from a
    module-level ``COVERAGE_ENGINE`` if one is defined, else from the
    ``COVERAGE_ENGINE`` environment variable. An optional second
    command-line argument overrides the output file name.

    Raises:
        SystemExit: If no engine could be determined.
    """
    args = sys.argv[1:]
    # COVERAGE_ENGINE is not defined in this module as written, so referencing
    # it directly raised NameError; look it up defensively instead.
    engine = (
        (args[0] if args else None)
        or globals().get("COVERAGE_ENGINE")
        or os.environ.get("COVERAGE_ENGINE")
    )
    if not engine:
        sys.exit(
            "usage: executor.py <engine> [output_file] "
            "(or set the COVERAGE_ENGINE environment variable)"
        )
    output_file = args[1] if len(args) > 1 else "base_map_php_4_20_24"
    executor = Executor(engine)
    executor.make_base_map(output_file)


if __name__ == "__main__":
    main()