diff --git a/docs/source/documentation/supported_hardware/supported_hardware_doc.md b/docs/source/documentation/supported_hardware/supported_hardware_doc.md index a03c5219..79635b38 100644 --- a/docs/source/documentation/supported_hardware/supported_hardware_doc.md +++ b/docs/source/documentation/supported_hardware/supported_hardware_doc.md @@ -23,21 +23,24 @@ By default, LibEMG supports several hardware devices (shown in Table 1). - The [**Delsys**](https://delsys.com/) is a commercially available system primarily used for medical applications due to its relatively high cost. - The [**SIFI Cuff**](https://sifilabs.com/) is a pre-released device that will soon be commercially available. Compared to the Myo armband, this device has a much higher sampling rate (~2000 Hz). - The [**Oymotion Cuff**](http://www.oymotion.com/en/product32/149) is a commercial device that samples EMG at 1000 Hz (8 bits) or 500 Hz (12 bits). +- The [**MindRove Armband**](https://mindrove.com/armband/) is a commercial device that samples 8-channel EMG at 500 Hz (24 bit) and IMU at 50 Hz. Data are streamed over a Wifi connection. +- The [**OTB Muovi+**](https://otbioelettronica.it/en/muoviplus/) is part of the OTBioelettronica hardware family and supports High Density EMG (HDEMG). - The [**MindRove Armband**](https://mindrove.com/armband/) is a commercial device that samples 8-channel EMG at 500 Hz (24 bit) and IMU at 50 Hz. Data are streamed over a Wifi connection. + If selecting EMG hardware for real-time use, wireless armbands that sample above 500 Hz are preferred. Additionally, future iterations of LibEMG will include Inertial Measurement Unit (IMU) support. As such, devices should have IMUs to enable more interaction opportunities. -|
Hardware
|
Function
|
Image
| -| ------------- | ------------- | ------------- | -| Myo Armband | `myo_streamer()` |
![](devices/Myo.png)
| -| Delsys | `delsys_streamer()` or `delsys_API_streamer()` |
![](devices/delsys_trigno.png)
| -| SIFI Cuff | `sifi_streamer()` |
![](devices/sifi_cuff.png)
| -| Oymotion | `oymotion_streamer()`|
![](devices/oymotion.png)
| -| MindRove Armband | `mindrove_streamer()`|
![](devices/mindrove.png)
| +|
Hardware
|
Function
|
Image
| +| ------------------------- | ---------------------------------------------- | --------------------------------------------------------------- | +| Myo Armband | `myo_streamer()` |
![](devices/Myo.png)
| +| Delsys | `delsys_streamer()` or `delsys_API_streamer()` |
![](devices/delsys_trigno.png)
| +| SIFI Cuff | `sifi_streamer()` |
![](devices/sifi_cuff.png)
| +| Oymotion | `oymotion_streamer()` |
![](devices/oymotion.png)
| +| MindRove Armband | `mindrove_streamer()` |
![](devices/mindrove.png)
| +| Muovi+ | `otb_muovi_plus_streamer()` |
![](devices/muovi+.png)
|

Table 1: The list of all implemented streamers.

diff --git a/libemg/_streamers/_OTB_MuoviPlus.py b/libemg/_streamers/_OTB_MuoviPlus.py index d715fa52..28fc772e 100644 --- a/libemg/_streamers/_OTB_MuoviPlus.py +++ b/libemg/_streamers/_OTB_MuoviPlus.py @@ -1,6 +1,12 @@ import socket import pickle import numpy as np +import signal +import atexit + +from multiprocessing import Event, Process +from libemg.shared_memory_manager import SharedMemoryManager +from crc import Crc8, Calculator """ OT Bioelettronica @@ -224,3 +230,178 @@ def start_stream(self): print("Worker Stopped.") device.stop() quit() + + +class PacketParser: + @staticmethod + def parse_raw(n, packet): + """ + Parse raw binary packet into signed 16-bit EMG values. + Assumes each packet consists of 64 samples per channel and 9 bytes of overhead. + """ + try: + int_data = np.frombuffer(packet, dtype='>i2').astype(np.int32) + int_data[int_data > 32767] -= 65536 + + if int_data.shape[0] < n * (292 // 2): + return None + + parsed_packets = [] + for i in range(n): + base = 2 * i * (64 + 9) + ch1_start = base + ch1_end = base + 64 + ch2_start = base + 64 + 9 + ch2_end = ch2_start + 64 + + ch1 = int_data[ch1_start:ch1_end] + ch2 = int_data[ch2_start:ch2_end] + + if len(ch1) == 64 and len(ch2) == 64: + parsed_packets.append(np.concatenate((ch1, ch2))) + + return parsed_packets + + except Exception as e: + print(f"[PacketParser Error] {e}") + return None + + +class OTBMuoviPlusEMGStreamer(Process): + """ + Handles EMG streaming from the OTB MuoviPlus device using a TCP socket. + Parses, filters, and stores data into shared memory for real-time access. + """ + + # Define the start and stop signals for the device + START_SIGNAL = [0b00000101, 0b01011011, 0b01001011] + STOP_TCP = [0b00000000] + + def __init__(self, ip: str, port: int, shared_memory_items: list, emg_channels=128): + super().__init__() + self.ip = ip + self.port = port + self.shared_memory_items = shared_memory_items + self.emg_channels = emg_channels + + self.client = None + self.stop_event = Event() + self.smm = SharedMemoryManager() + + # Graceful exit + atexit.register(self.cleanup) + signal.signal(signal.SIGINT, self._handle_exit_signal) + + + def run(self): + """Main loop that runs in its own process to stream EMG data.""" + + self._setup_shared_memory() + + try: + # Connect to the device and start streaming + self._connect() + self._send_packet(self.START_SIGNAL) + + while not self.stop_event.is_set(): + # Receive raw data packets from the device + raw = self.client.recv(292 * 8) + if not raw: + break + + # Parse the raw data into EMG packets + emg_packets = PacketParser.parse_raw(n=8, packet=raw) + + # Process the received EMG packets + if emg_packets and len(emg_packets) == 8: + emg_packets = [self._filter_channels(packet) for packet in emg_packets] + self._write_emg_data(emg_packets) + + except Exception as e: + print(f"[OTBStreamer Error] {e}") + finally: + self.cleanup() + + + def stop(self): + """Stop the streaming process.""" + self.stop_event.set() + self._send_packet(self.STOP_TCP) + self.join() + + + def _connect(self): + """Connect to the OTB MuoviPlus device.""" + self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.client.settimeout(5) + self.client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.client.connect((socket.gethostbyname(self.ip), self.port)) + print(f"[OTBStreamer] Connected to {self.ip}:{self.port}") + + + def _send_packet(self, sig_bits): + """Send a packet to the OTB MuoviPlus device.""" + if self.client: + packet = bytearray(sig_bits) + crc_calc = Calculator(Crc8.MAXIM_DOW) + packet.append(crc_calc.calculate_checksum(packet)) + self.client.send(packet) + + + def _disconnect(self): + """Disconnect from the OTB MuoviPlus device.""" + if self.client: + try: + self.client.shutdown(socket.SHUT_RDWR) + except: + pass + self.client.close() + self.client = None + print("[OTBStreamer] Disconnected") + + + def _filter_channels(self, packet): + """ + Filters out the unused 64 channels if only 64 channels are configured. + """ + if self.emg_channels == 128 or len(packet) != 128: + return packet + + first_half, second_half = packet[:64], packet[64:] + + if np.all(first_half == 0): + return second_half + elif np.all(second_half == 0): + return first_half + else: + return first_half + second_half + + + def _write_emg_data(self, packets): + """ + Write EMG data to shared memory, FIFO-style. + """ + emg_array = np.array(packets) + num_samples = emg_array.shape[0] + + def update_buffer(buffer): + return np.vstack((emg_array, buffer[:-num_samples])) + + self.smm.modify_variable('emg', update_buffer) + self.smm.modify_variable('emg_count', lambda x: x + num_samples) + + + def _setup_shared_memory(self): + """Initialize shared memory variables.""" + for item in self.shared_memory_items: + self.smm.create_variable(*item) + + + def cleanup(self): + self._disconnect() + self.smm.cleanup() + + + def _handle_exit_signal(self, signum, frame): + print(f"[OTBStreamer] Received exit signal {signum}, cleaning up.") + self.cleanup() diff --git a/libemg/_streamers/_myo_streamer.py b/libemg/_streamers/_myo_streamer.py index 0e8b1237..f69c9f8b 100644 --- a/libemg/_streamers/_myo_streamer.py +++ b/libemg/_streamers/_myo_streamer.py @@ -516,7 +516,7 @@ def on_battery(self, battery_level): import numpy as np from multiprocessing import Process, Event class MyoStreamer(Process): - def __init__(self, filtered, emg, imu, shared_memory_items=[]): + def __init__(self, filtered, emg, imu, shared_memory_items=[], addr=None): Process.__init__(self, daemon=True) self.filtered = filtered self.emg = emg @@ -524,6 +524,7 @@ def __init__(self, filtered, emg, imu, shared_memory_items=[]): self.smm = SharedMemoryManager() self.shared_memory_items = shared_memory_items self.signal = Event() + self.addr = addr def run(self): for item in self.shared_memory_items: @@ -533,7 +534,7 @@ def run(self): if not self.filtered: mode = emg_mode.RAW self.m = Myo(mode=mode) - self.m.connect() + self.m.connect(addr=self.addr) if self.emg: def write_emg(emg): diff --git a/libemg/discrete.py b/libemg/discrete.py new file mode 100644 index 00000000..a67456c7 --- /dev/null +++ b/libemg/discrete.py @@ -0,0 +1,99 @@ +import numpy as np +import torch.nn.functional as F +import torch +from libemg.feature_extractor import FeatureExtractor +from libemg.utils import get_windows +import pyautogui +import time +import statistics + +class DiscreteControl: + """ + The temporary discrete control class for interfacing the cross-user Myo model made available at: . + The model currently supports 5 gestures: Close, Flexion, Extension, Open, Pinch. + These gestures can be mapped to keyboard keys for controlling applications. + + Parameters + ---------- + odh: OnlineDataHandler + The online data handler object for streaming EMG data. + window_size: int + The window size (in samples) to use for splitting up each template. + increment: int + The increment size (in samples) for the sliding window. + model: torch.nn.Module + The trained PyTorch model for gesture classification. + buffer: int, optional + The size of the prediction buffer to use for mode filtering. Default is 1. + template_size: int, optional + The size of each EMG template (in samples). Default is 250 (1.5s for the Myo Armband). + min_template_size: int, optional + The minimum number of samples required before starting to make predictions (helps reduce the delay needed between subsequent gestures). Default is 100. + key_mapping: dict, optional + A dictionary mapping gesture names to keyboard keys. Default maps 'Close' to 'c', 'Flexion' to 'f', 'Extension' to 'e', 'Open' to 'o', and 'Pinch' to 'p'. + debug: bool, optional + If True, enables debug mode with additional print statements. Default is True. + """ + def __init__(self, odh, window_size, increment, model, buffer=5, template_size=250, min_template_size=150, key_mapping={'Close':'c', 'Flexion':'f', 'Extension':'e', 'Open':'o', 'Pinch':'p'}, debug=True): + self.odh = odh + self.window_size = window_size + self.increment = increment + self.buffer_size = buffer + self.model = model + self.template_size = template_size + self.min_template_size = min_template_size + self.key_mapping = key_mapping + self.debug = debug + + def run(self): + """ + Main loop for gesture detection. + Runs a sliding window over incoming EMG data and makes predictions based on the trained model. + """ + gesture_mapping = ['Nothing', 'Close', 'Flexion', 'Extension', 'Open', 'Pinch'] + expected_count = self.min_template_size + buffer = [] + + while True: + # Get and process EMG data + _, counts = self.odh.get_data(self.window_size) + if counts['emg'][0][0] >= expected_count: + data, counts = self.odh.get_data(self.template_size) + emg = data['emg'][::-1] + feats = self._get_features([emg], self.window_size, self.increment, None, None) + pred, _ = self._predict(feats[0]) + buffer.append(pred) + mode_pred = statistics.mode(buffer[-self.buffer_size:]) + if mode_pred != 0: + if self.debug: + print(str(time.time()) + ' ' + gesture_mapping[mode_pred]) + self._key_press(mode_pred, gesture_mapping) + self.odh.reset() + expected_count = self.min_template_size + buffer = [] + else: + expected_count += 10 + + def _key_press(self, pred, mapping): + if mapping[pred] in self.key_mapping: + pyautogui.press(self.key_mapping[mapping[pred]]) + + def _predict(self, gest, device='cpu'): + g_tensor = torch.tensor(np.expand_dims(np.array(gest, dtype=np.float32), axis=0), dtype=torch.float32).to(device) + with torch.no_grad(): + output = self.model.forward_once(g_tensor) + pred = output.argmax(dim=1).item() + prob = F.softmax(output, dim=1).max().item() + return pred, prob + + def _get_features(self, data, window_size, window_inc, feats, feat_dic): + fe = FeatureExtractor() + data = np.array([get_windows(d, window_size, window_inc) for d in data], dtype='object') + if feats is None: + return data + if feat_dic is not None: + feats = np.array([fe.extract_features(feats, d, array=True, feature_dic=feat_dic) for d in data], dtype='object') + else: + feats = np.array([fe.extract_features(feats, np.array(d, dtype='float'), array=True) for d in data], dtype='object') + feats = np.nan_to_num(feats, copy=True, nan=0, posinf=0, neginf=0) + return feats \ No newline at end of file diff --git a/libemg/emg_predictor.py b/libemg/emg_predictor.py index fd8c805e..d2cbdad4 100644 --- a/libemg/emg_predictor.py +++ b/libemg/emg_predictor.py @@ -981,13 +981,8 @@ def insert_classifier_output(data): insert_classifier_output) self.options['model_smm_writes'] += 1 - if self.output_format == "predictions": - message = str(prediction) + calculated_velocity + '\n' - elif self.output_format == "probabilities": - message = ' '.join([f'{i:.2f}' for i in probabilities[0]]) + calculated_velocity + " " + str(time_stamp) - else: - raise ValueError(f"Unexpected value for output_format. Accepted values are 'predictions' and 'probabilities'. Got: {self.output_format}.") - + message = str(prediction) + " " + str(np.abs(np.array(window['emg'])).mean(axis=2).mean()) + str(calculated_velocity) + if not self.tcp: self.sock.sendto(bytes(message, 'utf-8'), (self.ip, self.port)) else: @@ -1015,7 +1010,6 @@ def visualize(self, max_len=50, legend=None): cmap = cm.get_cmap('turbo', num_classes) controller = ClassifierController(output_format=self.output_format, num_classes=num_classes, ip=self.ip, port=self.port) - controller.start() if legend is not None: for i in range(num_classes): @@ -1202,7 +1196,6 @@ def visualize(self, max_len = 50, legend = False): ax.set_ylabel('Prediction') controller = RegressorController(ip=self.ip, port=self.port) - controller.start() # Wait for controller to start receiving data predictions = None diff --git a/libemg/streamers.py b/libemg/streamers.py index 69882d56..cdd69731 100644 --- a/libemg/streamers.py +++ b/libemg/streamers.py @@ -16,6 +16,7 @@ from libemg._streamers._sifi_bridge_streamer import SiFiBridgeStreamer from libemg._streamers._leap_streamer import LeapStreamer from libemg._streamers._mindrove import MindroveStreamer +from libemg._streamers._OTB_MuoviPlus import OTBMuoviPlusEMGStreamer def sifi_biopoint_streamer( name = "BioPoint_v1_3", @@ -248,7 +249,8 @@ def myo_streamer( shared_memory_items : list | None = None, emg : bool = True, imu : bool = False, - filtered : bool=True): + filtered : bool=True, + addr : list | None = None): """The streamer for the myo armband. This function connects to the Myo. It leverages the PyoMyo @@ -265,6 +267,9 @@ def myo_streamer( Specifies whether IMU data should be forwarded to shared memory. filtered : bool (optional), default=True If True, the data is the filtered data. Otherwise it is the raw unfiltered data. + addr : list (optional) + The MAC address of the Myo armband to connect to. Addr is the MAC address in format: [93, 41, 55, 245, 82, 194] + If None, it will connect to the first Myo it finds. Returns ---------- Object: streamer @@ -286,7 +291,7 @@ def myo_streamer( for item in shared_memory_items: item.append(Lock()) - myo = MyoStreamer(filtered, emg, imu, shared_memory_items) + myo = MyoStreamer(filtered, emg, imu, shared_memory_items, addr) myo.start() return myo, shared_memory_items @@ -679,3 +684,48 @@ def mindrove_streamer(shared_memory_items = None): m.start() return m, shared_memory_items +def otb_muovi_plus_streamer(shared_memory_items = None, + ip: str = '0.0.0.0', + port: int = 54321, + emg_channels: int = 128): + """The streamer for the OTB Muovi+ device. + + This function connects to the OTB Muovi+ device and streams its data over a network socket and access it via shared memory. + + Parameters + ---------- + shared_memory_items : list (optional) + Shared memory configuration parameters for the streamer in format: + ["tag", (size), datatype]. + + + ip : str (optional), default='0.0.0.0' + The IP address of the OTB Muovi+ device. Used to connect to the device. + port : int (optional), default=54321 + The port number to connect to the OTB Muovi+ device. + emg_channels : int (optional), default=128 + The number of EMG channels to stream from the OTB Muovi+ device. + This should match the number of channels configured on the device. + Returns + ---------- + Object: streamer + The OTB Muovi+ streamer object. + Object: shared memory + The shared memory object. + Examples + --------- + >>> streamer, shared_memory = otb_muovi_plus_streamer(ip='192.168.76.1', port=54320) + """ + if shared_memory_items is None: + shared_memory_items = [ + ["emg", (2000, emg_channels), np.double], + ["emg_count", (1,1), np.int32] + ] + + for item in shared_memory_items: + item.append(Lock()) + + muoviplus = OTBMuoviPlusEMGStreamer(ip=ip, port=port, shared_memory_items=shared_memory_items, emg_channels=emg_channels) + muoviplus.start() + + return muoviplus, shared_memory_items diff --git a/requirements.txt b/requirements.txt index 12b0c640..d799b59b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,3 +32,4 @@ h5py onedrivedownloader sifi-bridge-py mindrove +crc \ No newline at end of file diff --git a/setup.py b/setup.py index f5a6e728..6177503c 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,8 @@ "onedrivedownloader", "sifi-bridge-py", "pygame", - "mindrove" + "mindrove", + "crc" ], keywords=[ "emg",