-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.py
More file actions
224 lines (181 loc) · 6.03 KB
/
executor.py
File metadata and controls
224 lines (181 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from codesafety import CodeSafety
import os
import json
import subprocess
import datetime
import logging
from uuid import uuid4
logger = logging.getLogger('executor')
def gen_filename():
return f"{uuid4()}-{datetime.datetime.now().strftime('%Y%m%d')}.py".replace('-', "_")
class Executor:
"""
Executes arbitrary user-defined python functions by creating
temporary script files within an `executions` directory
and calling the generated file with `subprocess.run`.
Class Attributes:
EXEC_PATH : str
directory that stores temporary user scripts during execution
will be auto-created if it does not already exist
TIMEOUT : int
max time in wall-clock seconds a user's function will run
within a subprocess
STDOUT_SPLIT_MAGIC : str
a magic string used to identify the user's function
result and separate it within stdout
DEFAULT_IMPORTS : list[str]
a list of imports to add into the user script
"""
EXEC_PATH = "executions"
TIMEOUT = 5 # seconds
STDOUT_SPLIT_MAGIC = "||------*|*+=|=+*|*------||"
DEFAULT_IMPORTS = [
"numpy"
, "pandas"
, "os"
, "json"
, "re"
, "datetime"
]
@classmethod
def setup(cls) -> None:
""" Creates the execution directory """
os.makedirs(cls.EXEC_PATH, exist_ok=True)
@classmethod
def import_headers(cls) -> str:
"""
Reads DEFAULT_IMPORTS to create valid import statements
for a downstream user script
"""
return '\n'.join(
f"import {library}"
for library in cls.DEFAULT_IMPORTS
) + '\n'
@classmethod
def gen_script(cls, script : str) -> str:
"""
Generates a temporary python script loaded with
DEFAULT_IMPORTS, a helper function `handle_output`,
and the user-defined `main` function
CodeSafety will verify that the function isn't malicious
(to a degree) and will raise an exception if the internal
heuristics identify malicious behavior
Creates a temporary file within `EXEC_PATH` and returns
the filename
"""
fname = gen_filename()
CodeSafety.check_user_main_function(
fname, script
)
imports = cls.import_headers()
script_text = f"""
{imports}
{script}
def handle_output(res):
match res:
case str():
print(json.dumps(json.loads(res)))
case dict():
print(json.dumps(res))
case _:
raise Exception(str(type(res)) + " isn't handled currently!")
if __name__ == "__main__":
result = main()
print("{cls.STDOUT_SPLIT_MAGIC}")
print(handle_output(result))
"""
with open(os.path.join(cls.EXEC_PATH, fname), 'w') as f:
f.write(script_text)
return fname
@classmethod
def cleanup(cls, fname : str) -> None:
"""
Removes the temporary user script at {EXEC_PATH}/{fname}
"""
if fname is None:
return
path = os.path.join(cls.EXEC_PATH, fname)
if os.path.exists(path):
os.remove(path)
@classmethod
def handle_execution_result(
cls
, result : subprocess.CompletedProcess | None = None
, error : Exception | None = None
) -> dict:
"""
Handles the success or failure of a specific user script's
execution. If `error` exists, the function raises it immediately,
otherwise result will be populated.
If result exists, parses the stdout from the subprocess
to return a dictionary containing the user-defined
script result along with the rest of stdout
"""
if error is not None:
raise error
status = result.returncode
if status != 0:
raise Exception(f"Process exited with non-zero status: {status}!")
raw_stdout = result.stdout.decode()
splits = raw_stdout.split('\n')
stdout = []
output = None
for i, line in enumerate(splits):
if line.strip() == cls.STDOUT_SPLIT_MAGIC:
output = json.loads(splits[i + 1])
break
else:
stdout.append(line)
return {
'result': output
, 'stdout': ''.join(stdout)
}
@classmethod
def execute(cls, fname : str, nsjail : bool = False) -> dict:
"""
Executes a given filename through subprocess
and hands the result/error to handle_execution_result
nsjail flag can be set to force execution in a jailed environment
"""
error = result = None
logger.info(f"Executing {fname}...")
try:
if nsjail:
cmd = [
'nsjail', '-C', '/etc/nsjail.cfg'
, '--chroot', '/', '--'
, '/usr/bin/python3'
, os.path.join('/app', cls.EXEC_PATH, fname)
]
else:
cmd = ['python3', os.path.join(cls.EXEC_PATH, fname)]
result = subprocess.run(
cmd
, timeout=cls.TIMEOUT
, stdout=subprocess.PIPE
)
except Exception as e:
logger.warning(e)
error = e
return cls.handle_execution_result(
result=result, error=error
)
@classmethod
def run(cls, script : str, nsjail : bool = False) -> dict:
"""
The main entry point of Executor, runs the entire
setup -> verify -> generate -> execute -> cleanup
loop for a given user-defined script
"""
cls.setup()
result = None
fname = None
try:
fname = cls.gen_script(script)
result = cls.execute(fname, nsjail=nsjail)
except Exception as e:
# TODO: log errors to db or inspect results
result = {'error': e}
finally:
cls.cleanup(fname)
return result