-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path1_sync_nearest.py
More file actions
350 lines (279 loc) · 12.8 KB
/
1_sync_nearest.py
File metadata and controls
350 lines (279 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""
순차 매칭 (마스터-슬레이브)
원리:
1. 한 카메라를 기준(Master)으로 설정
2. Master 카메라에서 프레임을 하나 가져옴
3. 성공적으로 가져온 직후, 다른 카메라(Slave)에서 다음 프레임을 즉시 가져옴
4. 두 프레임의 타임스탬프 차이와 관계없이 동기화된 쌍으로 간주
5. 모든 Master 프레임은 프레임 손실 없이 Slave 프레임과 1:1로 매칭됨
장점:
- 프레임 손실이 없음 (Pass 없음)
- 구현이 간단하고 처리 속도가 빠름
단점:
- 동기화 정확도가 하드웨어 및 시스템 상태에 따라 유동적임
- 타임스탬프 차이가 임계값 기반 방식보다 클 수 있음
"""
import gc
import time
import settings
import PySpin
from collections import deque
from typing import Optional, List, Dict
# ============================================================
# 사용자 설정
# ============================================================
CAMERA_SERIALS = settings.CAMERA_SERIALS
MASTER_CAMERA_IDX = 0 # Master 카메라 인덱스 (0 또는 1)
NUM_SYNCED_IMAGES = 10 # 저장할 동기화된 이미지 수
BUFFER_SIZE = 30 # 각 카메라의 이미지 버퍼 크기 (이 코드에서는 직접 사용되지 않음)
SAVE_IMAGES = True
VERBOSE = False # 출력 최소화: 최종 통계만 출력
# ============================
def dbg(msg: str) -> None:
"""디버그 메시지 출력 (VERBOSE가 True일 때만)"""
if VERBOSE:
print(msg)
def set_enum(nm: PySpin.INodeMap, node_name: str, entry_name: str) -> bool:
"""열거형 노드 설정"""
enum_node = PySpin.CEnumerationPtr(nm.GetNode(node_name))
if not (PySpin.IsAvailable(enum_node) and PySpin.IsWritable(enum_node)):
return False
entry = enum_node.GetEntryByName(entry_name)
if not (PySpin.IsAvailable(entry) and PySpin.IsReadable(entry)):
return False
enum_node.SetIntValue(entry.GetValue())
return True
def configure_continuous_acquisition(cam: PySpin.CameraPtr, cam_serial: str) -> bool:
"""연속 촬영 모드로 카메라 설정"""
try:
nm = cam.GetNodeMap()
set_enum(nm, "TriggerMode", "Off")
set_enum(nm, "AcquisitionMode", "Continuous")
dbg(f"✅ [{cam_serial}] 연속 촬영 모드 설정 완료")
return True
except Exception as e:
dbg(f"❌ [{cam_serial}] 설정 오류: {e}")
return False
def sequential_matching(cameras: List[PySpin.CameraPtr],
camera_serials: List[str],
master_idx: int,
num_images: int,
save_images: bool) -> Dict:
"""
순차 매칭 (Master-Slave Sequential)
Master 프레임 획득 후 즉시 Slave 프레임을 획득하여 1:1로 매칭합니다.
Args:
cameras: 카메라 리스트
camera_serials: 시리얼 리스트
master_idx: Master 카메라 인덱스
num_images: 저장할 동기화 이미지 수
save_images: 저장 여부
Returns:
Dict: 성능 통계
"""
dbg(f"\n▶ Sequential Matching 동기화 시작...")
dbg(f" Master 카메라: {camera_serials[master_idx]}")
slave_idx = 1 - master_idx
stats = {
'method': 'Sequential Matching',
'synced_pairs': 0,
'total_frames': [0, 0],
'dropped_frames': [0, 0], # 불완전하거나 획득 실패한 프레임
'timestamp_diffs': [],
'processing_time': 0
}
start_time = time.time()
try:
# 카메라 초기화 및 촬영 시작
for cam, serial in zip(cameras, camera_serials):
cam.Init()
configure_continuous_acquisition(cam, serial)
cam.BeginAcquisition()
dbg(f"✅ [{serial}] 연속 촬영 시작")
processor = PySpin.ImageProcessor()
processor.SetColorProcessing(PySpin.SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR)
synced_count = 0
dbg(f"\n목표: {num_images}개의 동기화된 이미지 쌍\n")
# 카메라 간 타임스탬프 오프셋 계산용
first_master_ts: Optional[int] = None
timestamp_offset_ns: Optional[int] = None
while synced_count < num_images:
master_image = None
slave_image = None
try:
# ===== 1. Master에서 프레임 가져오기 =====
master_image = cameras[master_idx].GetNextImage(1000)
if master_image.IsIncomplete():
dbg(" ⚠️ Master 프레임 불완전, 건너뜀")
stats['dropped_frames'][master_idx] += 1
master_image.Release()
continue
stats['total_frames'][master_idx] += 1
master_timestamp = master_image.GetTimeStamp()
# ===== 2. Slave에서 다음 프레임 즉시 가져오기 =====
slave_image = cameras[slave_idx].GetNextImage(1000)
if slave_image.IsIncomplete():
dbg(" ⚠️ Slave 프레임 불완전, Master/Slave 모두 건너뜀")
stats['dropped_frames'][master_idx] += 1
stats['dropped_frames'][slave_idx] += 1
master_image.Release()
slave_image.Release()
continue
stats['total_frames'][slave_idx] += 1
slave_timestamp = slave_image.GetTimeStamp()
# ===== 3. 타임스탬프 및 통계 계산 =====
# 첫 프레임 쌍으로 오프셋 계산
if timestamp_offset_ns is None:
first_master_ts = master_timestamp
timestamp_offset_ns = slave_timestamp - master_timestamp
dbg(f" ⚙️ 타임스탬프 오프셋 설정: {timestamp_offset_ns / 1_000_000:.3f} ms")
# 오프셋 보정
aligned_slave_ts = slave_timestamp - timestamp_offset_ns
diff = abs(aligned_slave_ts - master_timestamp)
stats['timestamp_diffs'].append(diff)
synced_count += 1
dbg(f"✅ 동기화 성공 ({synced_count}/{num_images}) | "
f"TS 차이: {diff / 1_000_000:.3f} ms")
# ===== 4. 이미지 저장 (필요 시) =====
if save_images:
# Master 이미지 처리 및 저장
master_converted = processor.Convert(master_image, PySpin.PixelFormat_BGR8)
master_img_obj = PySpin.Image.Create(
master_converted.GetWidth(), master_converted.GetHeight(), 0, 0,
PySpin.PixelFormat_BGR8, master_converted.GetData()
)
master_filename = f"seq_sync_{camera_serials[master_idx]}_{synced_count-1:03d}.jpg"
master_img_obj.Save(master_filename)
dbg(f" 💾 저장: {master_filename}")
# Slave 이미지 처리 및 저장
slave_converted = processor.Convert(slave_image, PySpin.PixelFormat_BGR8)
slave_img_obj = PySpin.Image.Create(
slave_converted.GetWidth(), slave_converted.GetHeight(), 0, 0,
PySpin.PixelFormat_BGR8, slave_converted.GetData()
)
slave_filename = f"seq_sync_{camera_serials[slave_idx]}_{synced_count-1:03d}.jpg"
slave_img_obj.Save(slave_filename)
dbg(f" 💾 저장: {slave_filename}")
except PySpin.SpinnakerException as e:
dbg(f" ❌ 프레임 획득 중 오류: {e}")
# 오류 발생 시 해당 프레임 쌍은 건너뜀
stats['dropped_frames'][master_idx] += 1
stats['dropped_frames'][slave_idx] += 1
continue
finally:
if master_image:
master_image.Release()
if slave_image:
slave_image.Release()
stats['synced_pairs'] = synced_count
stats['processing_time'] = time.time() - start_time
return stats
except Exception as e:
dbg(f"❌ 심각한 오류: {e}")
import traceback
traceback.print_exc()
return stats
finally:
for cam, serial in zip(cameras, camera_serials):
try:
if cam.IsInitialized():
cam.EndAcquisition()
dbg(f" [{serial}] 촬영 종료")
cam.DeInit()
except Exception as e:
dbg(f" ❌ [{serial}] 정리 중 오류: {e}")
def print_statistics(stats: Dict):
"""통계 출력"""
if not stats or 'method' not in stats:
print("\n통계 정보가 없습니다.")
return
print("\n" + "="*60)
print(f"📊 {stats['method']} 성능 분석")
print("="*60)
print(f"\n✅ 동기화 결과:")
print(f" - 성공적으로 동기화된 이미지 쌍: {stats.get('synced_pairs', 0)}개")
print(f"\n📈 프레임 통계:")
total_frames_list = stats.get('total_frames', [0, 0])
dropped_frames_list = stats.get('dropped_frames', [0, 0])
for idx in range(len(total_frames_list)):
cam_type = "Master" if idx == MASTER_CAMERA_IDX else "Slave"
total = total_frames_list[idx]
dropped = dropped_frames_list[idx]
print(f" Camera {idx} ({cam_type}):")
print(f" 총 획득 시도: {total}장")
print(f" 드롭 (불완전/오류): {dropped}장")
if total > 0:
drop_rate = (dropped / total) * 100
print(f" 드롭률: {drop_rate:.1f}%")
timestamp_diffs = stats.get('timestamp_diffs', [])
if timestamp_diffs:
print(f"\n🎯 동기화 정확도 (타임스탬프 차이):")
avg_diff = sum(timestamp_diffs) / len(timestamp_diffs)
max_diff = max(timestamp_diffs)
min_diff = min(timestamp_diffs)
std_dev = (sum([(x - avg_diff)**2 for x in timestamp_diffs]) / len(timestamp_diffs))**0.5
print(f" - 평균 차이: {avg_diff / 1_000_000:.3f} ms")
print(f" - 최대 차이: {max_diff / 1_000_000:.3f} ms")
print(f" - 최소 차이: {min_diff / 1_000_000:.3f} ms")
print(f" - 표준편차: {std_dev / 1_000_000:.3f} ms")
processing_time = stats.get('processing_time', 0)
synced_pairs = stats.get('synced_pairs', 0)
if processing_time > 0:
print(f"\n⏱️ 처리 시간:")
print(f" - 총 처리 시간: {processing_time:.2f}초")
if synced_pairs > 0:
print(f" - 이미지 쌍당 평균 시간: {processing_time / synced_pairs:.2f}초")
print("="*60)
def main() -> None:
"""메인 함수"""
# Retrieve singleton reference to system object
system = PySpin.System.GetInstance()
# Retrieve list of cameras from the system
cam_list = system.GetCameras()
cnt = cam_list.GetSize()
dbg(f"📦 감지된 카메라: {cnt}개\n")
cameras = []
found_serials = []
stats = {}
try:
if cnt < len(CAMERA_SERIALS):
print(f"❌ {len(CAMERA_SERIALS)}대의 카메라가 필요하지만 {cnt}대만 감지되었습니다.")
return
dbg("🔎 카메라 검색 중...")
for serial in CAMERA_SERIALS:
cam = cam_list.GetBySerial(serial)
if cam is None:
print(f"❌ 시리얼 번호 {serial}에 해당하는 카메라를 찾을 수 없습니다.")
return
tl = cam.GetTLDeviceNodeMap()
model = PySpin.CStringPtr(tl.GetNode("DeviceModelName")).GetValue()
dbg(f" ✅ {model} (SN: {serial}) 발견")
cameras.append(cam)
found_serials.append(serial)
# 순차 매칭 실행
stats = sequential_matching(
cameras,
found_serials,
MASTER_CAMERA_IDX,
NUM_SYNCED_IMAGES,
SAVE_IMAGES
)
except Exception as e:
print(f"❌ 메인 함수 오류 발생: {e}")
import traceback
traceback.print_exc()
finally:
# 리소스 정리
cameras.clear()
found_serials.clear()
if 'cam_list' in locals() and cam_list.GetSize() > 0:
cam_list.Clear()
try:
system.ReleaseInstance()
except PySpin.SpinnakerException:
pass
gc.collect()
# 최종 통계 출력
print_statistics(stats)
if __name__ == "__main__":
main()