From b8141b78fbe58c4b2d0fda912f3961361f36895a Mon Sep 17 00:00:00 2001 From: Jamie Forth <j.forth@gold.ac.uk> Date: Mon, 7 Apr 2025 16:59:55 +0100 Subject: [PATCH] automatically look up ports given types and add lux and acc options --- src/bitalino/cli.py | 28 ++++++++++++---- src/bitalino/csv.py | 7 ++-- src/bitalino/device.py | 51 +++++++++++++++++++----------- src/bitalino/lsl.py | 8 +++-- src/bitalino/transfer_functions.py | 22 ++++++++++--- 5 files changed, 81 insertions(+), 35 deletions(-) diff --git a/src/bitalino/cli.py b/src/bitalino/cli.py index e0ae884..2023432 100644 --- a/src/bitalino/cli.py +++ b/src/bitalino/cli.py @@ -1,6 +1,19 @@ import argparse +class UniqueAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + if len(values) != len(set(values)): + unique = set() + duplicated = set() + for x in values: + if x not in unique: + unique.add(x) + else: + duplicated.add(x) + raise argparse.ArgumentError(self, f"Duplicate values: {duplicated}") + setattr(namespace, self.dest, values) + def base_parser(description): parser = argparse.ArgumentParser(description=description) parser.add_argument("device", help="Device MAC address.") @@ -12,19 +25,22 @@ def base_parser(description): choices=[10, 100, 1000], help="Sample rate (Hz).", ) - parser.add_argument( + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( "-p", "--ports", nargs="+", - default=[1, 2, 3, 4], type=int, - help="Active ports", + action=UniqueAction, + help="Active ports: Raw data values.", ) - parser.add_argument( + group.add_argument( "-t", "--types", nargs="+", - choices=['eeg', 'eda', 'ecg', 'emg', 'pzt'], - help="Active port types.", + action=UniqueAction, + choices=["emg", "ecg", "eda", "eeg", "acc", "lux"], + help="Active ports: Transformed data values.", ) return parser diff --git a/src/bitalino/csv.py b/src/bitalino/csv.py index 01b31c3..6093068 100644 --- a/src/bitalino/csv.py +++ b/src/bitalino/csv.py @@ -67,13 +67,12 @@ def main(): # Device setup device = Bitalino(args.device) + device.set_ports(args.ports, args.types) device.dumpInfo() device.start( args.sample_rate, - args.ports, data_queue, stop_event, - port_types=args.types, duration=args.duration, ) device_thread = threading.Thread(target=device.loop) @@ -85,7 +84,9 @@ def main(): else: data_cols = [ p_type if p_type is not None else p - for p_type, p in itertools.zip_longest(args.types, args.ports) + for p_type, p in itertools.zip_longest( + device.port_types, device.active_ports + ) ] writer.writerow(["sample", *data_cols, "time_stamp"]) diff --git a/src/bitalino/device.py b/src/bitalino/device.py index 6a9c7ef..af126c1 100644 --- a/src/bitalino/device.py +++ b/src/bitalino/device.py @@ -5,13 +5,39 @@ import plux class Bitalino(plux.BITalinoDev): - start_time = None - sample_rate = None - duration = None - transfer_functions = None + port_types = { + "emg": 1, + "ecg": 2, + "eda": 3, + "eeg": 4, + "acc": 5, + "lux": 6, + } def __init__(self, address): plux.SignalsDev.__init__(f"BTH{address}") + self.start_time = None + self.sample_rate = None + self.duration = None + self.transfer_functions = None + + def set_ports(self, active_ports=None, port_types=None): + if active_ports is None: + active_ports = [self.port_types[p_type] for p_type in port_types] + if port_types is not None: + num_types = len(port_types) + num_ports = len(active_ports) + if num_types < num_ports: + port_types = port_types + [None] * (num_ports - num_types) + elif num_types > num_ports: + raise ValueError( + f"{num_types} port types provided but only {num_ports} ports." + ) + self.set_transfer_functions(port_types) + print("Active ports:", active_ports) + print("Port types:", port_types) + self.active_ports = active_ports + self.port_types = port_types def onRawFrame(self, nSeq, sample): time_stamp = self.get_clock_time() @@ -38,10 +64,8 @@ class Bitalino(plux.BITalinoDev): def start( self, sample_rate, - active_ports, data_queue, stop_event, - port_types=None, duration=None, ): self.sample_rate = sample_rate @@ -49,20 +73,9 @@ class Bitalino(plux.BITalinoDev): self.stop_event = stop_event self.duration = duration - if port_types is not None: - num_types = len(port_types) - num_ports = len(active_ports) - if num_types < num_ports: - port_types = port_types + [None] * (num_ports - num_types) - elif num_types > num_ports: - raise ValueError( - f"{num_types} port types provided but only {num_ports} ports." - ) - self.set_transfer_functions(port_types) - # Third argument is bit-depth but this is ignored by Bitalino devices - # as it is hardwired. Ports 1-4 are 10 bit, ports 5-6 are 6 bit. - super().start(sample_rate, active_ports, 0) + # as it is hardwired to 10-bits. + super().start(sample_rate, self.active_ports, 0) def set_transfer_functions(self, port_types): self.transfer_functions = [ diff --git a/src/bitalino/lsl.py b/src/bitalino/lsl.py index a97c467..b09f06f 100644 --- a/src/bitalino/lsl.py +++ b/src/bitalino/lsl.py @@ -123,7 +123,10 @@ def main(): args = parser.parse_args() - channel_count = len(args.ports) + if args.ports is not None: + channel_count = len(args.ports) + else: + channel_count = len(args.types) nominal_srate = args.sample_rate # Threading @@ -134,13 +137,12 @@ def main(): # Device setup device = BitalinoLSLDev(args.device) + device.set_ports(args.ports, args.types) device.dumpInfo() device.start( args.sample_rate, - args.ports, data_queue, stop_event, - port_types=args.types, duration=args.duration, ) device_thread = threading.Thread(target=device.loop) diff --git a/src/bitalino/transfer_functions.py b/src/bitalino/transfer_functions.py index 2626c8c..12e9080 100644 --- a/src/bitalino/transfer_functions.py +++ b/src/bitalino/transfer_functions.py @@ -21,14 +21,28 @@ def emg(signal, resolution=10): def eda(signal, resolution=10): """EDA value in microsiemens [0ðœ‡ð‘†, 25ðœ‡ð‘†]""" - return ((signal / 2 ** resolution) * VCC) / 0.132 + return ((signal / 2**resolution) * VCC) / 0.132 def eeg(signal, resolution=10): """EEG value in microvolts [-39.49ðœ‡ð‘‰, 39.49ðœ‡ð‘‰]""" - return (((signal / 2 ** resolution) - 0.5) * VCC) / 41782 * 1e6 + return (((signal / 2**resolution) - 0.5) * VCC) / 41782 * 1e6 + + +def acc(signal, resolution=10): + """ACC value in percents [-50%, 50%]""" + return pzt(signal, resolution) + + +def lux(signal, resolution=10): + """LUX value in unit norm range [0, 1]""" + return unit_norm(signal, resolution) + +def unit_norm(signal, resolution=10): + """Unit normalisation [0, 1]""" + return signal / 2**resolution def pzt(signal, resolution=10): - """EEG value in percents [-50%, 50%]""" - return ((signal / 2 ** resolution) - 0.5) * 100 + """Percents scale [-50%, 50%]""" + return (unit_norm(signal, resolution) - 0.5) * 100 -- GitLab