-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess-manager.py
More file actions
109 lines (96 loc) · 3.26 KB
/
process-manager.py
File metadata and controls
109 lines (96 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
"""
Secure process execution manager
"""
import os
import resource
import signal
import subprocess
from typing import Dict, List, Optional
class SecureProcessManager:
    """Run external commands in a child process under resource limits.

    Each command is started in its own session with a restricted
    environment, and address space, CPU time, process count, open files and
    file size are capped via ``resource.setrlimit`` before the command image
    is executed.
    """

    # Fixed caps applied to every child, independent of constructor arguments.
    MAX_OPEN_FILES = 20000
    MAX_FILE_SIZE_BYTES = 100 * 1024 * 1024  # 100MB

    def __init__(self,
                 max_memory_mb: int = 4096,
                 max_cpu_time_sec: int = 300,
                 max_processes: int = 100):
        """Store the configurable limits.

        Args:
            max_memory_mb: Address-space cap in mebibytes (stored in bytes).
            max_cpu_time_sec: CPU-time cap in seconds.
            max_processes: Cap passed to ``RLIMIT_NPROC``.
        """
        self.max_memory = max_memory_mb * 1024 * 1024
        self.max_cpu_time = max_cpu_time_sec
        self.max_processes = max_processes

    @staticmethod
    def _apply_limit(kind: int, value: int) -> None:
        """Set soft and hard limit ``kind`` to ``value``, capped at the hard limit."""
        try:
            resource.setrlimit(kind, (value, value))
        except ValueError:
            # An unprivileged process may not raise a hard limit. When that is
            # the reason for the failure, settle for the existing ceiling
            # instead of aborting the launch; otherwise re-raise unchanged.
            _, hard = resource.getrlimit(kind)
            if hard == resource.RLIM_INFINITY or value <= hard:
                raise
            resource.setrlimit(kind, (hard, hard))

    @staticmethod
    def _kill_and_reap(proc: subprocess.Popen) -> None:
        """Kill ``proc``'s process group, release its pipes and reap it."""
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except ProcessLookupError:
            # The child exited on its own between the timeout and the kill.
            pass
        # Output is discarded on timeout, so close the pipes rather than
        # drain them; draining could block on a descendant that left the
        # group and still holds the write end open.
        for stream in (proc.stdout, proc.stderr):
            if stream is not None:
                stream.close()
        # Collect the exit status so the child does not linger as a zombie.
        proc.wait()

    def set_limits(self):
        """Set resource limits before process execution.

        Used as ``preexec_fn`` by ``execute_command``, so it runs in the
        child between fork and exec. Calling it directly applies the limits
        to the calling process itself.

        Raises:
            ValueError: If a limit is rejected for a reason other than
                exceeding the current hard limit.
            OSError: If the underlying system call fails.
        """
        limits = (
            (resource.RLIMIT_AS, self.max_memory),             # memory
            (resource.RLIMIT_CPU, self.max_cpu_time),          # CPU time
            (resource.RLIMIT_NPROC, self.max_processes),       # processes/threads
            (resource.RLIMIT_NOFILE, self.MAX_OPEN_FILES),     # file descriptors
            (resource.RLIMIT_FSIZE, self.MAX_FILE_SIZE_BYTES),  # prevent large files
        )
        for kind, value in limits:
            self._apply_limit(kind, value)

    def execute_command(self,
                        cmd: List[str],
                        env: Optional[Dict[str, str]] = None,
                        timeout: int = 30) -> Dict:
        """Execute command with security constraints.

        Args:
            cmd: Program and arguments, passed to ``subprocess.Popen`` as a
                list (no shell is involved).
            env: Extra environment variables layered over the restricted
                base environment; entries here override the base values.
            timeout: Seconds to wait for the command before killing it.

        Returns:
            A dict with keys ``returncode``, ``stdout`` and ``stderr``.
            On timeout or on any launch/runtime error ``returncode`` is -1,
            ``stdout`` is empty and ``stderr`` carries a description.
        """
        try:
            # Create restricted environment
            safe_env = {
                'PATH': '/usr/local/bin:/usr/bin:/bin',
                'HOME': '/home/developer',
                'USER': 'developer',
                'SHELL': '/bin/bash'
            }
            if env:
                safe_env.update(env)

            # NOTE: the subprocess documentation warns that preexec_fn is not
            # safe when the parent process has other threads running.
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=safe_env,
                preexec_fn=self.set_limits,
                start_new_session=True  # own session => own process group
            )

            try:
                stdout, stderr = proc.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                # Kill the entire process group, then reap the child.
                self._kill_and_reap(proc)
                return {
                    'returncode': -1,
                    'stdout': '',
                    'stderr': 'Process killed: timeout exceeded'
                }

            return {
                'returncode': proc.returncode,
                'stdout': stdout.decode('utf-8', errors='replace'),
                'stderr': stderr.decode('utf-8', errors='replace')
            }
        except Exception as e:
            # Boundary handler: callers receive an error dict, never an exception.
            return {
                'returncode': -1,
                'stdout': '',
                'stderr': f'Execution error: {str(e)}'
            }
# Usage
if __name__ == '__main__':
    # Demo entry point: run a script under the default limits and report
    # its exit code and captured standard output.
    outcome = SecureProcessManager().execute_command(['python3', 'user_script.py'])
    exit_code = outcome['returncode']
    captured = outcome['stdout']
    print(f"Exit code: {exit_code}")
    print(f"Output: {captured}")