Skip to content

Commit 2b50f73

Browse files
committed
dotbot/dotbot_simulator: rework threading
1 parent 20d120f commit 2b50f73

2 files changed

Lines changed: 81 additions & 63 deletions

File tree

doc/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
("py:class", r"dotbot.models.Annotated"),
4646
("py:class", r"Query"),
4747
("py:class", r"PydanticUndefined"),
48+
("py:class", r"queue.Queue"),
4849
]
4950

5051
# -- Options for HTML output -------------------------------------------------

dotbot/dotbot_simulator.py

Lines changed: 80 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
"""Dotbot simulator for the DotBot project."""
88

9+
import queue
910
import random
1011
import threading
1112
import time
@@ -76,11 +77,10 @@ class InitStateToml(BaseModel):
7677
network: SimulatedNetworkSettings = SimulatedNetworkSettings()
7778

7879

79-
class DotBotSimulator(threading.Thread):
80+
class DotBotSimulator:
8081
"""Simulator class for the dotbot."""
8182

82-
def __init__(self, settings: SimulatedDotBotSettings):
83-
super().__init__(daemon=True)
83+
def __init__(self, settings: SimulatedDotBotSettings, tx_queue: queue.Queue):
8484
self.address = settings.address.lower()
8585
self.pos_x = settings.pos_x
8686
self.pos_y = settings.pos_y
@@ -97,10 +97,18 @@ def __init__(self, settings: SimulatedDotBotSettings):
9797
self.waypoints = []
9898
self.waypoint_index = 0
9999

100+
self._lock = threading.Lock()
101+
self.tx_queue = tx_queue
102+
self.queue = queue.Queue()
103+
self.advertise_thread = threading.Thread(target=self.advertise, daemon=True)
104+
self.rx_thread = threading.Thread(target=self.rx_frame, daemon=True)
105+
self.main_thread = threading.Thread(target=self.run, daemon=True)
100106
self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL
101107
self.logger = LOGGER.bind(context=__name__, address=self.address)
102-
self.running = True
103-
self.start()
108+
self._stop_event = threading.Event()
109+
self.rx_thread.start()
110+
self.advertise_thread.start()
111+
self.main_thread.start()
104112

105113
@property
106114
def header(self):
@@ -191,94 +199,103 @@ def update(self, dt: float):
191199

192200
def advertise(self):
193201
"""Send an advertisement message to the gateway."""
194-
payload = Frame(
195-
header=self.header,
196-
packet=Packet.from_payload(
197-
PayloadDotBotAdvertisement(
198-
calibrated=self.calibrated,
199-
direction=int(self.theta * 180 / pi + 90),
200-
pos_x=int(self.pos_x) if self.pos_x >= 0 else 0,
201-
pos_y=int(self.pos_y) if self.pos_y >= 0 else 0,
202-
pos_z=0,
203-
battery=battery_discharge_model(self.time_elapsed_s),
204-
)
205-
),
206-
)
207-
return payload
202+
while self._stop_event.is_set() is False:
203+
payload = Frame(
204+
header=self.header,
205+
packet=Packet.from_payload(
206+
PayloadDotBotAdvertisement(
207+
calibrated=self.calibrated,
208+
direction=int(self.theta * 180 / pi + 90),
209+
pos_x=int(self.pos_x) if self.pos_x >= 0 else 0,
210+
pos_y=int(self.pos_y) if self.pos_y >= 0 else 0,
211+
pos_z=0,
212+
battery=battery_discharge_model(self.time_elapsed_s),
213+
)
214+
),
215+
)
216+
self.tx_queue.put(payload)
217+
time.sleep(0.5)
208218

209-
def handle_frame(self, frame: Frame):
219+
def rx_frame(self):
210220
"""Decode the serial input received from the gateway."""
211221

