diff --git a/lib/bme280/bme280/device.py b/lib/bme280/bme280/device.py index 199b6bab..bd253fef 100644 --- a/lib/bme280/bme280/device.py +++ b/lib/bme280/bme280/device.py @@ -6,6 +6,9 @@ BME280_I2C_DEFAULT_ADDR, CALIB_H_SIZE, CALIB_TP_SIZE, + DATA_BLOCK_SIZE, + MODE_FORCED, + MODE_MASK, MODE_SLEEP, OSRS_P_SHIFT, OSRS_T_SHIFT, @@ -15,11 +18,13 @@ REG_CHIP_ID, REG_CTRL_HUM, REG_CTRL_MEAS, + REG_DATA_START, REG_SOFT_RESET, REG_STATUS, RESET_DELAY_MS, SOFT_RESET_CMD, STATUS_IM_UPDATE, + STATUS_MEASURING, ) from bme280.exceptions import BME280Error, BME280InvalidDevice, BME280NotFound @@ -51,10 +56,6 @@ def _write_reg(self, reg, value): """Write a single byte to register.""" self.i2c.writeto_mem(self.address, reg, bytes([value])) - # -------------------------------------------------- - # Device identification and initialization - # -------------------------------------------------- - # -------------------------------------------------- # Calibration data # -------------------------------------------------- @@ -137,3 +138,145 @@ def reset(self): self.soft_reset() self._read_calibration() self._configure_default() + + # -------------------------------------------------- + # Status + # -------------------------------------------------- + + def _status(self): + """Return the raw STATUS register value.""" + return self._read_reg(REG_STATUS) + + def data_ready(self): + """Return True when all measurements are available.""" + return not (self._status() & STATUS_MEASURING) + + def temperature_ready(self): + """Return True when temperature data is available.""" + return self.data_ready() + + def pressure_ready(self): + """Return True when pressure data is available.""" + return self.data_ready() + + def humidity_ready(self): + """Return True when humidity data is available.""" + return self.data_ready() + + # -------------------------------------------------- + # Forced measurement trigger + # -------------------------------------------------- + + def trigger_one_shot(self): + """Trigger a single forced measurement. Poll data_ready() for completion.""" + ctrl = self._read_reg(REG_CTRL_MEAS) + self._write_reg(REG_CTRL_MEAS, (ctrl & ~MODE_MASK) | MODE_FORCED) + + def _wait_measurement(self, timeout_ms=100): + """Wait for measurement to complete. Raises on timeout.""" + for _ in range(timeout_ms // 5): + if self.data_ready(): + return + sleep_ms(5) + raise BME280Error("BME280 measurement timeout") + + # -------------------------------------------------- + # Raw data burst read + # -------------------------------------------------- + + def _read_raw(self): + """Read raw ADC values via burst read (0xF7-0xFE, 8 bytes). + + Returns (raw_temp, raw_press, raw_hum) as 20-bit/20-bit/16-bit integers. + """ + data = self._read_block(REG_DATA_START, DATA_BLOCK_SIZE) + raw_press = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4) + raw_temp = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4) + raw_hum = (data[6] << 8) | data[7] + return raw_temp, raw_press, raw_hum + + # -------------------------------------------------- + # Compensation formulas (BME280 datasheet section 4.2.3) + # -------------------------------------------------- + + def _compensate_temperature(self, raw_temp): + """Compute compensated temperature in hundredths of °C. Updates t_fine.""" + var1 = (((raw_temp >> 3) - (self.dig_T1 << 1)) * self.dig_T2) >> 11 + var2 = ( + (((raw_temp >> 4) - self.dig_T1) * ((raw_temp >> 4) - self.dig_T1)) >> 12 + ) * self.dig_T3 >> 14 + self.t_fine = var1 + var2 + return (self.t_fine * 5 + 128) >> 8 + + def _compensate_pressure(self, raw_press): + """Compute compensated pressure in Pa (Q24.8 fixed point).""" + var1 = self.t_fine - 128000 + var2 = var1 * var1 * self.dig_P6 + var2 = var2 + ((var1 * self.dig_P5) << 17) + var2 = var2 + (self.dig_P4 << 35) + var1 = ((var1 * var1 * self.dig_P3) >> 8) + ((var1 * self.dig_P2) << 12) + var1 = ((1 << 47) + var1) * self.dig_P1 >> 33 + if var1 == 0: + return 0 + p = 1048576 - raw_press + p = (((p << 31) - var2) * 3125) // var1 + var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25 + var2 = (self.dig_P8 * p) >> 19 + return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4) + + def _compensate_humidity(self, raw_hum): + """Compute compensated humidity in Q22.10 fixed point.""" + h = self.t_fine - 76800 + h = ( + (((raw_hum << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) + 16384) + >> 15 + ) * ( + ( + ( + (((h * self.dig_H6) >> 10) * (((h * self.dig_H3) >> 11) + 32768)) + >> 10 + ) + + 2097152 + ) + * self.dig_H2 + + 8192 + ) >> 14 + h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4) + h = max(h, 0) + h = min(h, 419430400) + return h >> 12 + + # -------------------------------------------------- + # Public measurement methods + # -------------------------------------------------- + + def temperature(self): + """Return compensated temperature in °C (float).""" + raw_temp, _, _ = self._read_raw() + return self._compensate_temperature(raw_temp) / 100.0 + + def pressure_hpa(self): + """Return compensated pressure in hPa (float).""" + raw_temp, raw_press, _ = self._read_raw() + self._compensate_temperature(raw_temp) + return self._compensate_pressure(raw_press) / 25600.0 + + def humidity(self): + """Return compensated relative humidity in %RH (float).""" + raw_temp, _, raw_hum = self._read_raw() + self._compensate_temperature(raw_temp) + return self._compensate_humidity(raw_hum) / 1024.0 + + def read(self): + """Return (temperature_c, pressure_hpa, humidity_rh) tuple.""" + raw_temp, raw_press, raw_hum = self._read_raw() + temp_c = self._compensate_temperature(raw_temp) / 100.0 + press_hpa = self._compensate_pressure(raw_press) / 25600.0 + hum_rh = self._compensate_humidity(raw_hum) / 1024.0 + return temp_c, press_hpa, hum_rh + + def read_one_shot(self): + """Trigger a forced measurement, wait, and return (temp_c, press_hpa, hum_rh).""" + self.trigger_one_shot() + self._wait_measurement() + return self.read() diff --git a/tests/fake_machine/i2c.py b/tests/fake_machine/i2c.py index 326c702a..c5d9b903 100644 --- a/tests/fake_machine/i2c.py +++ b/tests/fake_machine/i2c.py @@ -14,6 +14,7 @@ class FakeI2C: def __init__(self, bus_id=None, *, registers=None, address=None, **kwargs): self._registers = {} + self._sequences = {} self._address = address self._write_log = [] self._read_log = [] @@ -28,6 +29,12 @@ def __init__(self, bus_id=None, *, registers=None, address=None, **kwargs): def readfrom_mem(self, addr, reg, nbytes, *, addrsize=8): self._check_address(addr) self._read_log.append(reg) + seq = self._sequences.get(reg) + if seq: + data = seq.pop(0) + if not seq: + del self._sequences[reg] + return data[:nbytes] data = self._registers.get(reg, b"\x00" * nbytes) return data[:nbytes] @@ -74,6 +81,18 @@ def get_read_log(self): def clear_read_log(self): self._read_log.clear() + def set_register_sequence(self, reg, values): + """Set a sequence of values for a register. + + Each read pops the next value from the list. When the list is + exhausted, reads fall back to the static register value. + + Args: + reg: register address. + values: list of bytes values to return on successive reads. + """ + self._sequences[reg] = [bytes(v) if not isinstance(v, bytes) else v for v in values] + def _check_address(self, addr): if self._address is not None and addr != self._address: raise OSError("I2C device not found at 0x{:02X}".format(addr)) diff --git a/tests/scenarios/bme280.yaml b/tests/scenarios/bme280.yaml index 15ffd89d..46f7c3f9 100644 --- a/tests/scenarios/bme280.yaml +++ b/tests/scenarios/bme280.yaml @@ -10,6 +10,10 @@ mock_registers: 0xD0: 0x60 # Status (not measuring, NVM copy done) 0xF3: 0x00 + + # Data registers 0xF7..0xFE (8 bytes): raw_press=415148, raw_temp=519888, raw_hum=28680 + # Expected compensated: ~25.08°C, ~1009.21 hPa, ~50.57 %RH + 0xF7: [0x65, 0x5A, 0xC0, 0x7E, 0xED, 0x00, 0x70, 0x08] # ctrl_hum (default) 0xF2: 0x00 # ctrl_meas (default sleep mode) @@ -157,3 +161,116 @@ tests: ) expect_true: true mode: [mock] + + - name: "wait_boot polls STATUS until IM_UPDATE clears" + action: script + script: | + # Simulate NVM copy in progress: STATUS returns 0x01 twice, then 0x00 + i2c.set_register_sequence(0xF3, [bytes([0x01]), bytes([0x01]), bytes([0x00])]) + i2c.clear_read_log() + from bme280 import BME280 + dev2 = BME280(i2c, address=0x76) + log = i2c.get_read_log() + # STATUS (0xF3) should have been read at least 3 times during boot + status_reads = [r for r in log if r == 0xF3] + result = len(status_reads) >= 3 + expect_true: true + mode: [mock] + + # ----- Status ----- + + - name: "data_ready returns True when not measuring" + action: script + script: | + result = dev.data_ready() + expect_true: true + mode: [mock] + + - name: "All ready methods delegate to data_ready" + action: script + script: | + result = dev.temperature_ready() and dev.pressure_ready() and dev.humidity_ready() + expect_true: true + mode: [mock] + + # ----- Compensated readings ----- + + - name: "Temperature returns correct value from raw data" + action: script + script: | + temp = dev.temperature() + result = abs(temp - 25.08) < 0.1 + expect_true: true + mode: [mock] + + - name: "Pressure returns correct value from raw data" + action: script + script: | + press = dev.pressure_hpa() + result = abs(press - 1009.21) < 0.5 + expect_true: true + mode: [mock] + + - name: "Humidity returns correct value from raw data" + action: script + script: | + hum = dev.humidity() + result = abs(hum - 50.57) < 0.5 + expect_true: true + mode: [mock] + + - name: "read() returns (temp, press, hum) tuple" + action: script + script: | + t, p, h = dev.read() + result = ( + abs(t - 25.08) < 0.1 + and abs(p - 1009.21) < 0.5 + and abs(h - 50.57) < 0.5 + ) + expect_true: true + mode: [mock] + + - name: "Humidity valid when t_fine equals 76800" + action: script + script: | + # Regression: _compensate_humidity() must not return 0 when t_fine - 76800 == 0 + dev.t_fine = 76800 + raw_hum = 28680 + hum_q22 = dev._compensate_humidity(raw_hum) + result = hum_q22 > 0 + expect_true: true + mode: [mock] + + - name: "t_fine updated after temperature read" + action: script + script: | + dev.temperature() + result = dev.t_fine == 128422 + expect_true: true + mode: [mock] + + - name: "trigger_one_shot writes forced mode to ctrl_meas" + action: script + script: | + i2c.clear_write_log() + dev.trigger_one_shot() + log = i2c.get_write_log() + wrote_forced = any( + reg == 0xF4 and (data[0] & 0x03) == 0x01 + for reg, data in log + ) + result = wrote_forced + expect_true: true + mode: [mock] + + - name: "read_one_shot triggers and reads" + action: script + script: | + i2c.clear_write_log() + t, p, h = dev.read_one_shot() + log = i2c.get_write_log() + triggered = any(reg == 0xF4 for reg, data in log) + result = triggered and abs(t - 25.08) < 0.1 + expect_true: true + mode: [mock]