Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ optional arguments:
-aux, --auxiliaries Output auxiliary data (internal and external temperatures).

## Embed in other Software
The class `ACS` provides key methods to handle the binary ACS data
The classes `ACSCompass` and `ACSInlinino` provides key methods to handle the binary ACS data from **Compass** and **Inlinino** respectively:
* `read_device_file`: Parse device file to be able to unpack and calibrate binary frame
* `find_frame`: Find registered ACS frames in bytearray
* `unpack_frame`: Unpack/Decode a binary frame into named tuple `FrameContainer`
* `calibrate_frame`: Convert a frame engineering units (counts) into scientific units (1/m for a and c)

The classes `BinReader` and `ConvertBinToCSV` are an illustration of the usage of the ACS class to parse binary files recorded with Compass software (.bin).
The classes `BinReader` and `ConvertBinToCSV` are an illustration of the usage of these classes to parse binary files (.bin).
10 changes: 8 additions & 2 deletions pyACS/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
from pyACS import __version__
from pyACS.acs import ConvertBinToCSV
from pyACS.acs import ConvertBinToCSV, ACSInlinino, ACSCompass

# Argument Parser
parser = argparse.ArgumentParser(prog="python -m pyACS")
Expand All @@ -16,12 +16,18 @@
help="Destination file of decoded and calibrated data.")
parser.add_argument("-aux", "--auxiliaries", action="store_true",
help="Output auxiliary data (internal and external temperatures).")
parser.add_argument("--inlinino", action="store_true",
help="bin_file follows inlinino format.")
args = parser.parse_args()

# Decode and Calibrate binary file
if args.verbose:
print('Unpacking and calibrating ' + args.bin_file + ' ... ', end='', flush=True)
cbc = ConvertBinToCSV(args.device_file, args.bin_file, args.destination, args.auxiliaries)
if args.inlinino:
acs_parser = ACSInlinino(args.device_file)
else:
acs_parser = ACSCompass(args.device_file)
cbc = ConvertBinToCSV(acs_parser, args.bin_file, args.destination, args.auxiliaries)
if args.verbose:
print('DONE')
print('\tFrames extracted: ' + str(cbc.counter_good))
Expand Down
71 changes: 66 additions & 5 deletions pyACS/acs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import print_function

from datetime import datetime, timezone
import numpy as np
from math import log
from collections import namedtuple
Expand Down Expand Up @@ -124,13 +125,12 @@ def close(self):

class ConvertBinToCSV(BinReader):

def __init__(self, device_filename, bin_filename, csv_filename=None, write_auxiliaries=False):
def __init__(self, acs, bin_filename, csv_filename=None, write_auxiliaries=False):
if not csv_filename:
csv_filename = bin_filename + '.dat'
self.calibrate_auxiliaries = write_auxiliaries
self.counter_good = 0
self.counter_bad = 0
acs = ACS(device_filename)
self.csv = CSVWriter(acs.lambda_c, acs.lambda_a, write_auxiliaries)
self.csv.open(csv_filename)
super(ConvertBinToCSV, self).__init__(acs, bin_filename)
Expand Down Expand Up @@ -164,7 +164,7 @@ def __del__(self):
'flag_outside_calibration_range'])


