-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_utils.py
More file actions
105 lines (88 loc) · 3.53 KB
/
db_utils.py
File metadata and controls
105 lines (88 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Database utilities for the MariaDB REST API.
This module isolates low-level DB operations so that the rest of the code can
use simple helpers instead of talking to `mariadb` directly.
"""
from typing import Dict, List, Optional
import configparser
import logging
import os
import mariadb
from fastapi import HTTPException, status
# Local config loading so this module does not depend on main.py
_config = configparser.ConfigParser()
_config.read("config.ini")
DB_HOST = os.getenv("DB_HOST", _config.get("database", "host", fallback="localhost"))
DB_PORT = int(os.getenv("DB_PORT", str(_config.getint("database", "port", fallback=3306))))
DB_USER = os.getenv("DB_USER", _config.get("database", "user", fallback="root"))
DB_PASSWORD = os.getenv("DB_PASSWORD", _config.get("database", "password", fallback=""))
DB_NAME = os.getenv("DB_NAME", _config.get("database", "database", fallback="test"))
logger = logging.getLogger("mariadb_api")
def get_db_connection():
"""Create a new MariaDB connection using global configuration.
Raises a 500 HTTPException if the connection fails.
"""
try:
conn = mariadb.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
)
return conn
except mariadb.Error as e:
logger.exception("Database connection error")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Database connection error: {e}",
)
def validate_table_and_columns(table_name: str, columns: Optional[List[str]] = None) -> None:
"""Validate that table and optional columns exist in the database.
Uses INFORMATION_SCHEMA to guard against SQL injection on identifiers.
Raises HTTPException(400) if table/columns are invalid.
"""
conn = get_db_connection()
try:
cur = conn.cursor()
cur.execute(
"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
(DB_NAME, table_name),
)
if not cur.fetchone():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unknown table")
if columns:
cur.execute(
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
(DB_NAME, table_name),
)
valid_cols = {row[0] for row in cur.fetchall()}
for col in columns:
if col not in valid_cols:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Unknown column: {col}")
finally:
conn.close()
def write_audit_log(
actor: str,
action: str,
table_name: str,
row_id: Optional[int] = None,
details: Optional[Dict[str, object]] = None,
) -> None:
"""Write an audit log entry into the `audit_log` table.
This is best-effort: on failure the error is logged but not propagated.
"""
try:
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
"INSERT INTO audit_log (actor, action, table_name, row_id, details) VALUES (%s, %s, %s, %s, %s)",
(actor, action, table_name, row_id, str(details) if details is not None else None),
)
conn.commit()
except mariadb.Error:
logger.warning("Failed to write audit log for actor=%s action=%s table=%s", actor, action, table_name)
finally:
try:
conn.close()
except Exception:
pass