-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
71 lines (59 loc) · 2.18 KB
/
agent.py
File metadata and controls
71 lines (59 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from typing import Tuple
import re
from sqlglot import parse_one
# Keywords whose presence marks a statement as mutating. verify_sql_query
# looks for each of them as a whole word in the lower-cased statement and
# reports the first one found, so the order below is also the reporting order.
DISALLOWED_KEYWORDS = [
    # Row-level writes
    "insert", "update", "delete",
    # Schema changes
    "drop", "truncate", "alter", "create",
    # Upsert-style writes
    "merge", "replace",
    # Privilege management
    "grant", "revoke",
]
def _normalize(sql: str) -> str:
if not sql:
return ""
s = sql.strip()
# Remove block comments and line comments to avoid bypasses
s = re.sub(r"/\*.*?\*/", "", s, flags=re.S)
s = re.sub(r"--.*?$", "", s, flags=re.M)
return s.strip()
def verify_sql_query(sql: str, dialect: str = "default") -> Tuple[bool, str]:
    """
    Verify that the SQL is a single read-only statement (no DML or DDL).

    The checks run in this order and the first failure is reported:

    1. The normalized text (comments removed) must not be empty.
    2. At most one statement is allowed; trailing semicolons are tolerated.
    3. The first word must be one of SELECT, WITH, EXPLAIN, DESCRIBE, SHOW
       or PRAGMA.
    4. No entry of DISALLOWED_KEYWORDS may appear as a whole word anywhere
       in the statement.
    5. The statement must parse with sqlglot, and a statement that starts
       with SELECT or WITH must parse to a query node.

    Args:
        sql: The SQL text to check.
        dialect: sqlglot dialect name; "default" selects sqlglot's default
            dialect.

    Returns:
        (is_allowed, message). If allowed, message is empty; otherwise it
        says why the statement was rejected.
    """
    s = _normalize(sql)
    if not s:
        return False, "Empty SQL"

    # Reject multiple statements. The split is intentionally naive (not
    # quote-aware): a semicolon inside a string literal causes a rejection,
    # which errs on the safe side for a security gate.
    if ";" in s:
        parts = [p for p in s.split(";") if p.strip()]
        if not parts:
            # Nothing but semicolons (e.g. ";"). Indexing parts[0] here would
            # raise IndexError instead of returning a rejection.
            return False, "Empty SQL"
        if len(parts) > 1:
            return False, "Multiple statements are not allowed"
        s = parts[0]

    first_word = s.split()[0].lower()
    # Allow SELECT and common read-only meta-commands
    allowed_first = {"select", "with", "explain", "describe", "show", "pragma"}
    if first_word not in allowed_first:
        return False, f"Only read-only statements are allowed. Found: {first_word}"

    # Whole-word scan over the entire statement, string literals included.
    # This over-rejects (e.g. a literal 'update'), but also catches SQL that
    # is smuggled in as a string argument.
    # NOTE(review): "into" is not in the keyword list, so SELECT ... INTO
    # appears to pass every check here -- confirm whether that is acceptable.
    low = s.lower()
    for kw in DISALLOWED_KEYWORDS:
        if re.search(r"\b" + re.escape(kw) + r"\b", low):
            return False, f"Disallowed keyword found: {kw}"

    # Use sqlglot parsing to ensure syntactic validity. Any failure while
    # parsing is turned into a rejection so this function never raises on
    # malformed input.
    try:
        expr = parse_one(s, read=dialect if dialect != "default" else None)
    except Exception as e:
        return False, f"SQL parsing error: {e}"

    # AST type check. The earlier form of this test also required
    # `first_word not in allowed_first`, which is always False at this point,
    # so it could never reject anything. It is now enforced for statements
    # that start with SELECT/WITH: their root must be a query node.
    # Meta-commands (EXPLAIN/DESCRIBE/SHOW/PRAGMA) are left unrestricted
    # because their node type is presumed to vary by dialect -- verify
    # against the sqlglot version in use.
    node_type = type(expr).__name__.lower()
    query_nodes = {"select", "union", "intersect", "except", "subquery"}
    if first_word in {"select", "with"} and node_type not in query_nodes:
        return False, f"Only read-only query types are allowed (detected: {node_type})"

    return True, ""