diff --git a/examples/bt_api_example.py b/examples/bt_api_example.py index c11a6b2..2603822 100644 --- a/examples/bt_api_example.py +++ b/examples/bt_api_example.py @@ -36,6 +36,9 @@ def main(args=None): + "]" ) + hw_info = shim_dev.get_device_hardware_version() + print(f"- hardware: [{hw_info.name}]") + shim_dev.add_stream_callback(stream_cb) shim_dev.start_streaming() diff --git a/pyshimmer/bluetooth/bt_api.py b/pyshimmer/bluetooth/bt_api.py index 719fea0..b73e170 100644 --- a/pyshimmer/bluetooth/bt_api.py +++ b/pyshimmer/bluetooth/bt_api.py @@ -63,6 +63,7 @@ ChannelDataType, EChannelType, ESensorGroup, + set_active_dtype_assignment, ) from pyshimmer.dev.exg import ExGRegister from pyshimmer.dev.fw_version import ( @@ -384,7 +385,9 @@ def initialize(self) -> None: """ self._thread.start() self._set_fw_capabilities() - + # Select the active channel dtype map for the detected hardware. + set_active_dtype_assignment(self._hw_version) + if self.capabilities.supports_ack_disable and self._disable_ack: self.set_status_ack(enabled=False) @@ -604,7 +607,8 @@ def get_inquiry(self) -> tuple[float, int, list[EChannelType]]: - The active data channels of the device as list, does not include the TIMESTAMP channel """ - return self._process_and_wait(InquiryCommand()) + cmd = InquiryCommand(self._hw_version) # pass HW so the command knows header length + return self._process_and_wait(cmd) def get_data_types(self): """Get the active data channels of the device diff --git a/pyshimmer/bluetooth/bt_commands.py b/pyshimmer/bluetooth/bt_commands.py index 52e8080..b7b9e61 100644 --- a/pyshimmer/bluetooth/bt_commands.py +++ b/pyshimmer/bluetooth/bt_commands.py @@ -400,8 +400,9 @@ class InquiryCommand(ResponseCommand): channels """ - def __init__(self): + def __init__(self, hw_version: HardwareVersion | None = None): super().__init__(INQUIRY_RESPONSE) + self._hw_version = hw_version @staticmethod def decode_channel_types(ct_bin: bytes) -> list[EChannelType]: @@ -412,9 +413,14 @@ def decode_channel_types(ct_bin: bytes) -> list[EChannelType]: def send(self, ser: BluetoothSerial) -> None: ser.write_command(INQUIRY_COMMAND) + def _is_shimmer3r(self) -> bool: + return self._hw_version == HardwareVersion.SHIMMER3R + + def receive(self, ser: BluetoothSerial) -> any: + fmt = " None: + """ + Mutate the active dtype map in-place to match the given hardware. + Using in-place mutation ensures modules that imported the dict earlier see the update. + """ + target = ChDataTypeAssignmentShimmer3R if hw == HardwareVersion.SHIMMER3R else ChDataTypeAssignmentShimmer3 + ChDataTypeAssignmentShimmer.clear() + ChDataTypeAssignmentShimmer.update(target) def get_enabled_channels(sensors: list[ESensorGroup]) -> list[EChannelType]: """Determine the set of data channels for a set of enabled sensors diff --git a/test/bluetooth/test_bt_commands.py b/test/bluetooth/test_bt_commands.py index 57b7df7..ce97a11 100644 --- a/test/bluetooth/test_bt_commands.py +++ b/test/bluetooth/test_bt_commands.py @@ -186,6 +186,32 @@ def test_inquiry_command(self): self.assertEqual(buf_size, 1) self.assertEqual(ctypes, [EChannelType.INTERNAL_ADC_A1]) + def test_inquiry_command_shimmer3r(self): + # INQUIRY response layout for 3R: + # 0x02 (resp code) + # sr_val = 0x0040 + # misc = 0x0901FF01 (LE bytes: 01 FF 01 09) + # + 3 extra bytes (skipped by parser) + # n_ch = 0x01 + # buf = 0x01 + # channels[0] = 0x12 (INTERNAL_ADC_A1) + resp_3r = ( + b"\x02" # INQUIRY_RESPONSE + b"\x40\x00" # sr_val (0x0040 = 512 Hz after dr2sr) + b"\x01\xff\x01\x09" # misc (LE) + b"\x00\x00\x00" # the 3 extra bytes for Shimmer3R + b"\x01" # n_ch + b"\x01" # buf_size + b"\x00" # channel id + ) + + cmd = InquiryCommand(HardwareVersion.SHIMMER3R) + sr, buf_size, ctypes = self.assert_cmd(cmd, b"\x01", b"\x02", resp_3r) + + self.assertEqual(sr, 512.0) + self.assertEqual(buf_size, 1) + self.assertEqual(ctypes, [EChannelType.ACCEL_LN_X]) # 0x01 = ACCEL_LN_X + def test_start_streaming_command(self): cmd = StartStreamingCommand() self.assert_cmd(cmd, b"\x07")