From 7e226f08b06340fcb7ca616188176560116aadef Mon Sep 17 00:00:00 2001 From: bruno-f-cruz <7049351+bruno-f-cruz@users.noreply.github.com> Date: Fri, 8 May 2026 10:03:02 -0700 Subject: [PATCH] Improve handling of treadmil limits --- src/contraqctor/qc/harp/treadmill.py | 79 +++++++++++++++++++--------- tests/test_qc/harp/test_treadmill.py | 18 +++---- 2 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/contraqctor/qc/harp/treadmill.py b/src/contraqctor/qc/harp/treadmill.py index 4d29762..52c0e8b 100644 --- a/src/contraqctor/qc/harp/treadmill.py +++ b/src/contraqctor/qc/harp/treadmill.py @@ -12,8 +12,8 @@ class HarpTreadmillTestSuite(HarpDeviceTypeTestSuite): Provides tests specific to the Treadmill device. Attributes: - harp_device: The Harp Treadmill device to test. - data: The data from the periodic sensor events. + _harp_device: The Harp Treadmill device to test. + _data: The data from the periodic sensor events. Examples: ```python @@ -36,25 +36,26 @@ class HarpTreadmillTestSuite(HarpDeviceTypeTestSuite): _WHOAMI = 1402 @override - def __init__( - self, - harp_device: HarpDevice, - ): + def __init__(self, _harp_device: HarpDevice, *, adc_mid_tol_percent: float = 0.05, max_tick_jump: int = 410): """Initialize the Treadmill test suite. Args: - harp_device: The Harp Treadmill device to test. + _harp_device: The Harp Treadmill device to test. + adc_mid_tol_percent: Tolerance percentage for the median torque value to be around the midpoint of the expected ADC range. Default is 10%. + max_tick_jump: Maximum allowed jump in encoder ticks between consecutive readings. Default is 410 ticks, which corresponds to 5% of a full revolution for an encoder with 8192 pulses per revolution (PPR). """ - super().__init__(harp_device) - self.harp_device = harp_device - self.data: pd.DataFrame = self.harp_device["SensorData"].data.copy() - self.data = self.data[self.data["MessageType"] == "EVENT"] + super().__init__(_harp_device) + self._harp_device = _harp_device + self._data: pd.DataFrame = self._harp_device["SensorData"].data.copy() + self._data = self._data[self._data["MessageType"] == "EVENT"] + self._adc_mid_tol_percent = adc_mid_tol_percent + self._max_tick_jump = max_tick_jump def test_sampling_rate(self): """Tests if the sampling rate of the treadmill is within nominal values""" - period = self.data.index.diff().dropna() + period = self._data.index.diff().dropna() mean_period = np.mean(period) - fs: float = self.harp_device["SensorDataDispatchRate"].data.iloc[-1].values[0] + fs: float = self._harp_device["SensorDataDispatchRate"].data.iloc[-1].values[0] if fs == 0: return self.fail_test(0, "Sampling rate is zero") @@ -69,35 +70,63 @@ def test_encoder(self): """Tests the quality of the treadmill signal by calculating total distance and sudden jumps.""" metrics = {} - d = self.data["Encoder"].diff().dropna() + d = self._data["Encoder"].diff().dropna() # apply two's complement wrap for signed 32-bit mask = 0xFFFFFFFF d = d.astype(np.int64) & mask # force 32-bit space - d = d.astype(np.int64) # reinterpret as signed 32-bit d = np.where(d >= 0x80000000, d - 0x100000000, d) metrics["total_ticks"] = np.sum(d) + metrics["max_jump"] = np.max(np.abs(d)) if metrics["total_ticks"] == 0: - return self.fail_test(metrics, "Total ticks is zero") - else: - return self.pass_test(metrics, f"Total ticks is {metrics['total_ticks']}") + return self.fail_test( + metrics, "Total ticks is zero, indicating the treadmill did not move during the session." + ) + if metrics["max_jump"] > self._max_tick_jump: + return self.warn_test( + metrics, + f"Maximum jump between consecutive encoder readings is {metrics['max_jump']} ticks (expected a maximum of {self._max_tick_jump}), which is unusually high and may indicate signal corruption or missed readings.", + ) + return self.pass_test(metrics, "All encoder metrics are within expected limits.") - def test_torque(self): - """Tests if the torque signal was within nominal values.""" - torque = self.data["Torque"].copy() + def test_torque_range(self): + """Tests if the torque signal is within expected nominal ADC range (10-4000)""" + MIN, MAX = 95, 4000 # The ADC is 12-bit, but we add a small fudge factor on the edges. + MID = (MIN + MAX) / 2 # The ADC is expected to be around mid-scale when the treadmill is stationary + SOFT_MIN = MIN + (MAX - MIN) * 0.2 # 20% above the minimum + SOFT_MAX = MAX - (MAX - MIN) * 0.2 # 20% below the maximum + + torque = self._data["Torque"].copy() metrics = {} metrics["min"] = torque.min() metrics["max"] = torque.max() metrics["mean"] = torque.mean() + metrics["median"] = torque.median() metrics["std"] = torque.std() - if metrics["min"] < 10 or metrics["max"] > 4000: - return self.warn_test(metrics, "Torque signal out of expected nominal range (10-4000)") - return self.pass_test(metrics, "Torque signal within expected nominal range (10-4000)") + metrics["iqr"] = torque.quantile(0.75) - torque.quantile(0.25) + + if metrics["min"] < MIN or metrics["max"] > MAX: + return self.fail_test( + metrics, + f"Torque signal out of expected nominal ADC range ({MIN} : {MAX}). This indicates the torque sensor was damaged during operation.", + ) + if metrics["min"] < SOFT_MIN or metrics["max"] > SOFT_MAX: + return self.warn_test( + metrics, + f"Torque signal out of expected soft ADC range ({SOFT_MIN} : {SOFT_MAX}). This MAY indicate the torque sensor was damaged during operation or it is not properly installed/calibrated.", + ) + if abs(metrics["median"] - MID) > (MID * self._adc_mid_tol_percent): + return self.warn_test( + metrics, + f"Torque signal median value {metrics['median']} is expected to be within {self._adc_mid_tol_percent * 100}% of mid-scale value: {MID}.", + ) + + return self.pass_test(metrics, "All metrics are within expected limits.") def test_torque_limit_tripwire(self): """Tests if the torque limit tripwire was triggered.""" - tripwire = self.harp_device["TorqueLimitState"].read() + tripwire = self._harp_device["TorqueLimitState"].read() tripwire = tripwire[tripwire["MessageType"] == "EVENT"] trips = tripwire["TorqueLimitState"] > 0 if n := trips.sum() == 0: diff --git a/tests/test_qc/harp/test_treadmill.py b/tests/test_qc/harp/test_treadmill.py index ab0a610..41d1551 100644 --- a/tests/test_qc/harp/test_treadmill.py +++ b/tests/test_qc/harp/test_treadmill.py @@ -51,7 +51,7 @@ def _create_default_registers(self): time_index = np.linspace(0, 10, 1000) np.random.seed(0) encoder = np.cumsum(np.random.randint(1, 5, len(time_index))) - torque = np.random.uniform(100, 2000, len(time_index)) + torque = np.random.uniform(1000, 3000, len(time_index)) sensor_df = pd.DataFrame( { "Encoder": encoder, @@ -154,8 +154,8 @@ class TestHarpTreadmillTestSuite: def test_init(self, mock_treadmill_device): suite = HarpTreadmillTestSuite(mock_treadmill_device) assert suite.harp_device == mock_treadmill_device - assert "Encoder" in suite.data.columns - assert "Torque" in suite.data.columns + assert "Encoder" in suite._data.columns + assert "Torque" in suite._data.columns def test_sampling_rate(self, mock_treadmill_device, mock_treadmill_device_bad_rate): suite = HarpTreadmillTestSuite(mock_treadmill_device) @@ -172,24 +172,24 @@ def test_encoder(self, mock_treadmill_device, mock_treadmill_device_zero_ticks): suite = HarpTreadmillTestSuite(mock_treadmill_device) result = suite.test_encoder() assert result.status == Status.PASSED - assert result.message is not None and "Total ticks is" in result.message - assert result.context is None or "total_ticks" in result.context + assert result.message is not None and "All encoder metrics" in result.message + assert "total_ticks" in result.result suite = HarpTreadmillTestSuite(mock_treadmill_device_zero_ticks) result = suite.test_encoder() assert result.status == Status.FAILED assert result.message is not None and "Total ticks is zero" in result.message - def test_torque(self, mock_treadmill_device, mock_treadmill_device_bad_torque): + def test_torque_range(self, mock_treadmill_device, mock_treadmill_device_bad_torque): suite = HarpTreadmillTestSuite(mock_treadmill_device) - result = suite.test_torque() + result = suite.test_torque_range() assert result.status == Status.PASSED assert result.message is not None assert result.result is not None suite = HarpTreadmillTestSuite(mock_treadmill_device_bad_torque) - result = suite.test_torque() - assert result.status == Status.WARNING + result = suite.test_torque_range() + assert result.status == Status.FAILED assert result.message is not None assert result.result is not None