Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions src/emonhub_interfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ def __init__(self, name):
self.missed = {}
self.rx_msg = {}

def processRxc(self, rxc):
if rxc:
rxc = self._process_rx(rxc)
if rxc:
for channel in self._settings["pubchannels"]:

# Initialise channel if needed
if channel not in self._pub_channels:
self._pub_channels[channel] = []

# Add cargo item to channel
self._pub_channels[channel].append(rxc)



@log_exceptions_from_class_method
def run(self):
"""
Expand All @@ -102,21 +117,16 @@ def run(self):
# Only read if there is a pub channel defined for the interfacer
if len(self._settings["pubchannels"]):
# Read the input and process data if available
rxc = self.read()
if rxc:
rxc = self._process_rx(rxc)
if rxc:
for channel in self._settings["pubchannels"]:
self._log.debug("%d Sent to channel(start)' : %s", rxc.uri, channel)

# Initialise channel if needed
if channel not in self._pub_channels:
self._pub_channels[channel] = []

# Add cargo item to channel
self._pub_channels[channel].append(rxc)

self._log.debug("%d Sent to channel(end)' : %s", rxc.uri, channel)
result = self.read()

if isinstance(result, list):
for rxc in result:
self.processRxc(rxc)
elif isinstance(result, dict):
for rxc in result:
self.processRxc(result[rxc])
else:
self.processRxc(result)

# Subscriber channels
for channel in self._settings["subchannels"]:
Expand Down Expand Up @@ -291,12 +301,11 @@ def _process_rx(self, cargo):
if 'datalength' in ehc.nodelist[node]:
del ehc.nodelist[node]['datalength']


# If not in nodelist and pass through disabled return false
if node not in ehc.nodelist and self._settings['nodelistonly']:
self._log.warning("%d Discarded RX frame not in nodelist, node:%s, length:%s bytes", cargo.uri, node, len(rxc.realdata))
return False

# Data whitening uses for ensuring rfm sync
if node in ehc.nodelist and 'rx' in ehc.nodelist[node] and 'whitening' in ehc.nodelist[node]['rx']:
whitening = ehc.nodelist[node]['rx']['whitening']
Expand Down
179 changes: 124 additions & 55 deletions src/interfacers/EmonHubMBUSInterfacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

class EmonHubMBUSInterfacer(EmonHubInterfacer):

def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=False):
def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=True):
"""Initialize Interfacer

"""
Expand Down Expand Up @@ -63,21 +63,37 @@ def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=Fal
self.invalid_count = 0

# Only load module if it is installed
self.connect()

# If use_meterbus_lib is true, try to load module
# pip3 install pyMeterBus
self.use_meterbus_lib = False
if use_meterbus_lib:
try:
from pyMeterBus import meterbus
self.meterbus = meterbus

try:
# If we need a socket connection, use meterbus_lib
# pip3 install pyMeterBus
if (device.index("socket://")>=0):
self._log.info("Connecting using meterbus_lib:" + device)
self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1)
self.use_meterbus_lib = True
except ModuleNotFoundError as err:
self._log.error(err)
self.use_meterbus_lib = False
if use_meterbus_lib:
try:
self._log.info("importing mertbus_lib")
import meterbus
self.meterbus = meterbus
self.use_meterbus_lib = True
except ModuleNotFoundError as err:
self._log.error(err)
self.use_meterbus_lib = False
else:
self.connect()



if self.ping_address(self.ser, 1, 3):
self._log.info("ok ping")
else:
self._log.info("no reply ping")

except ModuleNotFoundError as err:
self._log.error(err)
self.ser = False

def connect(self):
"""Connect to MBUS

Expand Down Expand Up @@ -127,6 +143,21 @@ def connect(self):
except Exception:
self._log.error("Could not connect to MBUS serial")
self.ser = False


def ping_address(self, ser, address, retries=5, read_echo=False):
for i in range(0, retries + 1):
self.meterbus.send_ping_frame(ser, address, read_echo)
try:
frame = self.meterbus.load(self.meterbus.recv_frame(ser, 1))
if isinstance(frame, self.meterbus.TelegramACK):
return True
except self.meterbus.MBusFrameDecodeError as e:
pass

time.sleep(0.5)

return False

def mbus_serial_write(self,data):
try:
Expand Down Expand Up @@ -228,7 +259,7 @@ def decodeInt(self,bytes):
return struct.unpack("i", bytearray(bytes))[0]
return False