212-
if self.address == hex(frame.header.destination)[2:]:
213-
if frame.payload_type == PayloadType.CMD_MOVE_RAW:
214-
self.controller_mode = DotBotSimulatorMode.MANUAL
215-
self.v_left = frame.packet.payload.left_y
216-
self.v_right = frame.packet.payload.right_y
217-
218-
if self.v_left > 127:
219-
self.v_left = self.v_left - 256
220-
if self.v_right > 127:
221-
self.v_right = self.v_right - 256
222-
223-
elif frame.payload_type == PayloadType.LH2_WAYPOINTS:
224-
self.v_left = 0
225-
self.v_right = 0
226-
self.controller_mode = DotBotSimulatorMode.MANUAL
227-
self.waypoint_threshold = frame.packet.payload.threshold
228-
self.waypoints = frame.packet.payload.waypoints
229-
if self.waypoints:
230-
self.controller_mode = DotBotSimulatorMode.AUTOMATIC
231-
232-
def stop(self):
233-
self.logger.info("Stopping DotBot simulator...")
234-
self.running = False
235-
self.join()
222+
while self._stop_event.is_set() is False:
223+
frame = self.queue.get()
224+
if frame is None:
225+
break
226+
with self._lock:
227+
if self.address == hex(frame.header.destination)[2:]:
228+
if frame.payload_type == PayloadType.CMD_MOVE_RAW:
229+
self.controller_mode = DotBotSimulatorMode.MANUAL
230+
self.v_left = frame.packet.payload.left_y
231+
self.v_right = frame.packet.payload.right_y
232+
233+
if self.v_left > 127:
234+
self.v_left = self.v_left - 256
235+
if self.v_right > 127:
236+
self.v_right = self.v_right - 256
237+
238+
elif frame.payload_type == PayloadType.LH2_WAYPOINTS:
239+
self.v_left = 0
240+
self.v_right = 0
241+
self.controller_mode = DotBotSimulatorMode.MANUAL
242+
self.waypoint_threshold = frame.packet.payload.threshold
243+
self.waypoints = frame.packet.payload.waypoints
244+
if self.waypoints:
245+
self.controller_mode = DotBotSimulatorMode.AUTOMATIC
236246

237247
def run(self):
238248
"""Update the state of the dotbot simulator."""
239-
while self.running is True:
240-
self.update(0.1)
249+
while self._stop_event.is_set() is False:
250+
with self._lock:
251+
self.update(0.1)
252+
253+
def stop(self):
254+
self.logger.info(f"Stopping DotBot {self.address} simulator...")
255+
self._stop_event.set()
256+
self.queue.put(None) # unblock the rx_thread if waiting on the queue
257+
self.advertise_thread.join()
258+
self.rx_thread.join()
259+
self.main_thread.join()
241260

242261

243-
class DotBotSimulatorCommunicationInterface(threading.Thread):
262+
class DotBotSimulatorCommunicationInterface:
244263
"""Bidirectional serial interface to control simulated robots"""
245264

246265
def __init__(self, on_frame_received: Callable, simulator_init_state_path: str):
266+
self.queue = queue.Queue()
247267
self.on_frame_received = on_frame_received
248-
self.running = True
249-
super().__init__(daemon=True)
268+
self._stp_event = threading.Event()
269+
self.main_thread = threading.Thread(target=self.run, daemon=True)
250270
init_state = InitStateToml(**toml.load(simulator_init_state_path))
251271
self.network_pdr = init_state.network.pdr
252272
self.dotbots = [
253273
DotBotSimulator(
254274
settings=dotbot_settings,
275+
tx_queue=self.queue,
255276
)
256277
for dotbot_settings in init_state.dotbots
257278
]
258279

259-
self.start()
280+
self.main_thread.start()
260281
self.logger = LOGGER.bind(context=__name__)
261282
self.logger.info("DotBot Simulation Started")
262283

263284
def run(self):
264285
"""Listen continuously at each byte received on the fake serial interface."""
265-
for dotbot in self.dotbots:
266-
self.on_frame_received(dotbot.advertise())
267-
time.sleep(0.1)
268-
269-
while self.running:
270-
for dotbot in self.dotbots:
271-
self.handle_dotbot_frame(dotbot.advertise())
272-
time.sleep(
273-
0.5 - PayloadDotBotAdvertisement().size * len(self.dotbots) * 0.000001
274-
)
286+
while self._stp_event.is_set() is False:
287+
frame = self.queue.get()
288+
if frame is None:
289+
break
290+
self.handle_dotbot_frame(frame)
275291

276292
def stop(self):
277293
self.logger.info("Stopping DotBot Simulation...")
278-
self.running = False
294+
self._stp_event.set()
295+
self.queue.put(None) # unblock the run thread if waiting on the queue
279296
for dotbot in self.dotbots:
280297
dotbot.stop()
281-
self.join()
298+
self.main_thread.join()
282299

283300
def flush(self):
284301
"""Flush fake serial output."""
@@ -304,4 +321,4 @@ def write(self, bytes_):
304321
f"Packet to DotBot {dotbot.address} lost in simulation"
305322
)
306323
continue
307-
dotbot.handle_frame(Frame.from_bytes(bytes_))
324+
dotbot.queue.put(Frame.from_bytes(bytes_))

0 commit comments

Comments
 (0)