-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecognizer.py
More file actions
137 lines (112 loc) · 5.2 KB
/
recognizer.py
File metadata and controls
137 lines (112 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import csv
import os
import pickle
import queue
from datetime import datetime
import cv2
import face_recognition
import numpy as np
from logger import get_logger
logger = get_logger(__name__)
class AttendanceRecognizer:
def __init__(self, encodings_path: str = "data/encodings.pickle", tolerance: float = 0.5):
if not os.path.exists(encodings_path):
raise FileNotFoundError(f"Encodings file not found: {encodings_path}")
with open(encodings_path, "rb") as f:
self.known = pickle.load(f)
self.known_encodings = self.known["encodings"]
self.known_names = self.known["names"]
self.known_rollnos = self.known.get("rollnos", ["UNKNOWN"] * len(self.known_names))
self.attendance_csv = os.path.join("output", f"attendance_{datetime.now().strftime('%Y_%m_%d')}.csv")
self.tolerance = tolerance
# Ensure output directory exists
os.makedirs(os.path.dirname(self.attendance_csv), exist_ok=True)
# Initialize CSV with headers if new
if not os.path.exists(self.attendance_csv):
with open(self.attendance_csv, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["name", "roll_no", "timestamp"])
def is_already_marked_today(self, roll_no: str) -> bool:
"""Check if a student is already marked present today."""
if not os.path.exists(self.attendance_csv):
return False
today = datetime.now().strftime("%Y-%m-%d")
try:
with open(self.attendance_csv, "r", newline="") as f:
reader = csv.reader(f)
next(reader, None) # Skip header
for row in reader:
if len(row) >= 3 and row[1] == roll_no:
# Check if timestamp is from today
if row[2].startswith(today):
return True
except (FileNotFoundError, IndexError):
pass
return False
def mark_attendance(self, name: str, roll_no: str):
# Check if already marked today
if self.is_already_marked_today(roll_no):
# logger.info(f"{roll_no} - {name} already marked present today")
return False
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(self.attendance_csv, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow([name, roll_no, timestamp])
logger.info(f"Marked {roll_no} - {name} present")
return True
def face_recognition_worker(frame_queue, result_queue, known_encodings, known_names, known_rollnos, tolerance,
stop_event, scale_factor=0.5):
"""
Worker process function for face recognition.
Processes frames from frame_queue and puts results in result_queue.
scale_factor: Resize frame by this factor for faster processing (0.5 = half size)
"""
while not stop_event.is_set():
try:
# Get frame with timeout to allow checking stop_event
frame_data = frame_queue.get(timeout=0.1)
if frame_data is None: # Poison pill to stop worker
break
frame_id, rgb_frame, original_height, original_width = frame_data
# Downscale frame for faster processing
if scale_factor != 1.0:
small_frame = cv2.resize(rgb_frame, (0, 0), fx=scale_factor, fy=scale_factor)
else:
small_frame = rgb_frame
# Perform face recognition on smaller frame
boxes = face_recognition.face_locations(small_frame, model="hog")
encodings = face_recognition.face_encodings(small_frame, boxes)
recognition_results = []
for (box, encoding) in zip(boxes, encodings):
if len(known_encodings) > 0:
distances = face_recognition.face_distance(known_encodings, encoding)
best_idx = np.argmin(distances)
best_dist = distances[best_idx]
if best_dist <= tolerance:
name = known_names[best_idx]
roll_no = known_rollnos[best_idx]
else:
name = "Unknown"
roll_no = "UNKNOWN"
else:
name = "Unknown"
roll_no = "UNKNOWN"
best_dist = 1.0
# Scale box coordinates back to original frame size
if scale_factor != 1.0:
top, right, bottom, left = box
top = int(top / scale_factor)
right = int(right / scale_factor)
bottom = int(bottom / scale_factor)
left = int(left / scale_factor)
scaled_box = (top, right, bottom, left)
else:
scaled_box = box
recognition_results.append((scaled_box, name, roll_no, best_dist))
# Send results back
result_queue.put((frame_id, recognition_results))
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error in face recognition worker: {e}")
continue