Following on from @bmerry's #4 (comment), I thought I'd spitball something Arrow/Parquet-based, for a couple of reasons:
- Internally, Parquet chunks data into row groups.
- This allows readers to perform partial reads (a subset of columns and/or row groups).
The following script spitballs placing MeerKAT sensor data into Arrow tables and Parquet files. It assumes that each row is associated with a dump -- perhaps this needs to be corrected? Proceeding from this assumption, however, one can store baseline, channel or any other dimensionality of data in each row.
What happens if data doesn't vary by row? Dump it into the Arrow table metadata as JSON/msgpack.
```python
from itertools import product
from datetime import datetime
import json
from tempfile import TemporaryDirectory

import numpy as np
import pyarrow as pa
import pyarrow.compute as pac
import pyarrow.parquet as pq

ANTS = [f"m{x:03}" for x in range(7)]
POLS = ["hh", "hv", "vh", "vv"]
ANT1, ANT2 = np.triu_indices(len(ANTS), 0)
CORRPRODS = [(f"{ANTS[a1]}{p[0]}", f"{ANTS[a2]}{p[1]}")
             for (a1, a2), p in product(zip(ANT1, ANT2), POLS)]
START_TIME = datetime.now().timestamp()
NBL = len(ANT1)
NDUMPS = 100
DUMP_RATE = 8.0
NCHAN = 16

if __name__ == "__main__":
    time = START_TIME + np.arange(NDUMPS) * DUMP_RATE
    chan_freq = np.linspace(.856e9, 2 * .856e9, NCHAN)
    # Timestamps are the table's primary row dimension
    telstate = pa.Table.from_pydict(
        {
            "time": time,
            # Categorical data
            "state": pa.DictionaryArray.from_arrays(
                indices=np.random.randint(0, 4, NDUMPS),
                dictionary=pa.array(["slew", "track", "scan", "think"])),
            # Parquet can't write categoricals of FixedSizeListArrays yet
            # "chan_freq": pa.DictionaryArray.from_arrays(
            #     indices=[0] * NDUMPS,
            #     dictionary=pa.FixedSizeListArray.from_arrays(chan_freq, NCHAN)),
            # Arrow can't create categoricals of FixedShapeTensorArrays yet
            # "chan_freq": pa.DictionaryArray.from_arrays(
            #     indices=[0] * NDUMPS,
            #     dictionary=pa.FixedShapeTensorArray.from_numpy_ndarray(
            #         chan_freq.reshape((1, NCHAN)))),
            "m003_humidity": pa.FixedShapeTensorArray.from_numpy_ndarray(
                np.random.random((NDUMPS, NBL, NCHAN))),
            "m003_pointing_azel": pa.FixedShapeTensorArray.from_numpy_ndarray(
                np.random.random((NDUMPS, len(ANTS), NCHAN, 2))),
        },
        # Everything without a primary time dimension goes into the metadata
        # section. This should probably be msgpack'd to avoid losing precision.
        # If data is split across multiple parquet files, the metadata is
        # replicated in each, which may be undesirable.
        metadata={
            "corrprods": json.dumps(CORRPRODS),
            "dump_rate": json.dumps(DUMP_RATE),
            "antenna": json.dumps(ANTS),
            "chan_freq": json.dumps(chan_freq.tolist()),
            "polarizations": json.dumps(POLS),
        })

    with TemporaryDirectory(prefix="telstate") as tmpdir:
        # Write out a parquet dataset, in row groups of 10 rows
        pq.write_to_dataset(telstate, tmpdir, row_group_size=10)
        # Lazy view over the data
        ds = pq.ParquetDataset(
            tmpdir,
            filters=(
                (pac.field("time") > pac.scalar(time[3])) &
                (pac.field("time") < pac.scalar(time[-3]))
            )
        )
        # Perform projection: only read this column
        ds_table = ds.read(["m003_pointing_azel"])
        # The time filter drops the first 4 and last 3 dumps
        assert len(ds_table) == NDUMPS - 3 - 4
```

@ludwigschwardt Is there anything that wouldn't work in the above conceptualisation? I'd need to figure out whether it's possible to remotely access a parquet file via http(s) (it's certainly possible via S3, which is built on top of http(s) anyway).