-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtello_sim_client.py
More file actions
263 lines (205 loc) · 9.58 KB
/
tello_sim_client.py
File metadata and controls
263 lines (205 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
from dataclasses import dataclass
import logging
import subprocess
import platform
import sys
import time
import socket
import cv2
import os
import numpy as np
@dataclass
class BackgroundFrameRead():
frame: cv2.typing.MatLike
class TelloSimClient:
def __init__(self, host='localhost', port=9999, auto_start_simulation=True):
self.host = host
self.port = port
self.latest_frame = None
if auto_start_simulation and not self._check_simulation_running():
self._start_simulation()
print("[Wrapper] Starting Tello Simulation...")
self._wait_for_simulation()
def _start_simulation(self):
sim_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tello_drone_sim.py'))
if platform.system() == "Windows":
command = f'start cmd /k python "{sim_path}"'
print("[DEBUG] Launching simulation command:", command)
subprocess.Popen(command, shell=True)
elif platform.system() == "Linux":
subprocess.Popen(['gnome-terminal', '--', 'python3', 'tello_drone_sim.py'])
elif platform.system() == "Darwin":
subprocess.Popen(['ls'])
subprocess.Popen(['pwd'])
python_path = os.path.join(os.path.dirname(sys.executable), 'python3')
print("Running python3 from path:", python_path)
subprocess.Popen([python_path, './tello_drone_sim.py'], cwd=os.getcwd(),
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
start_new_session=True)
else:
raise OSError("Unsupported OS for launching terminal simulation.")
def _check_simulation_running(self):
try:
with socket.create_connection((self.host, self.port), timeout=1):
return True
except (ConnectionRefusedError, socket.timeout, OSError) as ex:
logging.error("[Wrapper] Simulation is not running.", ex)
return False
def _wait_for_simulation(self, timeout=30):
print("[Wrapper] Waiting for simulation to become ready...")
start_time = time.time()
while time.time() - start_time < timeout:
if self._check_simulation_running():
print("[Wrapper] Simulation is now ready!")
return
time.sleep(1)
raise TimeoutError("[Error] Simulation did not become ready in time.")
def _send_command(self, command: str):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.send(command.encode())
except ConnectionRefusedError:
print(f"[Error] Unable to connect to the simulation at {self.host}:{self.port}")
def get_frame_read(self) -> BackgroundFrameRead:
"""Get the latest frame directly from the simulator over TCP."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.send(b'get_latest_frame')
# Receive frame size (4 bytes)
size_data = s.recv(4)
if len(size_data) != 4:
print("[Error] Failed to receive frame size")
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
frame_size = int.from_bytes(size_data, byteorder='big')
# If size is 0, no frame available
if frame_size == 0:
print("[Debug] No frame available from simulator")
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
# Receive the frame data
frame_data = b''
bytes_received = 0
while bytes_received < frame_size:
chunk = s.recv(min(4096, frame_size - bytes_received))
if not chunk:
break
frame_data += chunk
bytes_received += len(chunk)
# Decode the frame from PNG bytes
if len(frame_data) == frame_size:
nparr = np.frombuffer(frame_data, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is not None:
# Return frame in BGR format (OpenCV's native format)
# Users should convert to RGB if needed for display
return BackgroundFrameRead(frame=image)
print("[Error] Failed to decode frame data")
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
except ConnectionRefusedError:
print(f"[Error] Unable to connect to the simulation at {self.host}:{self.port}")
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
except Exception as e:
print(f"[Error] Failed to get frame: {e}")
return BackgroundFrameRead(frame=np.zeros([360, 640, 3], dtype=np.uint8))
def _request_data(self, command):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.send(command.encode())
return s.recv(4096).decode()
except ConnectionRefusedError:
print(f"[Error] Unable to retrieve '{command}' from {self.host}:{self.port}")
return "N/A"
def wait_until_motion_complete(self):
while self._request_data("get_is_moving") == "True":
time.sleep(0.1)
def get_battery(self):
return self._request_data('get_battery')
def get_distance_tof(self):
return self._request_data('get_distance_tof')
def get_height(self):
return self._request_data('get_height')
def get_flight_time(self):
return self._request_data('get_flight_time')
def get_speed_x(self):
return self._request_data('get_speed_x')
def get_speed_y(self):
return self._request_data('get_speed_y')
def get_speed_z(self):
return self._request_data('get_speed_z')
def get_acceleration_x(self):
return self._request_data('get_acceleration_x')
def get_acceleration_y(self):
return self._request_data('get_acceleration_y')
def get_acceleration_z(self):
return self._request_data('get_acceleration_z')
def get_pitch(self):
return self._request_data('get_pitch')
def get_roll(self):
return self._request_data('get_roll')
def get_yaw(self):
return self._request_data('get_yaw')
def query_attitude(self):
return self._request_data('query_attitude')
def get_current_state(self):
return self._request_data('get_current_state')
def connect(self):
self._send_command('connect')
def takeoff(self):
self._send_command('takeoff')
def land(self):
self._send_command('land')
def rotate_clockwise(self, degrees):
self._send_command(f'rotate_cw {degrees}')
def rotate_counter_clockwise(self, degrees):
self._send_command(f'rotate_ccw {degrees}')
def streamon(self):
self._send_command('streamon')
def streamoff(self):
self._send_command('streamoff')
def capture_frame(self):
self._send_command('capture_frame')
def emergency(self):
self._send_command('emergency')
def move_forward(self, distance):
self._send_command(f'forward {distance}')
def move_back(self, distance):
self._send_command(f'backward {distance}')
def move_left(self, distance):
self._send_command(f'left {distance}')
def move_right(self, distance):
self._send_command(f'right {distance}')
def move_up(self, distance):
self._send_command(f'up {distance}')
def move_down(self, distance):
self._send_command(f'down {distance}')
def flip_left(self):
self._send_command('flip_left')
def flip_right(self):
self._send_command('flip_right')
def flip_forward(self):
self._send_command('flip_forward')
def flip_back(self):
self._send_command('flip_back')
def go_xyz_speed(self, x, y, z, speed):
self._send_command(f"go {x} {y} {z} {speed}")
def curve_xyz_speed(self, x1, y1, z1, x2, y2, z2, speed):
self._send_command(f"curve {x1} {y1} {z1} {x2} {y2} {z2} {speed}")
def set_speed(self, speed):
self._send_command(f"set_speed {speed}")
def send_rc_control(self, left_right_velocity, forward_backward_velocity, up_down_velocity, yaw_velocity):
self._send_command(f"send_rc_control {left_right_velocity} {forward_backward_velocity} {up_down_velocity} {yaw_velocity}")
def end(self):
self._send_command('end')
def get_info(self):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.send(b'get_info')
return s.recv(4096).decode()
except ConnectionRefusedError:
print(f"[Error] Unable to retrieve info from {self.host}:{self.port}")
return "{}"
def initiate_throw_takeoff(self):
self._send_command('throw_takeoff')