-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvisualize_data.py
More file actions
76 lines (66 loc) · 2.78 KB
/
visualize_data.py
File metadata and controls
76 lines (66 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import asyncio
import datetime
import json
from typing import Callable
from Madgwick.MadgwickPlotter import MadgwickRedisPlotter
from RedisPostman.models import LogMessage, Message
from RedisPostman.RedisWorker import AsyncRedisWorker
import numpy as np
from visualizer_options import options_names, options
from config import log_message_channel
async def visualize_imu(madgwick_plotter: MadgwickRedisPlotter, reader: Callable, channel: str, dataClass: type[Message]):
    """
    Subscribe to a Redis channel and push every received IMU sample to the plotter.

    Each non-None message is passed to ``reader``, which must return a mapping
    with the keys ``"acc"``, ``"gyr"``, ``"mag"`` and ``"quaternion"``; those
    values are forwarded to ``madgwick_plotter.update_plot_from_redis``.

    Any exception raised while reading or plotting a message is not propagated:
    it is wrapped in a ``LogMessage`` (process name ``"visualize"``) and
    published as JSON on ``log_message_channel``; the loop then continues.

    NOTE(review): the previous docstring said the available options are listed
    in reader_options.py, while this file imports them from visualizer_options
    - confirm which module is the right reference.
    """
    redis_worker = AsyncRedisWorker()
    stream = redis_worker.subscribe(block=1, count=100000, dataClass=dataClass, channel=channel)
    async for incoming in stream:
        # The subscription may yield None; there is nothing to plot then.
        if incoming is None:
            continue
        try:
            sample: dict[str, np.ndarray] = reader(incoming)
            madgwick_plotter.update_plot_from_redis(
                acc=sample["acc"],
                gyr=sample["gyr"],
                mag=sample["mag"],
                quaternion=sample["quaternion"],
            )
        except Exception as exc:
            # Report the failure on the log channel instead of stopping the stream.
            failure = LogMessage(
                date=datetime.datetime.now(),
                process_name="visualize",
                status=LogMessage.exception_to_dict(exc),
            )
            await redis_worker.broker.publish(
                log_message_channel,
                json.dumps(failure.to_dict()),
            )
def print_dict(obj):
    """
    Render a (possibly nested) dict/list structure as a printable string.

    Despite its name, this function does not print anything; it returns the
    formatted text so the caller can print it.

    Formatting rules:
      * dict entry with a scalar value  -> ``"key : \\tvalue\\n"``
      * dict entry with a nested value  -> ``"key\\n"`` followed by the rendered
        nested value (terminated with a newline if it does not already end
        with one)
      * list/tuple item that is scalar  -> ``"item\\t"``
      * list/tuple item that is nested  -> the rendered nested value
      * anything else                   -> ``str(obj)``

    Strings and bytes are treated as scalars even though they are iterable.

    :param obj: the object to render (dict, list, tuple or any other value).
    :return: the formatted string; empty for an empty dict or list.
    """
    def _is_container(value):
        # Strings/bytes expose __iter__ but must be shown as plain values,
        # otherwise they would be treated as nested structures.
        return hasattr(value, '__iter__') and not isinstance(value, (str, bytes))

    parts = []
    if isinstance(obj, dict):
        for k, v in obj.items():
            if _is_container(v):
                # Keep the rendered nested value; previously the result of the
                # recursive call was discarded.
                nested = print_dict(v)
                parts.append(f"{k}\n{nested}")
                if nested and not nested.endswith('\n'):
                    parts.append('\n')
            else:
                parts.append('%s : \t%s\n' % (k, v))
    elif isinstance(obj, (list, tuple)):
        for v in obj:
            # f-string conversion avoids a TypeError on non-string items.
            parts.append(print_dict(v) if _is_container(v) else f"{v}\t")
    else:
        parts.append(str(obj))
    return "".join(parts)
if __name__ == '__main__':
    # Interactive entry point: show a numbered menu of the visualizer options,
    # read the chosen index from stdin and start plotting for that option.

    # Materialize the option names once so the menu and the lookup below are
    # guaranteed to use the same ordering (and a one-shot iterable still works).
    option_list = list(options_names)
    available_options = {name: i for i, name in enumerate(option_list)}
    print("Choose an option:")
    print(print_dict(available_options))
    user_input = input()

    # Validate the selection explicitly: a non-numeric answer or an index
    # outside the menu exits with a readable message instead of a traceback,
    # and negative indices no longer silently select from the end of the list.
    try:
        option_index = int(user_input)
    except ValueError:
        raise SystemExit(f"Invalid option {user_input!r}: expected an integer index") from None
    if not 0 <= option_index < len(option_list):
        raise SystemExit(
            f"Invalid option {option_index}: expected a value between 0 and {len(option_list) - 1}"
        )

    option_name = option_list[option_index]
    option = options[option_name]
    print(f"You've chosen {option_name}")

    # Sanity checks on the option definition (developer-provided configuration).
    # NOTE: these asserts are stripped when Python runs with -O.
    assert isinstance(option["imu_name"], str)
    assert callable(option["reader"])
    assert isinstance(option["channel"], str)
    assert isinstance(option["dataClass"], type)
    assert issubclass(option["dataClass"], Message)
    assert isinstance(option['to_draw_imu_data'], bool)
    assert isinstance(option["to_draw_3d"], bool)

    madgwick_state = MadgwickRedisPlotter(
        to_draw_imu_data=bool(option['to_draw_imu_data']),
        to_draw_3d=bool(option["to_draw_3d"]),
        to_draw_magnetic=True,
        window_size=20,
        imu_n=option["imu_name"],
    )
    asyncio.run(
        visualize_imu(
            madgwick_plotter=madgwick_state,
            reader=option["reader"],
            channel=option["channel"],
            dataClass=option["dataClass"],
        )
    )