Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 147 additions & 4 deletions lib/bme280/bme280/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
BME280_I2C_DEFAULT_ADDR,
CALIB_H_SIZE,
CALIB_TP_SIZE,
DATA_BLOCK_SIZE,
MODE_FORCED,
MODE_MASK,
MODE_SLEEP,
OSRS_P_SHIFT,
OSRS_T_SHIFT,
Expand All @@ -15,11 +18,13 @@
REG_CHIP_ID,
REG_CTRL_HUM,
REG_CTRL_MEAS,
REG_DATA_START,
REG_SOFT_RESET,
REG_STATUS,
RESET_DELAY_MS,
SOFT_RESET_CMD,
STATUS_IM_UPDATE,
STATUS_MEASURING,
)
from bme280.exceptions import BME280Error, BME280InvalidDevice, BME280NotFound

Expand Down Expand Up @@ -51,10 +56,6 @@ def _write_reg(self, reg, value):
"""Write a single byte to register."""
self.i2c.writeto_mem(self.address, reg, bytes([value]))

# --------------------------------------------------
# Device identification and initialization
# --------------------------------------------------

# --------------------------------------------------
# Calibration data
# --------------------------------------------------
Expand Down Expand Up @@ -137,3 +138,145 @@ def reset(self):
self.soft_reset()
self._read_calibration()
self._configure_default()

# --------------------------------------------------
# Status
# --------------------------------------------------

def _status(self):
"""Return the raw STATUS register value."""
return self._read_reg(REG_STATUS)

def data_ready(self):
"""Return True when all measurements are available."""
return not (self._status() & STATUS_MEASURING)

def temperature_ready(self):
"""Return True when temperature data is available."""
return self.data_ready()

def pressure_ready(self):
"""Return True when pressure data is available."""
return self.data_ready()

def humidity_ready(self):
"""Return True when humidity data is available."""
return self.data_ready()

# --------------------------------------------------
# Forced measurement trigger
# --------------------------------------------------

def trigger_one_shot(self):
"""Trigger a single forced measurement. Poll data_ready() for completion."""
ctrl = self._read_reg(REG_CTRL_MEAS)
self._write_reg(REG_CTRL_MEAS, (ctrl & ~MODE_MASK) | MODE_FORCED)