def parse_frame(self,data,records):
def parse_frame(self,meter, data,records):
data_types = ['null','int','int','int','int','float','int','int','null','bcd','bcd','bcd','bcd','var','bcd','null']
data_lengths = [0,1,2,3,4,4,6,8,0,1,2,3,4,6,6,0]
vif = {
Expand Down Expand Up @@ -442,42 +473,50 @@ def parse_frame(self,data,records):

return result

def parse_frame_meterbus_lib(self,data,records):
def parse_frame_meterbus_lib(self,meter, data,records):
self._log.debug("parse_frame_meterbus_lib");
telegram = self.meterbus.load(data)
meterbus_obj = json.loads(telegram.to_JSON())

result = {}
idx = 0;
for record in meterbus_obj['body']['records']:
if type(record['value'])==int or type(record['value'])==float:
name = record['type'].replace('VIFUnit.','').replace('VIFUnitExt.','').lower()
if name in result:
name = name + str(idx)

value = record['value']
unit = record['unit'].replace('MeasureUnit.','')
result[name] = [value, unit]

idx = idx+1
return result

def request_data(self, address, records):
def request_data(self, meter, address, records):
for i in range(0,2):
self.mbus_short_frame(address, 0x5b)
if self.use_meterbus_lib:
self.meterbus.send_request_frame(self.ser, address)
else:
self.mbus_short_frame(address, 0x5b)
# time.sleep(1.0)
result = self.read_data_frame(records)
result = self.read_data_frame(meter, records)
if result!=None:
return result
else:
time.sleep(0.2)

def request_data_sdm120(self, address, records):
def request_data_sdm120(self, meter, address, records):
for i in range(0,2):
self.mbus_request_sdm120(address)
# time.sleep(1.0)
result = self.read_data_frame(records)
result = self.read_data_frame(meter, records)
if result!=None:
return result
else:
time.sleep(0.2)


def read_data_frame(self,records):
def read_data_frame(self,meter, records):
data = []
bid = 0
bid_end = 255
Expand Down Expand Up @@ -529,9 +568,9 @@ def read_data_frame(self,records):

if valid: # Parse frame if still valid
if self.use_meterbus_lib:
return self.parse_frame_meterbus_lib(data,records)
return self.parse_frame_meterbus_lib(meter, data,records)
else:
return self.parse_frame(data,records)
return self.parse_frame(meter, data,records)

bid += 1
time.sleep(0.1)
Expand All @@ -550,12 +589,24 @@ def read_data_frame(self,records):
self._log.debug("Invalid count = 10. Restarting MBUS serial connection on next read")
self.ser = False

def add_result_to_cargo(self,meter,c,result):
def add_result_to_cargo(self,meter,nodesName,c,result):
if result != None:
self._log.debug("Decoded MBUS data: " + json.dumps(result))

nodesNameHash = {}
for nameTranslator in nodesName:
self._log.debug("nameTranslator:" + nameTranslator);
nameTranslatorPart = nameTranslator.split(':')
nodesNameHash[nameTranslatorPart[0]]=nameTranslatorPart[1]
self._log.debug(nameTranslatorPart[0] + " <> " + nameTranslatorPart[1]);



for key in result:
c.names.append(meter+"_"+key)
key1=key
if key in nodesNameHash:
key1 = nodesNameHash[key]

c.names.append(key1+"_"+meter)
c.realdata.append(result[key][0])
c.units.append(result[key][1])
else:
Expand All @@ -576,59 +627,72 @@ def read(self):
self.next_interval = False

if not self.ser:
self.connect()

c = Cargo.new_cargo()
c.names = []
c.realdata = []
c.units = []
c.nodeid = self._settings['nodename']
try:
if use_meterbus_lib:
self._log.info("Connecting using meterbus_lib:" + device)
self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1)
else:
self.connect()
except Exception:
self._log.error("Could not connect to MBUS serial")
self.ser = False

res = []

# Support for multiple MBUS meters on a single bus
for meter in self._settings['meters']:
c = Cargo.new_cargo()
c.names = []
c.realdata = []
c.units = []

address = self._settings['meters'][meter]['address']
meter_type = self._settings['meters'][meter]['type']

if not self._settings['nodename']:
c.nodeid = meter
else:
c.nodeid = self._settings['nodename']

meterPrefix = self._settings['meters'][meter]['name'];
nodesName = self._settings['meters'][meter]['nodesName'];
res.append(c)

# Most mbus meters use standard request, page 0 or default, all records
if meter_type=="standard":
result = self.request_data(address,[])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[])
self.add_result_to_cargo(meterPrefix, nodesName, c,result)

# Qalcosonic E3
if meter_type=="qalcosonic_e3":
result = self.request_data(address,[4,5,6,7,8,9,10,11,12,13,14,15])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[4,5,6,7,8,9,10,11,12,13,14,15])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)

# ------------------------------------------------------
# Sontex Multical 531
if meter_type=="sontex531":
# p1
self.set_page(address, 1)
result = self.request_data(address,[4,5])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[4,5])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)
# p3
self.set_page(address, 3)
result = self.request_data(address,[1,2,3,4])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[1,2,3,4])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)

# SDM120 special request command
elif meter_type=="sdm120":
# 1. Get energy data
result = self.request_data(address,[1])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[1])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)
# 2. Get instantaneous data
result = self.request_data_sdm120(address,[1,7,11,23])
self.add_result_to_cargo(meter,c,result)
result = self.request_data_sdm120(meter, address,[1,7,11,23])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)
elif meter_type=="kamstrup403":
result = self.request_data(address,[1,4,7,8,9,10,11,12,14])
self.add_result_to_cargo(meter,c,result)
result = self.request_data(meter, address,[1,4,7,8,9,10,11,12,14])
self.add_result_to_cargo(meterPrefix,nodesName,c,result)
# ------------------------------------------------------



if len(c.realdata) > 0:
return c
return res

else:
self.next_interval = True
Expand Down Expand Up @@ -666,25 +730,30 @@ def set(self, **kwargs):
for meter in setting:
# default
address = 1
name=""
meter_type = "standard"
records = []

# address
if 'address' in setting[meter]:
address = int(setting[meter]['address'])
if 'name' in setting[meter]:
name = setting[meter]['name']
if 'nodesName' in setting[meter]:
nodesName = setting[meter]['nodesName']
# type e.g sdm
if 'type' in setting[meter]:
meter_type = str(setting[meter]['type'])
#assign
self._settings['meters'][meter] = {
'address':address,
'type':meter_type,
'name':name,
'nodesName':nodesName
}
continue
else:
self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key)

# include kwargs from parent
super().set(**kwargs)