-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
201 lines (171 loc) · 8.34 KB
/
main.py
File metadata and controls
201 lines (171 loc) · 8.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
ERIC — Edge Robotics Innovation by Cosmos
================================================
NVIDIA Cosmos Cookoff 2026
Stack:
- Cosmos Reason 2 (vLLM) : vision + physical reasoning
- Piper via RealtimeTTS : streaming TTS, CPU only, zero VRAM
- Waveshare UGV Beast : tracked robot via serial UART → ESP32
- Gradio : dual camera GUI + mission control
- ROS2 Nav2 + SLAM Toolbox : autonomous path planning + online mapping
- D500 LiDAR : /scan for SLAM + reactive safety layer
- OAK-D Lite : stereo depth + YOLO person detection
Hardware:
- Jetson Orin Nano Super 8GB
- Waveshare UGV Beast (tracked, D500 LiDAR, OAK-D Lite)
- ~$750 CAD total cost
- Vancouver BC Canada
Usage:
source /opt/ros/humble/setup.bash
uv run main.py
# Then open http://JETSON_IP:7860
Enable Nav2 + LiDAR + OAK-D:
Set USE_NAV2=true, USE_LIDAR=true, USE_OAKD=true in .env
ROS2 init order (enforced below):
motors -> odom -> lidar -> slam -> oakd -> nav2
motors owns UART; odom needs motors router; lidar publishes /scan;
slam needs /odom + /scan; nav2 waits internally for slam_available().
"""
import atexit
import logging
import signal
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
log = logging.getLogger("eric")
# ── Graceful shutdown ─────────────────────────────────────────────────────────
# MUST be registered before init_nav2() is called so the executor is stopped
# cleanly before Python tears down daemon threads. Without this the C++ layer
# inside rclpy fires std::terminate() when pthread_exit is called mid-flight.
def _graceful_shutdown(signum=None, frame=None):
"""
Called on SIGINT, SIGTERM, or normal process exit via atexit.
Shutdown order is the reverse of init order.
"""
try:
from nav2 import shutdown as nav2_shutdown
nav2_shutdown()
except Exception as e:
log.debug(f"Nav2 shutdown skipped ({e})")
try:
from slam import shutdown_slam
shutdown_slam()
except Exception as e:
log.debug(f"SLAM shutdown skipped ({e})")
try:
from ros_core import shutdown as ros_shutdown
ros_shutdown()
except Exception as e:
log.debug(f"ROS2 core shutdown skipped ({e})")
atexit.register(_graceful_shutdown)
signal.signal(signal.SIGTERM, _graceful_shutdown)
# Leave SIGINT as default (KeyboardInterrupt) so Ctrl-C still works normally;
# atexit will fire _graceful_shutdown() on exit anyway.
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
log.info("ERIC starting — Edge Robotics Innovation by Cosmos")
from config import USE_NAV2, USE_LIDAR, USE_OAKD
# motors.py is always imported — it owns the UART port and must come first
# so that odom.py can subscribe to its router.
from motors import motors # noqa: F401 — side effect: opens serial port
# ── ROS2 stack — init order matters ──────────────────────────────────────
# odom, lidar, slam, nav2 all share the single ros_core node.
# Each module gracefully no-ops if ROS2 is not available.
if USE_NAV2:
log.info("Nav2 + SLAM enabled — initializing ROS2 stack...")
# 1. Odometry — subscribes to UART T=1001, publishes /odom + TF
from odom import init_odom, odom_available
init_odom()
if odom_available():
log.info("Odometry: /odom publisher active")
else:
log.warning("Odometry unavailable — SLAM localisation degraded")
# 2. LiDAR — publishes /scan (required for SLAM map building)
# Always init when Nav2 is on — SLAM cannot map without /scan.
log.info("LiDAR initializing for SLAM /scan topic...")
from lidar import init_lidar, lidar_available
init_lidar()
if lidar_available():
log.info("LiDAR: /scan publishing — safety monitor active")
else:
log.warning("LiDAR unavailable — SLAM map building disabled")
# 3. SLAM Toolbox — online async mapping from /scan + /odom
from slam import init_slam, slam_available, _slam_ok as slam_node_ok
init_slam()
if slam_node_ok:
log.info("SLAM Toolbox: node running — map building started")
else:
log.warning("SLAM unavailable — Nav2 will attempt direct motor control")
# 4. OAK-D — stereo depth + /oakd/depth publisher for Nav2 costmap.
# Init before Nav2 so the depth topic exists when Nav2 configures
# its costmap layers.
if USE_OAKD:
log.info("OAK-D Lite initializing (SLAM mode — /oakd/depth -> Nav2 costmap)...")
from oakd import (init_oakd, oakd_available,
set_yolo_callback, set_yolo_active, yolo_available)
init_oakd()
if oakd_available():
log.info("OAK-D depth perception active — publishing /oakd/depth")
_register_yolo_callback(set_yolo_callback, set_yolo_active, yolo_available)
else:
log.warning("OAK-D unavailable — no depth perception or /oakd/depth")
# 5. Nav2 — launch the full Nav2 stack, then connect action client
from ros_core import launch_nav2
launch_nav2()
from nav2 import init_nav2, nav2_available
init_nav2()
if nav2_available():
log.info("Nav2 ready — full autonomous navigation + SLAM enabled")
else:
log.warning("Nav2 unavailable — using direct motor control")
else:
# Non-ROS mode — LiDAR and OAK-D still provide reactive safety
if USE_LIDAR:
log.info("LiDAR enabled (safety only, no ROS2)...")
from lidar import init_lidar, lidar_available
init_lidar()
if lidar_available():
log.info("LiDAR safety monitor active")
else:
log.warning("LiDAR unavailable — no hardware safety layer")
if USE_OAKD:
log.info("OAK-D Lite enabled (depth safety only, no SLAM)...")
from oakd import (init_oakd, oakd_available,
set_yolo_callback, set_yolo_active, yolo_available)
init_oakd()
if oakd_available():
log.info("OAK-D depth perception active")
_register_yolo_callback(set_yolo_callback, set_yolo_active, yolo_available)
else:
log.warning("OAK-D unavailable — no depth perception")
# ── Cosmos connectivity test ──────────────────────────────────────────────
from cosmos import ask_cosmos
test = ask_cosmos("Say exactly: ERIC online and ready.", max_tokens=20)
log.info(f"Cosmos test: {test}")
# ── Launch Gradio GUI (blocking) ──────────────────────────────────────────
try:
from gui import launch
except ImportError as e:
import traceback
traceback.print_exc()
raise SystemExit(f"Failed to import gui.launch: {e}")
launch()
def _register_yolo_callback(set_yolo_callback, set_yolo_active, yolo_available):
"""Register the YOLO detection callback and activate Layer 2 detection."""
def _on_yolo_detection(label: str, dist_m: float,
bearing: str, bearing_deg: float):
log.info(
f"YOLO: {label} at {dist_m:.1f}m ({bearing}, {bearing_deg:+.1f} deg)"
)
# Forward to mission.py when ready:
# from mission import on_detection
# on_detection(label, dist_m, bearing, bearing_deg)
set_yolo_callback(_on_yolo_detection)
set_yolo_active(True)
if yolo_available():
log.info("OAK-D YOLO Layer 2 active — person/animal detection on Myriad X")
else:
log.warning("OAK-D YOLO unavailable — check blob file in models/")
if __name__ == "__main__":
main()