Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 54 additions & 25 deletions src/contraqctor/qc/harp/treadmill.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class HarpTreadmillTestSuite(HarpDeviceTypeTestSuite):
Provides tests specific to the Treadmill device.

Attributes:
harp_device: The Harp Treadmill device to test.
data: The data from the periodic sensor events.
_harp_device: The Harp Treadmill device to test.
_data: The data from the periodic sensor events.

Examples:
```python
Expand All @@ -36,25 +36,26 @@ class HarpTreadmillTestSuite(HarpDeviceTypeTestSuite):
_WHOAMI = 1402

@override
def __init__(
self,
harp_device: HarpDevice,
):
def __init__(self, _harp_device: HarpDevice, *, adc_mid_tol_percent: float = 0.05, max_tick_jump: int = 410):
"""Initialize the Treadmill test suite.

Args:
harp_device: The Harp Treadmill device to test.
_harp_device: The Harp Treadmill device to test.
adc_mid_tol_percent: Tolerance percentage for the median torque value to be around the midpoint of the expected ADC range. Default is 10%.
max_tick_jump: Maximum allowed jump in encoder ticks between consecutive readings. Default is 410 ticks, which corresponds to 5% of a full revolution for an encoder with 8192 pulses per revolution (PPR).
"""
super().__init__(harp_device)
self.harp_device = harp_device
self.data: pd.DataFrame = self.harp_device["SensorData"].data.copy()
self.data = self.data[self.data["MessageType"] == "EVENT"]
super().__init__(_harp_device)
self._harp_device = _harp_device
self._data: pd.DataFrame = self._harp_device["SensorData"].data.copy()
self._data = self._data[self._data["MessageType"] == "EVENT"]
self._adc_mid_tol_percent = adc_mid_tol_percent
self._max_tick_jump = max_tick_jump

def test_sampling_rate(self):
"""Tests if the sampling rate of the treadmill is within nominal values"""
period = self.data.index.diff().dropna()
period = self._data.index.diff().dropna()
mean_period = np.mean(period)
fs: float = self.harp_device["SensorDataDispatchRate"].data.iloc[-1].values[0]
fs: float = self._harp_device["SensorDataDispatchRate"].data.iloc[-1].values[0]
if fs == 0:
return self.fail_test(0, "Sampling rate is zero")

Expand All @@ -69,35 +70,63 @@ def test_encoder(self):
"""Tests the quality of the treadmill signal by calculating total distance and sudden jumps."""
metrics = {}

d = self.data["Encoder"].diff().dropna()
d = self._data["Encoder"].diff().dropna()
# apply two's complement wrap for signed 32-bit
mask = 0xFFFFFFFF
d = d.astype(np.int64) & mask # force 32-bit space
d = d.astype(np.int64)

# reinterpret as signed 32-bit
d = np.where(d >= 0x80000000, d - 0x100000000, d)
metrics["total_ticks"] = np.sum(d)
metrics["max_jump"] = np.max(np.abs(d))
if metrics["total_ticks"] == 0:
return self.fail_test(metrics, "Total ticks is zero")
else:
return self.pass_test(metrics, f"Total ticks is {metrics['total_ticks']}")
return self.fail_test(
metrics, "Total ticks is zero, indicating the treadmill did not move during the session."
)
if metrics["max_jump"] > self._max_tick_jump:
return self.warn_test(
metrics,
f"Maximum jump between consecutive encoder readings is {metrics['max_jump']} ticks (expected a maximum of {self._max_tick_jump}), which is unusually high and may indicate signal corruption or missed readings.",
)
return self.pass_test(metrics, "All encoder metrics are within expected limits.")

def test_torque(self):
"""Tests if the torque signal was within nominal values."""
torque = self.data["Torque"].copy()
def test_torque_range(self):
"""Tests if the torque signal is within expected nominal ADC range (10-4000)"""
MIN, MAX = 95, 4000 # The ADC is 12-bit, but we add a small fudge factor on the edges.
MID = (MIN + MAX) / 2 # The ADC is expected to be around mid-scale when the treadmill is stationary
SOFT_MIN = MIN + (MAX - MIN) * 0.2 # 20% above the minimum
SOFT_MAX = MAX - (MAX - MIN) * 0.2 # 20% below the maximum

torque = self._data["Torque"].copy()
metrics = {}
metrics["min"] = torque.min()
metrics["max"] = torque.max()
metrics["mean"] = torque.mean()
metrics["median"] = torque.median()
metrics["std"] = torque.std()
if metrics["min"] < 10 or metrics["max"] > 4000:
return self.warn_test(metrics, "Torque signal out of expected nominal range (10-4000)")
return self.pass_test(metrics, "Torque signal within expected nominal range (10-4000)")
metrics["iqr"] = torque.quantile(0.75) - torque.quantile(0.25)

if metrics["min"] < MIN or metrics["max"] > MAX:
return self.fail_test(
metrics,
f"Torque signal out of expected nominal ADC range ({MIN} : {MAX}). This indicates the torque sensor was damaged during operation.",
)
if metrics["min"] < SOFT_MIN or metrics["max"] > SOFT_MAX:
return self.warn_test(
metrics,
f"Torque signal out of expected soft ADC range ({SOFT_MIN} : {SOFT_MAX}). This MAY indicate the torque sensor was damaged during operation or it is not properly installed/calibrated.",
)
if abs(metrics["median"] - MID) > (MID * self._adc_mid_tol_percent):
return self.warn_test(
metrics,
f"Torque signal median value {metrics['median']} is expected to be within {self._adc_mid_tol_percent * 100}% of mid-scale value: {MID}.",
)

return self.pass_test(metrics, "All metrics are within expected limits.")

def test_torque_limit_tripwire(self):
"""Tests if the torque limit tripwire was triggered."""
tripwire = self.harp_device["TorqueLimitState"].read()
tripwire = self._harp_device["TorqueLimitState"].read()
tripwire = tripwire[tripwire["MessageType"] == "EVENT"]
trips = tripwire["TorqueLimitState"] > 0
if n := trips.sum() == 0:
Expand Down
18 changes: 9 additions & 9 deletions tests/test_qc/harp/test_treadmill.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _create_default_registers(self):
time_index = np.linspace(0, 10, 1000)
np.random.seed(0)
encoder = np.cumsum(np.random.randint(1, 5, len(time_index)))
torque = np.random.uniform(100, 2000, len(time_index))
torque = np.random.uniform(1000, 3000, len(time_index))
sensor_df = pd.DataFrame(
{
"Encoder": encoder,
Expand Down Expand Up @@ -154,8 +154,8 @@ class TestHarpTreadmillTestSuite:
def test_init(self, mock_treadmill_device):
suite = HarpTreadmillTestSuite(mock_treadmill_device)
assert suite.harp_device == mock_treadmill_device
assert "Encoder" in suite.data.columns
assert "Torque" in suite.data.columns
assert "Encoder" in suite._data.columns
assert "Torque" in suite._data.columns

def test_sampling_rate(self, mock_treadmill_device, mock_treadmill_device_bad_rate):
suite = HarpTreadmillTestSuite(mock_treadmill_device)
Expand All @@ -172,24 +172,24 @@ def test_encoder(self, mock_treadmill_device, mock_treadmill_device_zero_ticks):
suite = HarpTreadmillTestSuite(mock_treadmill_device)
result = suite.test_encoder()
assert result.status == Status.PASSED
assert result.message is not None and "Total ticks is" in result.message
assert result.context is None or "total_ticks" in result.context
assert result.message is not None and "All encoder metrics" in result.message
assert "total_ticks" in result.result

suite = HarpTreadmillTestSuite(mock_treadmill_device_zero_ticks)
result = suite.test_encoder()
assert result.status == Status.FAILED
assert result.message is not None and "Total ticks is zero" in result.message

def test_torque(self, mock_treadmill_device, mock_treadmill_device_bad_torque):
def test_torque_range(self, mock_treadmill_device, mock_treadmill_device_bad_torque):
suite = HarpTreadmillTestSuite(mock_treadmill_device)
result = suite.test_torque()
result = suite.test_torque_range()
assert result.status == Status.PASSED
assert result.message is not None
assert result.result is not None

suite = HarpTreadmillTestSuite(mock_treadmill_device_bad_torque)
result = suite.test_torque()
assert result.status == Status.WARNING
result = suite.test_torque_range()
assert result.status == Status.FAILED
assert result.message is not None
assert result.result is not None

Expand Down
Loading