-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
128 lines (101 loc) · 3.85 KB
/
database.py
File metadata and controls
128 lines (101 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# =============================================================
# database.py — All MongoDB read / write operations
#
# Collections:
# users → registered users + their embeddings
# logs → every verification event
# unknowns → snapshots of unrecognised faces
# =============================================================
import streamlit as st
from pymongo import MongoClient
from datetime import datetime, timedelta
import numpy as np
import config
# ── Persistent connection ─────────────────────────────────────
@st.cache_resource
def _get_db():
"""
Create the MongoDB connection exactly once per Streamlit session.
@st.cache_resource keeps it alive across all reruns — no
reconnection on every button click or page interaction.
"""
try:
client = MongoClient(
config.MONGO_URI,
serverSelectionTimeoutMS = 5000,
connectTimeoutMS = 5000,
)
client.admin.command("ping") # confirms connection is live
print("[DB] MongoDB connected.")
return client[config.DB_NAME]
except Exception:
st.error(
"**MongoDB is not running.**\n\n"
"Open a terminal as Administrator and run:\n"
"```\nnet start MongoDB\n```\n"
"Then refresh this page."
)
st.stop()
def _cols():
"""Return (users_col, logs_col, unknowns_col) from the cached DB."""
db = _get_db()
return db["users"], db["logs"], db["unknowns"]
# ── Users ──────────────────────────────────────────────────────
def save_user(set_id, name, embedding):
users_col, _, _ = _cols()
users_col.update_one(
{"set_id": set_id},
{"$set": {
"name": name,
"embedding": embedding.tolist(),
"registered_at": datetime.now()
}},
upsert=True
)
def load_all_users():
users_col, _, _ = _cols()
return [
{
"set_id": doc["set_id"],
"name": doc["name"],
"embedding": np.array(doc["embedding"])
}
for doc in users_col.find()
]
def user_exists(set_id):
users_col, _, _ = _cols()
return users_col.find_one({"set_id": set_id}) is not None
def delete_user(set_id):
users_col, _, _ = _cols()
return users_col.delete_one({"set_id": set_id}).deleted_count > 0
def count_users():
users_col, _, _ = _cols()
return users_col.count_documents({})
# ── Logs ───────────────────────────────────────────────────────
def save_log(identity, status, score, image_b64=None):
_, logs_col, _ = _cols()
logs_col.insert_one({
"identity": identity,
"status": status,
"score": round(float(score), 4),
"timestamp": datetime.now(),
"image_b64": image_b64
})
def get_recent_logs(limit=50):
_, logs_col, _ = _cols()
logs = list(logs_col.find().sort("timestamp", -1).limit(limit))
for log in logs:
log.pop("_id", None)
return logs
# ── Unknowns ───────────────────────────────────────────────────
def save_unknown(image_b64, source="webcam"):
_, _, unknowns_col = _cols()
unknowns_col.insert_one({
"image_b64": image_b64,
"source": source,
"timestamp": datetime.now()
})
def count_unknowns_in_window(minutes):
_, _, unknowns_col = _cols()
cutoff = datetime.now() - timedelta(minutes=minutes)
return unknowns_col.count_documents({"timestamp": {"$gte": cutoff}})