def _wait_measurement(self, timeout_ms=100):
"""Wait for measurement to complete. Raises on timeout."""
for _ in range(timeout_ms // 5):
if self.data_ready():
return
sleep_ms(5)
raise BME280Error("BME280 measurement timeout")

# --------------------------------------------------
# Raw data burst read
# --------------------------------------------------

def _read_raw(self):
"""Read raw ADC values via burst read (0xF7-0xFE, 8 bytes).

Returns (raw_temp, raw_press, raw_hum) as 20-bit/20-bit/16-bit integers.
"""
data = self._read_block(REG_DATA_START, DATA_BLOCK_SIZE)
raw_press = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
raw_temp = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
raw_hum = (data[6] << 8) | data[7]
return raw_temp, raw_press, raw_hum

# --------------------------------------------------
# Compensation formulas (BME280 datasheet section 4.2.3)
# --------------------------------------------------

def _compensate_temperature(self, raw_temp):
"""Compute compensated temperature in hundredths of °C. Updates t_fine."""
var1 = (((raw_temp >> 3) - (self.dig_T1 << 1)) * self.dig_T2) >> 11
var2 = (
(((raw_temp >> 4) - self.dig_T1) * ((raw_temp >> 4) - self.dig_T1)) >> 12
) * self.dig_T3 >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8

def _compensate_pressure(self, raw_press):
"""Compute compensated pressure in Pa (Q24.8 fixed point)."""
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = ((var1 * var1 * self.dig_P3) >> 8) + ((var1 * self.dig_P2) << 12)
var1 = ((1 << 47) + var1) * self.dig_P1 >> 33
if var1 == 0:
return 0
p = 1048576 - raw_press
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

def _compensate_humidity(self, raw_hum):
"""Compute compensated humidity in Q22.10 fixed point."""
h = self.t_fine - 76800
h = (
(((raw_hum << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) + 16384)
>> 15
) * (
(
(
(((h * self.dig_H6) >> 10) * (((h * self.dig_H3) >> 11) + 32768))
>> 10
)
+ 2097152
)
* self.dig_H2
+ 8192
) >> 14
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = max(h, 0)
h = min(h, 419430400)
return h >> 12

# --------------------------------------------------
# Public measurement methods
# --------------------------------------------------

def temperature(self):
"""Return compensated temperature in °C (float)."""
raw_temp, _, _ = self._read_raw()
return self._compensate_temperature(raw_temp) / 100.0

def pressure_hpa(self):
"""Return compensated pressure in hPa (float)."""
raw_temp, raw_press, _ = self._read_raw()
self._compensate_temperature(raw_temp)
return self._compensate_pressure(raw_press) / 25600.0

def humidity(self):
"""Return compensated relative humidity in %RH (float)."""
raw_temp, _, raw_hum = self._read_raw()
self._compensate_temperature(raw_temp)
return self._compensate_humidity(raw_hum) / 1024.0

def read(self):
"""Return (temperature_c, pressure_hpa, humidity_rh) tuple."""
raw_temp, raw_press, raw_hum = self._read_raw()
temp_c = self._compensate_temperature(raw_temp) / 100.0
press_hpa = self._compensate_pressure(raw_press) / 25600.0
hum_rh = self._compensate_humidity(raw_hum) / 1024.0
return temp_c, press_hpa, hum_rh

def read_one_shot(self):
"""Trigger a forced measurement, wait, and return (temp_c, press_hpa, hum_rh)."""
self.trigger_one_shot()
self._wait_measurement()
return self.read()
19 changes: 19 additions & 0 deletions tests/fake_machine/i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class FakeI2C:

def __init__(self, bus_id=None, *, registers=None, address=None, **kwargs):
self._registers = {}
self._sequences = {}
self._address = address
self._write_log = []
self._read_log = []
Expand All @@ -28,6 +29,12 @@ def __init__(self, bus_id=None, *, registers=None, address=None, **kwargs):
def readfrom_mem(self, addr, reg, nbytes, *, addrsize=8):
self._check_address(addr)
self._read_log.append(reg)
seq = self._sequences.get(reg)
if seq:
data = seq.pop(0)
if not seq:
del self._sequences[reg]
return data[:nbytes]
data = self._registers.get(reg, b"\x00" * nbytes)
return data[:nbytes]

Expand Down Expand Up @@ -74,6 +81,18 @@ def get_read_log(self):
def clear_read_log(self):
self._read_log.clear()

def set_register_sequence(self, reg, values):
"""Set a sequence of values for a register.

Each read pops the next value from the list. When the list is
exhausted, reads fall back to the static register value.

Args:
reg: register address.
values: list of bytes values to return on successive reads.
"""
self._sequences[reg] = [bytes(v) if not isinstance(v, bytes) else v for v in values]

def _check_address(self, addr):
if self._address is not None and addr != self._address:
raise OSError("I2C device not found at 0x{:02X}".format(addr))
117 changes: 117 additions & 0 deletions tests/scenarios/bme280.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ mock_registers:
0xD0: 0x60
# Status (not measuring, NVM copy done)
0xF3: 0x00

# Data registers 0xF7..0xFE (8 bytes): raw_press=415148, raw_temp=519888, raw_hum=28680
# Expected compensated: ~25.08°C, ~1009.21 hPa, ~50.57 %RH
0xF7: [0x65, 0x5A, 0xC0, 0x7E, 0xED, 0x00, 0x70, 0x08]
# ctrl_hum (default)
0xF2: 0x00
# ctrl_meas (default sleep mode)
Expand Down Expand Up @@ -157,3 +161,116 @@ tests:
)
expect_true: true
mode: [mock]

- name: "wait_boot polls STATUS until IM_UPDATE clears"
action: script
script: |
# Simulate NVM copy in progress: STATUS returns 0x01 twice, then 0x00
i2c.set_register_sequence(0xF3, [bytes([0x01]), bytes([0x01]), bytes([0x00])])
i2c.clear_read_log()
from bme280 import BME280
dev2 = BME280(i2c, address=0x76)
log = i2c.get_read_log()
# STATUS (0xF3) should have been read at least 3 times during boot
status_reads = [r for r in log if r == 0xF3]
result = len(status_reads) >= 3
expect_true: true
mode: [mock]

# ----- Status -----

- name: "data_ready returns True when not measuring"
action: script
script: |
result = dev.data_ready()
expect_true: true
mode: [mock]

- name: "All ready methods delegate to data_ready"
action: script
script: |
result = dev.temperature_ready() and dev.pressure_ready() and dev.humidity_ready()
expect_true: true
mode: [mock]

# ----- Compensated readings -----

- name: "Temperature returns correct value from raw data"
action: script
script: |
temp = dev.temperature()
result = abs(temp - 25.08) < 0.1
expect_true: true
mode: [mock]

- name: "Pressure returns correct value from raw data"
action: script
script: |
press = dev.pressure_hpa()
result = abs(press - 1009.21) < 0.5
expect_true: true
mode: [mock]

- name: "Humidity returns correct value from raw data"
action: script
script: |
hum = dev.humidity()
result = abs(hum - 50.57) < 0.5
expect_true: true
mode: [mock]

- name: "read() returns (temp, press, hum) tuple"
action: script
script: |
t, p, h = dev.read()
result = (
abs(t - 25.08) < 0.1
and abs(p - 1009.21) < 0.5
and abs(h - 50.57) < 0.5
)
expect_true: true
mode: [mock]

- name: "Humidity valid when t_fine equals 76800"
action: script
script: |
# Regression: _compensate_humidity() must not return 0 when t_fine - 76800 == 0
dev.t_fine = 76800
raw_hum = 28680
hum_q22 = dev._compensate_humidity(raw_hum)
result = hum_q22 > 0
expect_true: true
mode: [mock]

- name: "t_fine updated after temperature read"
action: script
script: |
dev.temperature()
result = dev.t_fine == 128422
expect_true: true
mode: [mock]

- name: "trigger_one_shot writes forced mode to ctrl_meas"
action: script
script: |
i2c.clear_write_log()
dev.trigger_one_shot()
log = i2c.get_write_log()
wrote_forced = any(
reg == 0xF4 and (data[0] & 0x03) == 0x01
for reg, data in log
)
result = wrote_forced
expect_true: true
mode: [mock]

- name: "read_one_shot triggers and reads"
action: script
script: |
i2c.clear_write_log()
t, p, h = dev.read_one_shot()
log = i2c.get_write_log()
triggered = any(reg == 0xF4 for reg, data in log)
result = triggered and abs(t - 25.08) < 0.1
expect_true: true
mode: [mock]
Loading