From ce3782ac86cc478c933570ba800e78d6a764a966 Mon Sep 17 00:00:00 2001 From: Michiboi29 Date: Fri, 15 May 2026 16:27:44 -0400 Subject: [PATCH] Add Emager v3.0 streamer and update streamer interface for compatibility --- libemg/_streamers/_emager_streamer.py | 219 +------------- libemg/_streamers/_emagerv3_streamer.py | 366 ++++++++++++++++++++++++ libemg/streamers.py | 71 ++++- 3 files changed, 438 insertions(+), 218 deletions(-) create mode 100644 libemg/_streamers/_emagerv3_streamer.py diff --git a/libemg/_streamers/_emager_streamer.py b/libemg/_streamers/_emager_streamer.py index 0a14426c..58348daa 100644 --- a/libemg/_streamers/_emager_streamer.py +++ b/libemg/_streamers/_emager_streamer.py @@ -97,233 +97,34 @@ def close(self): self.ser.close() return -class Emager3: - """Reader for the new Emager3 device which sends framed payloads: - HDR 0xAA55, APP_PAYLOAD=8192 bytes (64x64 samples x 2 bytes), TLR 0x55AA. - The payload is interpreted as 4096 16-bit samples arranged as (time x channel) - when reshaped to (64,64) and transposed to (channel x time). We emit one - 64-channel sample vector to handlers per timepoint (64 vectors per frame). - """ - HDR = b"\xAA\x55" - TLR = b"\x55\xAA" - - def __init__(self, baud_rate, endianness='le', signed=False, com_name=None, vid_pid=(12259, 256), - channels: int = 64, samples_per_frame: int = 64, version: str = "3.0"): - self.com_name = com_name - self.vid_pid = vid_pid - ports = list(serial.tools.list_ports.comports()) - com_port = None - for p in ports: - if self.com_name is None: - if (p.vid, p.pid) == self.vid_pid: - if platform.system() == 'Windows': - com_port = p.name - else: - com_port = p.device.replace('cu', 'tty') - break - else: - if self.com_name in p.description: - if platform.system() == 'Windows': - com_port = p.name - else: - com_port = p.device.replace('cu', 'tty') - break - - if com_port is None: - print(f"Could not find serial port for {self.com_name}") - # include a helpful error that lists all detected serial ports - ports_info = [] - for p in ports: - dev = getattr(p, "device", None) or getattr(p, "name", None) or "" - desc = getattr(p, "description", "") or "" - vid = getattr(p, "vid", None) - pid = getattr(p, "pid", None) - ports_info.append(f"{dev} - {desc} (VID: {vid}, PID: {pid})") - - if ports_info: - avail = "\n".join(f" - {pi}" for pi in ports_info) - else: - avail = " (no serial ports found)" - - raise RuntimeError(f"Could not find serial port for {self.com_name}. Available ports:\n{avail}") - - self.ser = serial.Serial(com_port, baud_rate, timeout=1) - self.ser.close() - self._buf = bytearray() - self.emg_handlers = [] - - # dtype selection - if endianness == 'le': - self.sample_dtype = np.int16 if signed else np.uint16 - else: - self.sample_dtype = np.dtype('>i2') if signed else np.dtype('>u2') - - # framing params - self.channels = int(channels) - self.samples_per_frame = int(samples_per_frame) - self.expected_samples = self.channels * self.samples_per_frame - self.APP_PAYLOAD = self.expected_samples * 2 - self.FRAME_SIZE = 2 + self.APP_PAYLOAD + 2 - # frame and counter stats - self.frames_ok = 0 - self.bad_tlr = 0 - self.resyncs = 0 - self.last_ctr = None - self.ctr_miss = 0 - # parser position for incremental search - self.pos = 0 - - def connect(self): - self.ser.open() - - def add_emg_handler(self, closure): - self.emg_handlers.append(closure) - - def clear_buffer(self): - try: - self.ser.reset_input_buffer() - except Exception: - pass - - def close(self): - try: - self.ser.close() - except Exception: - pass - - def _process_frame_payload(self, payload_bytes): - # Decode payload into a full block (samples_per_frame x channels) and emit once per frame. - try: - arr = np.frombuffer(payload_bytes, dtype=self.sample_dtype) - if arr.size != self.expected_samples: - return - # payload is time-major: samples_per_frame x channels - block_time_ch = arr.reshape(self.samples_per_frame, self.channels) - - # Emit the whole block once to each handler (shape: samples x channels) - for h in self.emg_handlers: - try: - h(block_time_ch) - except Exception: - pass - except Exception: - return - - def get_data(self): - # Read what's available and append to buffer, then parse frames - try: - n_av = self.ser.in_waiting - except Exception: - return - - if n_av <= 0: - return - - data = self.ser.read(n_av) - if not data: - return - - self._buf += data - - # incremental frame parsing (mirrors FrameParser.feed logic) - while True: - h = self._buf.find(self.HDR, self.pos) - if h < 0: - keep = min(len(self._buf), self.FRAME_SIZE - 1) - if keep: - self._buf[:] = self._buf[-keep:] - else: - self._buf.clear() - self.pos = 0 - return - - if len(self._buf) - h < self.FRAME_SIZE: - if h > 0: - self._buf[:] = self._buf[h:] - self.pos = 0 - else: - self.pos = h - return - - t0 = h + 2 + self.APP_PAYLOAD - if self._buf[t0:t0+2] == self.TLR: - # good frame - self.frames_ok += 1 - p = h + 2 - # counter (first 4 bytes of payload, big-endian) - try: - ctr = ((self._buf[p] << 24) | - (self._buf[p+1] << 16) | - (self._buf[p+2] << 8) | - self._buf[p+3]) - if self.last_ctr is not None: - expected = (self.last_ctr + 1) & 0xFFFFFFFF - if ctr != expected: - self.ctr_miss += 1 - self.last_ctr = ctr - except Exception: - pass - - try: - payload_bytes = bytes(self._buf[p : p + self.APP_PAYLOAD]) - self._process_frame_payload(payload_bytes) - except Exception: - pass - - self.pos = h + self.FRAME_SIZE - if self.pos > (self.FRAME_SIZE * 2): - self._buf[:] = self._buf[self.pos:] - self.pos = 0 - else: - self.bad_tlr += 1 - self.resyncs += 1 - self.pos = h + 1 - if self.pos > (self.FRAME_SIZE * 2): - self._buf[:] = self._buf[self.pos:] - self.pos = 0 - class EmagerStreamer(Process): - def __init__(self, shared_memory_items, version: str = "v1", emager_kwargs: dict | None = None): + def __init__(self, shared_memory_items, version: str = "v1.0", emager_kwargs: dict | None = None): """ :param shared_memory_items: list[(name, shape, dtype, lock)] - :param version: str Emager version: 'v1.0', 'v1.1', 'v3.0' - :param emager_kwargs: dict passed to Emager/Emager3. Supported keys: - baud_rate (int, default 1500000 or 5000000 for v3), endianness ('le'), signed (bool), - com_name, vid_pid (tuple), channels (int), samples_per_frame (int) + :param version: str Emager version: 'v1.0' or 'v1.1' + :param emager_kwargs: dict passed to Emager. Supported keys: + baud_rate (int, default 1500000) """ super().__init__(daemon=True) self.smm = SharedMemoryManager() self.shared_memory_items = shared_memory_items self._stop_event = Event() self.e = None - + version = version.strip().lower().lstrip('v').replace('_', '.') if '.' not in version: version += '.0' - if version not in ['1.0', '1.1', '3.0']: - raise ValueError(f"Unsupported Emager version: {version}") + if version not in ['1.0', '1.1']: + raise ValueError(f"Unsupported Emager version: {version}. Use 'v1.0' or 'v1.1' (for v3, use emagerv3_streamer).") self.version = version self.emager_kwargs = emager_kwargs or {} def run(self): for item in self.shared_memory_items: self.smm.create_variable(*item) - - # Instantiate the appropriate Emager reader based on version and kwargs - if self.version == "3.0": - bw = self.emager_kwargs - baud = bw.get('baud_rate', 5000000) - endianness = bw.get('endianness', 'le') - signed = bw.get('signed', False) - com_name = bw.get('com_name', None) - vid_pid = bw.get('vid_pid', (12259, 256)) - channels = bw.get('channels', 64) - samples_per_frame = bw.get('samples_per_frame', 64) - self.e = Emager3(baud, endianness=endianness, signed=signed, com_name=com_name, vid_pid=vid_pid, - channels=channels, samples_per_frame=samples_per_frame, version=self.version) - else: - baud = self.emager_kwargs.get('baud_rate', 1500000) - self.e = Emager(baud, version=self.version) + + baud = self.emager_kwargs.get('baud_rate', 1500000) + self.e = Emager(baud, version=self.version) self.e.connect() # Create a queue and writer thread to offload shared-memory writes q: Queue = Queue(maxsize=100) diff --git a/libemg/_streamers/_emagerv3_streamer.py b/libemg/_streamers/_emagerv3_streamer.py new file mode 100644 index 00000000..113cc406 --- /dev/null +++ b/libemg/_streamers/_emagerv3_streamer.py @@ -0,0 +1,366 @@ +import platform +import time +import threading +from multiprocessing import Event, Process +from queue import Queue, Empty +from typing import Callable + +import numpy as np +import serial # pyserial +import serial.tools.list_ports + +from libemg.shared_memory_manager import SharedMemoryManager + + +# ============================================================ +# Emager3 (v3.0) 8192-byte frames with packed 12-bit EMG + IMU + counter +# ============================================================ + +class Emager3: + """ + Reader for Emager v3.0 frame format (8192 bytes total): + + [0] 0xAA + [1] 0x55 + [2..8065] EMG payload (8064 bytes) = 5376 packed 12-bit values + [8066..8173] IMU payload area (108 bytes max) + [8174] IMU sample count (expected 8 or 9) + [8175] Alignment IMU (ignored) + [8176..8179] Counter (big-endian uint32) <-- frame_id + [8180..8189] Empty (ignored) + [8190] 0x55 + [8191] 0xAA + """ + + HDR0, HDR1 = 0xAA, 0x55 + TLR0, TLR1 = 0x55, 0xAA + + FRAME_SIZE = 8192 + + EMG_START = 2 + EMG_LEN = 8064 + EMG_END = EMG_START + EMG_LEN # 8066 + + IMU_START = 8066 + IMU_AREA_LEN = 108 + IMU_NSAMPLES_I = 8174 + + CTR_START = 8176 + + TRAILER0_I = 8190 + TRAILER1_I = 8191 + + EMG_VALUES_PER_FRAME = 5376 + CHANNELS = 64 + SAMPLES_PER_CH_PER_FRAME = EMG_VALUES_PER_FRAME // CHANNELS # 84 + + IMU_AXES = 6 + IMU_BYTES_PER_SAMPLE = IMU_AXES * 2 # int16 per axis + + def __init__(self, baud_rate: int, com_name=None, vid_pid=(12259, 256)): + self.com_name = com_name + self.vid_pid = vid_pid + + ports = list(serial.tools.list_ports.comports()) + com_port = None + + for p in ports: + if self.com_name is None: + if (p.vid, p.pid) == self.vid_pid: + com_port = p.name if platform.system() == "Windows" else p.device.replace("cu", "tty") + break + else: + if self.com_name in (p.description or ""): + com_port = p.name if platform.system() == "Windows" else p.device.replace("cu", "tty") + break + + if com_port is None: + ports_info = [] + for p in ports: + dev = getattr(p, "device", None) or getattr(p, "name", None) or "" + desc = getattr(p, "description", "") or "" + vid = getattr(p, "vid", None) + pid = getattr(p, "pid", None) + ports_info.append(f"{dev} - {desc} (VID: {vid}, PID: {pid})") + avail = "\n".join(f" - {pi}" for pi in ports_info) if ports_info else " (no serial ports found)" + raise RuntimeError(f"Could not find serial port for Emager3. Available ports:\n{avail}") + + # non-blocking; we buffer ourselves + self.ser = serial.Serial(com_port, baud_rate, timeout=0) + self.ser.close() + + self._buf = bytearray() + self.pos = 0 + + # stats + self.frames_ok = 0 + self.bad_tlr = 0 + self.resyncs = 0 + self.last_ctr = None + self.ctr_miss = 0 + + # handlers + self.frame_handlers = [] + + # imu dtype + self.imu_dtype = np.dtype(">i2") # if self.imu_endianness == "be" else np.dtype(" np.ndarray: + """ + Emager3 (v3.0) 8192-byte frames with packed 12-bit EMG + IMU + counter + Unpacks the 8064-byte EMG payload into 5376 uint16 values. + """ + + b = np.frombuffer(packed, dtype=np.uint8).astype(np.uint16) + out = np.empty((2 * (len(b) // 3),), dtype=np.uint16) + out[0::2] = (b[0::3] << 4) | (b[1::3] >> 4) + out[1::2] = ((b[1::3] & 0x0F) << 8) | b[2::3] + return out[:n_values] + + def get_data(self) -> bool: + """ + Returns True if at least one full frame was parsed+emitted in this call. + """ + try: + n_av = self.ser.in_waiting + except Exception: + return False + if n_av <= 0: + return False + + data = self.ser.read(n_av) + if not data: + return False + + self._buf += data + + emitted_any = False + + while True: + h = self._buf.find(self._hdr, self.pos) + + if h < 0: + keep = min(len(self._buf), self.FRAME_SIZE - 1) + self._buf = self._buf[-keep:] if keep else bytearray() + self.pos = 0 + return emitted_any + + if len(self._buf) - h < self.FRAME_SIZE: + if h > 0: + self._buf = self._buf[h:] + self.pos = 0 + else: + self.pos = h + return emitted_any + + # validate trailer + t0 = h + self.TRAILER0_I + t1 = h + self.TRAILER1_I + if self._buf[t0] == self.TLR0 and self._buf[t1] == self.TLR1: + self.frames_ok += 1 + + # frame counter big-endian uint32 + c0 = self._buf[h + self.CTR_START + 0] + c1 = self._buf[h + self.CTR_START + 1] + c2 = self._buf[h + self.CTR_START + 2] + c3 = self._buf[h + self.CTR_START + 3] + frame_id = (c0 << 24) | (c1 << 16) | (c2 << 8) | c3 + + if self.last_ctr is not None: + expected = (self.last_ctr + 1) & 0xFFFFFFFF + if frame_id != expected: + self.ctr_miss += 1 + self.last_ctr = frame_id + + # IMU sample count + imu_nsamp = int(self._buf[h + self.IMU_NSAMPLES_I]) + imu_bytes_used = imu_nsamp * self.IMU_BYTES_PER_SAMPLE + + emg_bytes = bytes(self._buf[h + self.EMG_START: h + self.EMG_END]) + imu_bytes = bytes(self._buf[h + self.IMU_START: h + self.IMU_START + imu_bytes_used]) + + # decode EMG -> (84,64) uint16 + emg_vals = self._unpack_12bit_be(emg_bytes, n_values=self.EMG_VALUES_PER_FRAME) + emg_block = emg_vals.reshape(self.SAMPLES_PER_CH_PER_FRAME, self.CHANNELS) + + # decode IMU -> (N,6) int16 + imu_arr = np.frombuffer(imu_bytes, dtype=self.imu_dtype) + imu_block = imu_arr.reshape(imu_nsamp, self.IMU_AXES) + + self._emit_frame(frame_id, emg_block, imu_block) + emitted_any = True + + self.pos = h + self.FRAME_SIZE + if self.pos > (self.FRAME_SIZE * 2): + self._buf = self._buf[self.pos:] + self.pos = 0 + + else: + self.bad_tlr += 1 + self.resyncs += 1 + self.pos = h + 1 + if self.pos > (self.FRAME_SIZE * 2): + self._buf = self._buf[self.pos:] + self.pos = 0 + + +# ============================================================ +# Streamer process (fast path: parse -> enqueue ONE tuple; writer thread updates SMM) +# ============================================================ + +class EmagerV3Streamer(Process): + def __init__(self, shared_memory_items, emager_kwargs: dict | None = None): + super().__init__(daemon=True) + self.shared_memory_items = shared_memory_items + self._stop_event = Event() + self.e = None + self.emager_kwargs = emager_kwargs or {} + + # cache shapes for ring writes + self._shapes = {item[0]: item[1] for item in shared_memory_items if len(item) >= 2} + + # writer thread plumbing (created in run) + self._q = None + self._writer = None + + def run(self): + # Create shared memory manager IN CHILD PROCESS + self.smm = SharedMemoryManager() + + # Create shared memory variables + for item in self.shared_memory_items: + self.smm.create_variable(*item) + + # Device + bw = self.emager_kwargs + baud = int(bw.get("baud_rate", 3000000)) + com_name = bw.get("com_name", None) + vid_pid = bw.get("vid_pid", (12259, 256)) + + self.e = Emager3(baud_rate=baud, com_name=com_name, vid_pid=vid_pid) + self.e.connect() + self.e.clear_buffer() + + # Queue carries ONE bundled item per frame to prevent desync + self._q = Queue(maxsize=200) # 1 item/frame; bump if you want + + def buffer_write(tag: str, data: np.ndarray) -> None: + """ + Prepend `data` (N,D) to the shared memory buffer `tag` (H,D), + keeping buffer size fixed, and increment `{tag}_count` by N. + """ + if data is None: + return + + if data.ndim == 1: + data = data.reshape(1, -1) + if data.ndim != 2: + return + + count_tag = f"{tag}_count" + + def add_to_buffer(buffer, new=data): + # prepend new rows + new_buffer = np.vstack((new[::-1], buffer)) + # keep buffer size fixed + return new_buffer[:buffer.shape[0], :] + + # write data + self.smm.modify_variable(tag, add_to_buffer) + + # increment count by number of rows written + nb_row = data.shape[0] + self.smm.modify_variable(count_tag, lambda x, r=nb_row: x + r) + + def writer_thread_fn(): + while not self._stop_event.is_set(): + try: + frame_id, emg_block, imu_block = self._q.get(timeout=0.1) + except Empty: + continue + try: + # EMG: store as uint16 (your shared memory is now uint16) + emg = np.asarray(emg_block, dtype=np.uint16) + + # IMU: int16 + imu = np.asarray(imu_block, dtype=np.int16) + + # Sample ID per EMG row: (frame_id * CHANNELS) + [0..N-1] + base = int(frame_id) * Emager3.SAMPLES_PER_CH_PER_FRAME + sample_id = (base + np.arange(emg.shape[0], dtype=np.int64)).reshape(-1, 1) + + # Write all three (still separate locks per tag, but all-or-nothing per frame at queue level) + buffer_write("emg", emg) + buffer_write("imu", imu) + buffer_write("sample_id", sample_id) + + except Exception: + pass + finally: + self._q.task_done() + + self._writer = threading.Thread(target=writer_thread_fn, daemon=True) + self._writer.start() + + # Frame handler: decode is already done in Emager3; this just enqueues ONE item + self.drop_count = 0 + def on_frame(frame_id, emg_block, imu_block): + try: + self._q.put_nowait((int(frame_id), emg_block, imu_block)) + except Exception: + self.drop_count += 1 + if self.drop_count % 10 == 0: + print("DROPPED frames:", self.drop_count, "qsize:", self._q.qsize()) + + self.e.add_frame_handler(on_frame) + + # Main streaming loop (avoid busy spin) + try: + while not self._stop_event.is_set(): + did = self.e.get_data() + if not did: + time.sleep(0.001) # 1 ms backoff when no complete frame parsed + finally: + self._cleanup() + + def stop(self): + self._stop_event.set() + self.join() + + def _cleanup(self): + try: + if self.e is not None: + self.e.close() + except Exception: + pass + try: + if hasattr(self, "smm") and self.smm is not None: + self.smm.cleanup() + except Exception: + pass diff --git a/libemg/streamers.py b/libemg/streamers.py index c2cc49e8..4d4a94f1 100644 --- a/libemg/streamers.py +++ b/libemg/streamers.py @@ -13,6 +13,7 @@ else: from libemg._streamers._oymotion_streamer import OyMotionStreamer from libemg._streamers._emager_streamer import EmagerStreamer +from libemg._streamers._emagerv3_streamer import EmagerV3Streamer from libemg._streamers._sifi_bridge_streamer import SiFiBridgeStreamer from libemg._streamers._leap_streamer import LeapStreamer from libemg._streamers._mindrove import MindroveStreamer @@ -475,20 +476,21 @@ def oymotion_streamer(shared_memory_items : list | None = None, def emager_streamer(shared_memory_items = None, version:str = "v1.0", **kwargs): - """The streamer for the emager armband. + """The streamer for the emager armband (v1.0 / v1.1). - This function connects to the emager cuff and streams its data over a serial port and access it via shared memory. + Connects to the emager cuff over a serial port and exposes EMG samples + via shared memory. For EMaGer v3 hardware, use :func:`emagerv3_streamer` + instead — its frame format and shared-memory layout differ. Parameters ---------- shared_memory_items : list (optional) Shared memory configuration parameters for the streamer in format: ["tag", (size), datatype]. - version: str Emager version: 'v1.0', 'v1.1', 'v3.0'. Default is 'v1.0'. - emager_kwargs: dict passed to Emager/Emager3. Supported keys: - baud_rate (int, default 1500000), endianness ('le'), signed (bool), - com_name, vid_pid (tuple), channels (int), samples_per_frame (int) - + version: str Emager version: 'v1.0' or 'v1.1'. Default is 'v1.0'. + emager_kwargs: dict passed to Emager. Supported keys: + baud_rate (int, default 1500000) + Returns ---------- Object: streamer @@ -509,11 +511,62 @@ def emager_streamer(shared_memory_items = None, version:str = "v1.0", **kwargs): if len(item) == 3: item.append(Lock()) - # Use unified EmagerStreamer and pass emager version + kwargs ema = EmagerStreamer(shared_memory_items, version=version, emager_kwargs=kwargs) ema.start() return ema, shared_memory_items + +def emagerv3_streamer(shared_memory_items=None, **kwargs): + """The streamer for the emager armband (v3). + + Connects to the EMaGer v3 cuff (8192-byte framed protocol with packed + 12-bit EMG, IMU, and a frame counter) over a serial port and exposes + decoded samples via shared memory. For v1.0/v1.1 hardware, use + :func:`emager_streamer` instead. + + Parameters + ---------- + shared_memory_items : list (optional) + Shared memory configuration parameters for the streamer in format: + ["tag", (size), datatype]. Defaults expose the modalities below. + emager_kwargs: dict passed to Emager3. Supported keys: + baud_rate (int, default 3000000), com_name (str), + vid_pid (tuple, default (12259, 256)), debug (bool). + + The default shared memory exposes: + - 'emg' : (2000, 64) uint16 rolling buffer (rows = EMG samples) + - 'imu' : (2000, 6) int16 rolling buffer (rows = IMU samples) + - 'sample_id' : (2000, 1) int64 aligned row-for-row with EMG samples + - 'emg_count', 'imu_count', 'sample_id_count' as (1, 1) int64 + + Returns + ---------- + Object: streamer + The emager streamer object. + Object: shared memory + The shared memory object. + Examples + --------- + >>> streamer, shared_memory = emagerv3_streamer() + """ + if shared_memory_items is None: + shared_memory_items = [] + shared_memory_items.append(['emg', (2000, 64), np.uint16]) + shared_memory_items.append(['emg_count', (1, 1), np.int64]) + shared_memory_items.append(['imu', (2000, 6), np.int16]) + shared_memory_items.append(['imu_count', (1, 1), np.int64]) + shared_memory_items.append(['sample_id', (2000, 1), np.int64]) + shared_memory_items.append(['sample_id_count', (1, 1), np.int64]) + + for item in shared_memory_items: + if len(item) == 3: + item.append(Lock()) + + ema = EmagerV3Streamer(shared_memory_items, emager_kwargs=kwargs) + ema.start() + return ema, shared_memory_items + + #TODO: Update docs def leap_streamer(shared_memory_items : list | None =None, arm_basis : bool = True, @@ -736,4 +789,4 @@ def otb_muovi_plus_streamer(shared_memory_items = None, muoviplus = OTBMuoviPlusEMGStreamer(ip=ip, port=port, shared_memory_items=shared_memory_items, emg_channels=emg_channels) muoviplus.start() - return muoviplus, shared_memory_items + return muoviplus, shared_memory_items \ No newline at end of file