-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_handler.py
More file actions
89 lines (70 loc) · 2.75 KB
/
lambda_handler.py
File metadata and controls
89 lines (70 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import os
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from pathlib import Path
from pipeline import ObfuscationConfig, run_pipeline
from Utils.input_guard import validate_input
_TIMEOUT_SECONDS = 10
sys.setrecursionlimit(1000)
# Fields the caller must not inject (server-side concerns)
_BLOCKED_FIELDS = {"input_path", "output_path", "return_code"}
def lambda_handler(event, context):
try:
http = event.get("requestContext", {}).get("http", {})
method = http.get("method", "POST").upper()
if method != "POST":
return _error(405, "Method not allowed")
body = event.get("body") or "{}"
if isinstance(body, str) and len(body.encode()) > 51_200:
return _error(413, "Payload too large")
payload = json.loads(body) if isinstance(body, str) else body
source = payload.pop("source", None)
if source is None:
return _error(400, "Missing required field: 'source'")
for field in _BLOCKED_FIELDS:
payload.pop(field, None)
try:
validate_input(source)
except ValueError as e:
return _error(400, str(e))
with tempfile.NamedTemporaryFile(
mode="w", suffix=".py", dir="/tmp", delete=False, encoding="utf-8"
) as f:
f.write(source)
input_path = Path(f.name)
try:
cfg = ObfuscationConfig(input_path=input_path, return_code=True, **payload)
result = _run_with_timeout(cfg)
finally:
input_path.unlink(missing_ok=True)
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"code": result}),
}
except FutureTimeout:
return _error(408, f"Pipeline timed out after {_TIMEOUT_SECONDS} seconds")
except json.JSONDecodeError:
return _error(400, "Invalid JSON body")
except SyntaxError as e:
return _error(400, f"Syntax error in source: {e}")
except RecursionError:
return _error(400, "Input too deeply nested")
except (ValueError, TypeError) as e:
return _error(400, str(e))
except FileNotFoundError as e:
return _error(404, str(e))
except Exception:
return _error(500, "Processing error")
def _run_with_timeout(cfg: ObfuscationConfig) -> str:
with ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(run_pipeline, cfg)
return future.result(timeout=_TIMEOUT_SECONDS)
def _error(status: int, message: str) -> dict:
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"error": message}),
}