diff --git a/parquet/pom.xml b/parquet/pom.xml
index b848ece6..aa2c64e6 100644
--- a/parquet/pom.xml
+++ b/parquet/pom.xml
@@ -29,6 +29,18 @@
zstd-jni
+
+
+ io.github.dfa1.vortex
+ vortex-reader
+ test
+
+
+
+ io.airlift
+ aircompressor-v3
+ test
+
org.junit.jupiter
junit-jupiter
diff --git a/parquet/src/main/java/io/github/dfa1/vortex/parquet/ParquetImporter.java b/parquet/src/main/java/io/github/dfa1/vortex/parquet/ParquetImporter.java
index 4b7ed2b7..4a4384f9 100644
--- a/parquet/src/main/java/io/github/dfa1/vortex/parquet/ParquetImporter.java
+++ b/parquet/src/main/java/io/github/dfa1/vortex/parquet/ParquetImporter.java
@@ -115,7 +115,7 @@ public static void importParquet(Path parquetPath, Path vortexPath, ImportOption
}
}
- private static DType mapDType(ColumnSchema col) {
+ static DType mapDType(ColumnSchema col) {
boolean nullable = col.repetitionType() == RepetitionType.OPTIONAL;
return switch (col.type()) {
case BOOLEAN -> new DType.Bool(nullable);
@@ -239,7 +239,7 @@ private static Map buildChunk(List columns, List filterColumns(List all, List names) {
+ static List filterColumns(List all, List names) {
List result = new ArrayList<>(names.size());
for (String name : names) {
boolean found = false;
diff --git a/parquet/src/test/java/io/github/dfa1/vortex/parquet/ParquetImporterTest.java b/parquet/src/test/java/io/github/dfa1/vortex/parquet/ParquetImporterTest.java
new file mode 100644
index 00000000..35ec9f74
--- /dev/null
+++ b/parquet/src/test/java/io/github/dfa1/vortex/parquet/ParquetImporterTest.java
@@ -0,0 +1,296 @@
+package io.github.dfa1.vortex.parquet;
+
+import dev.hardwood.metadata.FieldPath;
+import dev.hardwood.metadata.LogicalType;
+import dev.hardwood.metadata.PhysicalType;
+import dev.hardwood.metadata.RepetitionType;
+import dev.hardwood.schema.ColumnSchema;
+import io.github.dfa1.vortex.core.DType;
+import io.github.dfa1.vortex.core.PType;
+import io.github.dfa1.vortex.reader.Chunk;
+import io.github.dfa1.vortex.reader.ScanIterator;
+import io.github.dfa1.vortex.reader.ScanOptions;
+import io.github.dfa1.vortex.reader.VortexReader;
+import io.github.dfa1.vortex.reader.array.LongArray;
+import io.github.dfa1.vortex.reader.array.VarBinArray;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class ParquetImporterTest {
+
+ private static ColumnSchema col(String name, PhysicalType type, RepetitionType rep, LogicalType logical) {
+ return new ColumnSchema(FieldPath.of(name), type, rep, null, 0, 0, 0, logical);
+ }
+
+ @Nested
+ class TypeMapping {
+
+ @Test
+ void boolean_mapsToBool_carryingNullability() {
+ // Given / When / Then — REQUIRED is non-null, OPTIONAL is nullable
+ assertThat(ParquetImporter.mapDType(col("b", PhysicalType.BOOLEAN, RepetitionType.REQUIRED, null)))
+ .isEqualTo(new DType.Bool(false));
+ assertThat(ParquetImporter.mapDType(col("b", PhysicalType.BOOLEAN, RepetitionType.OPTIONAL, null)))
+ .isEqualTo(new DType.Bool(true));
+ }
+
+ @Test
+ void int32_withoutAnnotation_mapsToI32() {
+ // When
+ DType result = ParquetImporter.mapDType(col("i", PhysicalType.INT32, RepetitionType.REQUIRED, null));
+
+ // Then
+ assertThat(result).isEqualTo(new DType.Primitive(PType.I32, false));
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "8, true, I8",
+ "8, false, U8",
+ "16, true, I16",
+ "16, false, U16",
+ "32, true, I32",
+ "32, false, U32",
+ })
+ void int32_withIntAnnotation_mapsToSizedPType(int bitWidth, boolean signed, PType expected) {
+ // Given — INT32 carrying a width/sign annotation selects the narrow PType
+ ColumnSchema schema = col("i", PhysicalType.INT32, RepetitionType.REQUIRED,
+ new LogicalType.IntType(bitWidth, signed));
+
+ // When
+ DType result = ParquetImporter.mapDType(schema);
+
+ // Then
+ assertThat(result).isEqualTo(new DType.Primitive(expected, false));
+ }
+
+ @Test
+ void int64_signedAndUnsigned_mapToI64AndU64() {
+ // Given / When / Then
+ assertThat(ParquetImporter.mapDType(col("l", PhysicalType.INT64, RepetitionType.REQUIRED, null)))
+ .isEqualTo(new DType.Primitive(PType.I64, false));
+ assertThat(ParquetImporter.mapDType(col("l", PhysicalType.INT64, RepetitionType.REQUIRED,
+ new LogicalType.IntType(64, true)))).isEqualTo(new DType.Primitive(PType.I64, false));
+ assertThat(ParquetImporter.mapDType(col("l", PhysicalType.INT64, RepetitionType.REQUIRED,
+ new LogicalType.IntType(64, false)))).isEqualTo(new DType.Primitive(PType.U64, false));
+ }
+
+ @ParameterizedTest
+ @CsvSource({"MILLIS", "MICROS", "NANOS"})
+ void int64_timestamp_mapsToTimestampExtensionOverI64(LogicalType.TimeUnit unit) {
+ // Given — a TIMESTAMP-annotated INT64
+ ColumnSchema schema = col("ts", PhysicalType.INT64, RepetitionType.OPTIONAL,
+ new LogicalType.TimestampType(true, unit));
+
+ // When
+ DType result = ParquetImporter.mapDType(schema);
+
+ // Then — vortex.timestamp extension over nullable I64 storage
+ assertThat(result).isInstanceOf(DType.Extension.class);
+ DType.Extension ext = (DType.Extension) result;
+ assertThat(ext.extensionId()).isEqualTo("vortex.timestamp");
+ assertThat(ext.storageDType()).isEqualTo(new DType.Primitive(PType.I64, true));
+ assertThat(ext.nullable()).isTrue();
+ }
+
+ @Test
+ void float_and_double_mapToF32AndF64() {
+ // Given / When / Then
+ assertThat(ParquetImporter.mapDType(col("f", PhysicalType.FLOAT, RepetitionType.REQUIRED, null)))
+ .isEqualTo(new DType.Primitive(PType.F32, false));
+ assertThat(ParquetImporter.mapDType(col("d", PhysicalType.DOUBLE, RepetitionType.REQUIRED, null)))
+ .isEqualTo(new DType.Primitive(PType.F64, false));
+ }
+
+ @Test
+ void byteArray_stringLikeAnnotations_mapToUtf8() {
+ // Given — STRING / ENUM / JSON are all logical strings
+ for (LogicalType logical : List.of(new LogicalType.StringType(),
+ new LogicalType.EnumType(), new LogicalType.JsonType())) {
+ // When
+ DType result = ParquetImporter.mapDType(
+ col("s", PhysicalType.BYTE_ARRAY, RepetitionType.OPTIONAL, logical));
+
+ // Then
+ assertThat(result).as("logical %s", logical).isEqualTo(new DType.Utf8(true));
+ }
+ }
+
+ @Test
+ void byteArray_withoutStringAnnotation_throws() {
+ // Given — raw BYTE_ARRAY with no string logical type is unsupported
+ ColumnSchema schema = col("blob", PhysicalType.BYTE_ARRAY, RepetitionType.REQUIRED, null);
+
+ // When / Then
+ assertThatThrownBy(() -> ParquetImporter.mapDType(schema))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("blob");
+ }
+
+ @ParameterizedTest
+ @CsvSource({"INT96", "FIXED_LEN_BYTE_ARRAY"})
+ void unsupportedPhysicalType_throws(PhysicalType type) {
+ // Given
+ ColumnSchema schema = col("x", type, RepetitionType.REQUIRED, null);
+
+ // When / Then
+ assertThatThrownBy(() -> ParquetImporter.mapDType(schema))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("unsupported Parquet physical type");
+ }
+ }
+
+ @Nested
+ class FilterColumns {
+
+ @Test
+ void keepsRequestedColumnsInRequestedOrder() {
+ // Given — schema a, b, c; request c, a
+ List all = List.of(
+ col("a", PhysicalType.INT32, RepetitionType.REQUIRED, null),
+ col("b", PhysicalType.INT32, RepetitionType.REQUIRED, null),
+ col("c", PhysicalType.INT32, RepetitionType.REQUIRED, null));
+
+ // When
+ List result = ParquetImporter.filterColumns(all, List.of("c", "a"));
+
+ // Then — projection order wins over schema order
+ assertThat(result).extracting(ColumnSchema::name).containsExactly("c", "a");
+ }
+
+ @Test
+ void unknownColumn_throws() {
+ // Given
+ List all = List.of(col("a", PhysicalType.INT32, RepetitionType.REQUIRED, null));
+
+ // When / Then
+ assertThatThrownBy(() -> ParquetImporter.filterColumns(all, List.of("missing")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("missing");
+ }
+ }
+
+ @Nested
+ class Import {
+
+ @Test
+ void importsFixture_schemaAndRowCount(@TempDir Path tmp) throws Exception {
+ // Given — 100-row TPC-DS customer fixture (INT64 + STRING, all nullable)
+ Path vortex = tmp.resolve("out.vortex");
+
+ // When
+ ParquetImporter.importParquet(fixture(), vortex);
+
+ // Then
+ try (VortexReader reader = VortexReader.open(vortex)) {
+ assertThat(reader.dtype()).isInstanceOf(DType.Struct.class);
+ DType.Struct schema = (DType.Struct) reader.dtype();
+ assertThat(schema.fieldNames()).contains("c_customer_sk", "c_first_name");
+ assertThat(countRows(reader)).isEqualTo(100L);
+ }
+ }
+
+ @Test
+ void importsFixture_columnValuesRoundTrip(@TempDir Path tmp) throws Exception {
+ // Given
+ Path vortex = tmp.resolve("out.vortex");
+
+ // When
+ ParquetImporter.importParquet(fixture(), vortex);
+
+ // Then — known first three values of each column
+ try (VortexReader reader = VortexReader.open(vortex);
+ ScanIterator iter = reader.scan(ScanOptions.all())) {
+ assertThat(iter.hasNext()).isTrue();
+ try (Chunk first = iter.next()) {
+ LongArray sk = first.column("c_customer_sk");
+ assertThat(sk.getLong(0)).isEqualTo(100L);
+ assertThat(sk.getLong(1)).isEqualTo(99L);
+ assertThat(sk.getLong(2)).isEqualTo(98L);
+
+ VarBinArray name = first.column("c_first_name");
+ assertThat(name.getString(0)).isEqualTo("Jeannette");
+ assertThat(name.getString(1)).isEqualTo("Austin");
+ assertThat(name.getString(2)).isEqualTo("David");
+ }
+ }
+ }
+
+ @Test
+ void projection_importsOnlyRequestedColumns(@TempDir Path tmp) throws Exception {
+ // Given — project a single column out of the fixture
+ Path vortex = tmp.resolve("out.vortex");
+ ImportOptions options = ImportOptions.defaults().withColumns(List.of("c_customer_sk"));
+
+ // When
+ ParquetImporter.importParquet(fixture(), vortex, options);
+
+ // Then — only the projected column survives
+ try (VortexReader reader = VortexReader.open(vortex)) {
+ DType.Struct schema = (DType.Struct) reader.dtype();
+ assertThat(schema.fieldNames()).containsExactly("c_customer_sk");
+ assertThat(countRows(reader)).isEqualTo(100L);
+ }
+ }
+
+ @Test
+ void smallChunkSize_splitsIntoMultipleChunks(@TempDir Path tmp) throws Exception {
+ // Given — chunk size 30 forces 4 chunks over 100 rows (exercises trim + chunk flush)
+ Path vortex = tmp.resolve("out.vortex");
+ ImportOptions options = ImportOptions.defaults().withChunkSize(30);
+
+ // When
+ ParquetImporter.importParquet(fixture(), vortex, options);
+
+ // Then — row count is preserved across the chunk boundaries
+ try (VortexReader reader = VortexReader.open(vortex);
+ ScanIterator iter = reader.scan(ScanOptions.all())) {
+ long chunks = 0;
+ long rows = 0;
+ while (iter.hasNext()) {
+ try (Chunk c = iter.next()) {
+ chunks++;
+ rows += c.rowCount();
+ }
+ }
+ assertThat(rows).isEqualTo(100L);
+ assertThat(chunks).isGreaterThan(1L);
+ }
+ }
+
+ @Test
+ void projection_unknownColumn_throws(@TempDir Path tmp) {
+ // Given
+ Path vortex = tmp.resolve("out.vortex");
+ ImportOptions options = ImportOptions.defaults().withColumns(List.of("does_not_exist"));
+
+ // When / Then
+ assertThatThrownBy(() -> ParquetImporter.importParquet(fixture(), vortex, options))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("does_not_exist");
+ }
+ }
+
+ private static Path fixture() throws Exception {
+ return Path.of(ParquetImporterTest.class
+ .getResource("/fixtures/delta_encoding_optional_column.parquet").toURI());
+ }
+
+ private static long countRows(VortexReader reader) {
+ AtomicLong total = new AtomicLong();
+ try (ScanIterator iter = reader.scan(ScanOptions.all())) {
+ iter.forEachRemaining(c -> total.addAndGet(c.rowCount()));
+ }
+ return total.get();
+ }
+}
diff --git a/parquet/src/test/resources/fixtures/delta_encoding_optional_column.parquet b/parquet/src/test/resources/fixtures/delta_encoding_optional_column.parquet
new file mode 100644
index 00000000..3b06caae
Binary files /dev/null and b/parquet/src/test/resources/fixtures/delta_encoding_optional_column.parquet differ