arrow/parquet as a metadata format #5

@sjperkins

Following on from @bmerry's #4 (comment), I thought I'd spitball something arrow/parquet-based, for a couple of reasons:

  1. Internally, parquet chunks data by row groups.
  2. This allows readers to perform partial column and row reads.

The following script sketches placing MeerKAT sensor data in arrow tables and parquet files. It assumes that each row is associated with a dump -- perhaps this assumption needs to be revisited? However, proceeding from it, one can store baseline, channel or any other dimensional data in each row.

What happens if data doesn't vary by row? Dump it into the arrow table metadata as json/msgpack.

from itertools import product
from datetime import datetime
import json
from tempfile import TemporaryDirectory

import numpy as np
import pyarrow as pa
import pyarrow.compute as pac
import pyarrow.parquet as pq

ANTS = [f"m{x:03}" for x in range(7)]
POLS = ["hh", "hv", "vh", "vv"]
ANT1, ANT2 = np.triu_indices(len(ANTS), 0)
CORRPRODS = [(f"{ANTS[a1]}{p[0]}", f"{ANTS[a2]}{p[1]}") for (a1, a2), p in product(zip(ANT1, ANT2), POLS)]
START_TIME = datetime.now().timestamp()
NBL = len(ANT1)
NDUMPS = 100
DUMP_RATE = 8.0
NCHAN = 16

if __name__ == "__main__":
  time = START_TIME + np.arange(NDUMPS) * DUMP_RATE
  chan_freq = np.linspace(.856e9, 2 * .856e9, NCHAN)

  # timestamps are the table's primary row dimension
  telstate = pa.Table.from_pydict({
    "time": time,
    # Categorical data
    "state": pa.DictionaryArray.from_arrays(indices=np.random.randint(0, 4, NDUMPS), dictionary=pa.array(["slew", "track", "scan", "think"])),
    # Parquet can't write a categorical of FixedSizeListArray yet
    # "chan_freq": pa.DictionaryArray.from_arrays(indices=[0] * NDUMPS, dictionary=pa.FixedSizeListArray.from_arrays(chan_freq, NCHAN)),
    # Arrow can't create a categorical of FixedShapeTensorArray yet
    # "chan_freq": pa.DictionaryArray.from_arrays(indices=[0] * NDUMPS, dictionary=pa.FixedShapeTensorArray.from_numpy_ndarray(chan_freq.reshape((1, NCHAN)))),
    "m003_humidity": pa.FixedShapeTensorArray.from_numpy_ndarray(np.random.random((NDUMPS, NBL, NCHAN))),
    "m003_pointing_azel": pa.FixedShapeTensorArray.from_numpy_ndarray(np.random.random((NDUMPS, len(ANTS), NCHAN, 2)))
  },
  # Everything without a primary time dimension goes into the metadata section
  # This should probably be msgpack'd to avoid losing precision
  # If data is split across multiple parquet files, the metadata is replicated
  # in each, which may be undesirable
  metadata={
    "corrprods": json.dumps(CORRPRODS),
    "dump_rate": json.dumps(DUMP_RATE),
    "antenna": json.dumps(ANTS),
    "chan_freq": json.dumps(chan_freq.tolist()),
    "polarizations": json.dumps(POLS)
  })

  with TemporaryDirectory(prefix="telstate") as tmpdir:
    # Write the table out as a parquet dataset (a single file here), in row groups of 10 rows
    pq.write_to_dataset(telstate, tmpdir, row_group_size=10)

    # Lazy view over the data
    ds = pq.ParquetDataset(
      tmpdir,
      filters=(
        (pac.field("time") > pac.scalar(time[3])) &
        (pac.field("time") < pac.scalar(time[-3]))
      )
    )

    # Perform projection, only read this column
    ds_table = ds.read(["m003_pointing_azel"])
    assert len(ds_table) == NDUMPS - 3 - 4

@ludwigschwardt Is there anything that wouldn't work in the above conceptualisation? I'd need to figure out whether it's possible to remotely access a parquet file via http(s) (it's certainly possible via S3, which is built on top of http(s) anyway).