class ACS:
class ACSCompass:
"""
Unpack and calibrate ACS and AC9 meters attenuation (c) and absorption (a) engineering values
(sig, ref counts in binary) to scientific units (1/m).
Expand Down Expand Up @@ -509,8 +509,9 @@ def calibrate_frame(self, frame, get_external_temperature=False):
delta_t_c = self.f_delta_t_c(internal_temperature_su)
delta_t_a = self.f_delta_t_a(internal_temperature_su)
# Calibrate and apply temperature and clean water offset corrections
c = (self.offset_c - (1 / self.x) * np.log(frame.c_sig / frame.c_ref)) - delta_t_c
a = (self.offset_a - (1 / self.x) * np.log(frame.a_sig / frame.a_ref)) - delta_t_a
with np.errstate(divide="ignore"):
c = (self.offset_c - (1 / self.x) * np.log(frame.c_sig / frame.c_ref)) - delta_t_c
a = (self.offset_a - (1 / self.x) * np.log(frame.a_sig / frame.a_ref)) - delta_t_a
# Pack output in named tuple
if get_external_temperature:
return CalibratedFrameContainer(c=c, a=a,
Expand All @@ -522,3 +523,63 @@ def calibrate_frame(self, frame, get_external_temperature=False):
internal_temperature=internal_temperature_su,
external_temperature=None,
flag_outside_calibration_range=flag_outside_calibration_range)


class ACSInlinino(ACSCompass):
def set_frame_descriptor(self):
super().set_frame_descriptor()
self.frame_descriptor += "d"
self.frame_length += calcsize("d")

def find_frame(self, buffer):
"""
Find the first and complete frame from the buffer
:param buffer: byte array
:return: frame: first frame found
checksum: boolean indicating if valid or invalid frame
buffer_post_frame: buffer left after the frame
buffer_pre_frame: buffer preceding the first frame returned (likely unknown frame header)
"""
# Look for registration bytes
i = buffer.find(self.REGISTRATION_BYTES)
if i == -1:
# No registration byte found
return bytearray(), False, buffer, bytearray()
# Take care of special case when checksum + pad byte or just checksum = \xff\x00
# It's unlikely that the full packet length is equal to \xff\x00 = 65280
while buffer.find(self.REGISTRATION_BYTES, i + 2, i + 2 + self.REGISTRATION_BYTES_LENGTH) != -1:
i += 2
frame_end_index = i + self.frame_length
# Make sure the buffer contains the full frame
if len(buffer) < frame_end_index:
return bytearray(), False, buffer, bytearray()
return buffer[i:frame_end_index], True, buffer[frame_end_index:], buffer[:i]

def unpack_frame(self, frame):
"""
Convert frame from C structs to Python values
Assume valid frame
:param frame: byte array including registration bytes
:return: data: a frame container tuple with python values from the frame
"""
d = unpack_from(self.frame_descriptor, frame, offset=self.REGISTRATION_BYTES_LENGTH)
return RawFrameContainer(
frame_len=d[0], # packet length
frame_type=d[1], # packet type identifier
# data[] = d[2] # reserved for future use (1)
serial_number=hex(d[3]), # instrument Serial Number (Meter type (first 2 bytes))
a_ref_dark=d[4], # A reference dark counts (for diagnostic purpose)
p=d[5], # A/D counts from the pressure sensor circuitry
a_sig_dark=d[6], # A signal dark counts (for diagnostic purpose)
t_ext=d[7], # External temperature voltage counts
t_int=d[8], # unsigned integer: Internal temperature voltage counts
c_ref_dark=d[9], # C reference dark counts
c_sig_dark=d[10], # C signal dark counts
# data[] = d[12] # reserved for future use
output_wavelength=d[13], # number of output wavelength
c_ref=np.array(d[14:-1:4], dtype=np.uint16),
a_ref=np.array(d[15:-1:4], dtype=np.uint16),
c_sig=np.array(d[16:-1:4], dtype=np.uint16),
a_sig=np.array(d[17:-1:4], dtype=np.uint16),
time_stamp=datetime.fromtimestamp(d[-1], tz=timezone.utc), # float64: Time stamp (UTC)
)
57 changes: 29 additions & 28 deletions pyACS/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@

class TestACSFunctions(unittest.TestCase):
def test_compute_external_temperature(self):
from pyACS.acs import ACS
acs = ACS()
from pyACS.acs import ACSCompass
acs = ACSCompass()
counts = unpack('!H', b'\x7a\xe4')[0]
su = acs.compute_external_temperature(counts)
self.assertAlmostEqual(su, 22.14, 2)

def test_compute_internal_temperature(self):
from pyACS.acs import ACS
acs = ACS()
from pyACS.acs import ACSCompass
acs = ACSCompass()
# AC-S (from ACS User Manual)
counts = unpack('!H', b'\xb9\xd8')[0] # 16-bit unsigned integer == H (2 bytes unsigned short)
su = acs.compute_internal_temperature(counts)
Expand Down Expand Up @@ -137,17 +137,17 @@ def test_acs_calibrate_frame(self):
a_ref=np.array([counts[1],counts[1],counts[1]], dtype=np.uint16),
c_sig=counts[2],
a_sig=np.array([counts[3],counts[3],counts[3]], dtype=np.uint16),
frame_len=np.NaN, # packet length
frame_type=np.NaN, # Packet type identifier
a_ref_dark=np.NaN, # A reference dark counts (for diagnostic purpose)
p=np.NaN, # A/D counts from the pressure sensor circuitry
a_sig_dark=np.NaN, # A signal dark counts (for diagnostic purpose)
t_ext=np.NaN, # External temperature voltage counts
c_ref_dark=np.NaN, # C reference dark counts
c_sig_dark=np.NaN, # C signal dark counts
time_stamp=np.NaN) # unsigned integer: Time stamp (ms)
frame_len=np.nan, # packet length
frame_type=np.nan, # Packet type identifier
a_ref_dark=np.nan, # A reference dark counts (for diagnostic purpose)
p=np.nan, # A/D counts from the pressure sensor circuitry
a_sig_dark=np.nan, # A signal dark counts (for diagnostic purpose)
t_ext=np.nan, # External temperature voltage counts
c_ref_dark=np.nan, # C reference dark counts
c_sig_dark=np.nan, # C signal dark counts
time_stamp=np.nan) # unsigned integer: Time stamp (ms)
# Define ACS
acs = pa.ACS()
acs = pa.ACSCompass()
acs.serial_number = '0x5300012A'
acs.output_wavelength = 3
acs.t = np.array([27.75, 28.2625])
Expand Down Expand Up @@ -181,9 +181,9 @@ def test_acs_calibrate_frame(self):
self.assertAlmostEqual(float(cal.a[i]), -0.0409, 2)

def test_acs_find_frame(self):
from pyACS.acs import ACS
from pyACS.acs import ACSCompass

acs = ACS()
acs = ACSCompass()
acs.output_wavelength = 86
acs.set_frame_descriptor()

Expand All @@ -194,9 +194,9 @@ def test_acs_find_frame(self):
self.assertEqual(skipped, TEST_FRAME[:15])

def test_acs_valid_frame(self):
from pyACS.acs import ACS
from pyACS.acs import ACSCompass

acs = ACS()
acs = ACSCompass()
acs.output_wavelength = 86
acs.set_frame_descriptor()

Expand All @@ -205,9 +205,9 @@ def test_acs_valid_frame(self):
self.assertEqual(passed, True)

def test_acs_unpack_frame(self):
from pyACS.acs import ACS
from pyACS.acs import ACSCompass

acs = ACS()
acs = ACSCompass()
acs.output_wavelength = 86
acs.set_frame_descriptor()

Expand Down Expand Up @@ -238,9 +238,9 @@ def test_acs_unpack_frame(self):
self.assertEqual(data.a_sig[-1], unpack('!H', b'\x2c\x1c')[0])

def test_acs_check_data(self):
from pyACS.acs import BinReader, ACS, FrameLengthError
from pyACS.acs import ACSCompass

acs = ACS()
acs = ACSCompass()
acs.serial_number = '0x53000002'
acs.output_wavelength = 86
acs.set_frame_descriptor()
Expand All @@ -252,7 +252,7 @@ def test_acs_check_data(self):

@unittest.skip("skipping Compass Dataset test")
def test_compass_datasets(self):
from pyACS.acs import BinReader, ACS, FrameLengthError, FrameTypeError, SerialNumberError
from pyACS.acs import BinReader, ACSCompass, FrameLengthError, FrameTypeError, SerialNumberError
from tqdm import tqdm

class BinToDataFrame(BinReader):
Expand Down Expand Up @@ -352,7 +352,7 @@ def handle_frame(self, frame):
for d in tqdm(datasets):
path_to_dataset = os.path.join(TEST_COMPASS_DATA, d)
device_filename = [os.path.join(path_to_dataset, f) for f in os.listdir(path_to_dataset) if f.endswith('.dev')][0]
reader = BinToDataFrame(ACS(device_filename))
reader = BinToDataFrame(ACSCompass(device_filename))
bin_files = [os.path.join(path_to_dataset, f) for f in os.listdir(path_to_dataset) if f.endswith('.bin')]
# bin_files = bin_files[:2] # Only test with 2 first files of each subset (comment line for full test)
# bin_files = ['test_data/EXPORTS1_ACS298/acs298_20180815201617.bin']
Expand Down Expand Up @@ -395,7 +395,7 @@ def handle_frame(self, frame):

@unittest.skip("skipping convert bin to csv")
def test_convert_bin_to_csv(self):
from pyACS.acs import ConvertBinToCSV
from pyACS.acs import ConvertBinToCSV, ACSCompass

# Find data for test
test_data_set = [x for x in os.listdir(TEST_COMPASS_DATA) if '_ACS' in x][0]
Expand All @@ -406,7 +406,8 @@ def test_convert_bin_to_csv(self):

for write_aux in [True, False]:
# Run class to test
ConvertBinToCSV(device_file, bin_file, os.path.join(TEST_COMPASS_DATA, 'out.csv'), write_auxiliaries=write_aux)
acs_parser = ACSCompass(device_file)
ConvertBinToCSV(acs_parser, bin_file, os.path.join(TEST_COMPASS_DATA, 'out.csv'), write_auxiliaries=write_aux)
# Load Result and Truth
actual_df = pd.read_csv(os.path.join(TEST_COMPASS_DATA, 'out.csv'), delimiter=',')
actual_df['timestamp'] = actual_df['timestamp'] - actual_df['timestamp'][0]
Expand All @@ -426,15 +427,15 @@ def test_convert_bin_to_csv(self):
# ConvertBinToCSV(device_file, bin_file, os.path.join(TEST_COMPASS_DATA, 'out_with_aux.csv'), write_auxiliaries=False)

def test_acs_repr(self):
from pyACS.acs import ACS
from pyACS.acs import ACSCompass

# Find data for test
test_data_set = [x for x in os.listdir(TEST_COMPASS_DATA) if '_ACS' in x][0]
path_to_dataset = os.path.join(TEST_COMPASS_DATA, test_data_set)
device_file = [os.path.join(path_to_dataset, f) for f in os.listdir(path_to_dataset) if f.endswith('.dev')][0]

# Test function works
foo = repr(ACS(device_file))
foo = repr(ACSCompass(device_file))


def read_prep_acs_output(filename):
Expand Down