diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index b437b0bbf51c..9cc8b0dfafc8 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -20,6 +20,8 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -157,6 +159,12 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale()); return negate(choice) ? bigDecimal.negate() : bigDecimal; + case GEOMETRY: + case GEOGRAPHY: + // geometry and geography values are stored as WKB + return wkbPoint( + (random.nextDouble() * 360.0) - 180.0, (random.nextDouble() * 180.0) - 90.0); + default: throw new IllegalArgumentException( "Cannot generate random value for unknown type: " + primitive); @@ -202,12 +210,27 @@ public static Object generateDictionaryEncodablePrimitive( byte[] uuidBytes = new byte[16]; random.nextBytes(uuidBytes); return uuidBytes; + case GEOMETRY: + case GEOGRAPHY: + // a small set of distinct points so the WKB column stays dictionary encodable + return wkbPoint(value, value); default: throw new IllegalArgumentException( "Cannot generate random value for unknown type: " + primitive); } } + /** Encodes a point as little-endian WKB, the on-disk representation for geo values. */ + private static byte[] wkbPoint(double xCoord, double yCoord) { + return ByteBuffer.allocate(21) + .order(ByteOrder.LITTLE_ENDIAN) + .put((byte) 1) // byte order: little endian + .putInt(1) // WKB geometry type: Point + .putDouble(xCoord) + .putDouble(yCoord) + .array(); + } + private static final long FIFTY_YEARS_IN_MICROS = (50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4; private static final long ABOUT_TEN_YEARS_IN_NANOS = 10L * 365 * 24 * 60 * 60 * 1_000_000_000; diff --git a/core/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/core/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index f2e2b4e7fa34..6321d876a985 100644 --- a/core/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -128,6 +128,8 @@ private static void assertEquals(Type type, Object expected, Object actual) { case UUID: case BINARY: case DECIMAL: + case GEOMETRY: + case GEOGRAPHY: assertThat(actual) .as("Primitive value should be equal to expected for type " + type) .isEqualTo(expected); diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java index 4963052e0877..583513d3f90d 100644 --- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java @@ -268,6 +268,8 @@ public Object primitive(Type.PrimitiveType primitive) { Object result = randomValue(primitive, random); switch (primitive.typeId()) { case BINARY: + case GEOMETRY: + case GEOGRAPHY: return ByteBuffer.wrap((byte[]) result); case UUID: return UUID.nameUUIDFromBytes((byte[]) result); diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 8c0e2e903ab7..63b6e4769631 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -68,6 +68,11 @@ protected boolean supportsRowLineage() { return true; } + @Override + protected boolean supportsGeospatial() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index b1fd8f43a578..89024951ccaa 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -224,6 +224,20 @@ public Optional> visit( return Optional.of(ParquetValueReaders.byteBuffers(desc)); } + @Override + public Optional> visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + // geometry values are pure WKB stored in a BINARY column + return Optional.of(ParquetValueReaders.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) { + // geography values are pure WKB stored in a BINARY column + return Optional.of(ParquetValueReaders.byteBuffers(desc)); + } + @Override public Optional> visit( LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 1f3c7ab31f1c..dcc93f939d8e 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -268,17 +268,15 @@ public Optional> visit( @Override public Optional> visit( LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryType) { - // reject geometry so it does not silently fall through to the generic binary writer; the - // geospatial value path is a separate follow-up - throw new UnsupportedOperationException("Cannot write geometry value to Parquet"); + // geometry values are pure WKB stored in a BINARY column + return Optional.of(ParquetValueWriters.byteBuffers(desc)); } @Override public Optional> visit( LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyType) { - // reject geography so it does not silently fall through to the generic binary writer; the - // geospatial value path is a separate follow-up - throw new UnsupportedOperationException("Cannot write geography value to Parquet"); + // geography values are pure WKB stored in a BINARY column + return Optional.of(ParquetValueWriters.byteBuffers(desc)); } @Override diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java index 0c17a871677b..d4091b520dff 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java @@ -20,10 +20,10 @@ import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.file.Path; import java.util.List; import java.util.Optional; @@ -104,36 +104,71 @@ public void testDataWriter() throws IOException { } @Test - public void testGeospatialWriteIsRejected() { - Schema geometrySchema = + public void testGeospatialRoundTrip() throws IOException { + Schema schema = new Schema( Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "geom", Types.GeometryType.crs84())); - assertThatThrownBy( - () -> - Parquet.writeData(Files.localOutput(createTempFile(temp))) - .schema(geometrySchema) - .createWriterFunc(GenericParquetWriter::create) - .overwrite() - .withSpec(PartitionSpec.unpartitioned()) - .build()) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage("Cannot write geometry value to Parquet"); - - Schema geographySchema = - new Schema( - Types.NestedField.required(1, "id", Types.LongType.get()), - Types.NestedField.optional(2, "geog", Types.GeographyType.crs84())); - assertThatThrownBy( - () -> - Parquet.writeData(Files.localOutput(createTempFile(temp))) - .schema(geographySchema) - .createWriterFunc(GenericParquetWriter::create) - .overwrite() - .withSpec(PartitionSpec.unpartitioned()) - .build()) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessage("Cannot write geography value to Parquet"); + Types.NestedField.optional(2, "geom", Types.GeometryType.crs84()), + Types.NestedField.optional(3, "geog", Types.GeographyType.crs84())); + + GenericRecord record = GenericRecord.create(schema); + List geoRecords = + ImmutableList.of( + record.copy( + ImmutableMap.of("id", 1L, "geom", wkbPoint(30, 10), "geog", wkbPoint(-5, 40))), + // geog is left null + record.copy(ImmutableMap.of("id", 2L, "geom", wkbPoint(0, 0))), + // both geo columns are left null + record.copy(ImmutableMap.of("id", 3L))); + + OutputFile file = Files.localOutput(createTempFile(temp)); + DataWriter dataWriter = + Parquet.writeData(file) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + try (dataWriter) { + for (Record geoRecord : geoRecords) { + dataWriter.write(geoRecord); + } + } + + assertThat(dataWriter.toDataFile().recordCount()).isEqualTo(geoRecords.size()); + + List writtenRecords; + try (CloseableIterable reader = + Parquet.read(file.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build()) { + writtenRecords = Lists.newArrayList(reader); + } + + assertThat(writtenRecords).hasSameSizeAs(geoRecords); + for (int i = 0; i < geoRecords.size(); i++) { + assertThat(writtenRecords.get(i).getField("id")).isEqualTo(geoRecords.get(i).getField("id")); + assertThat(writtenRecords.get(i).getField("geom")) + .as("geometry WKB should round-trip unchanged") + .isEqualTo(geoRecords.get(i).getField("geom")); + assertThat(writtenRecords.get(i).getField("geog")) + .as("geography WKB should round-trip unchanged") + .isEqualTo(geoRecords.get(i).getField("geog")); + } + } + + private static ByteBuffer wkbPoint(double xCoord, double yCoord) { + // little-endian WKB encoding of a point + byte[] wkb = + ByteBuffer.allocate(21) + .order(ByteOrder.LITTLE_ENDIAN) + .put((byte) 1) // byte order: little endian + .putInt(1) // WKB geometry type: Point + .putDouble(xCoord) + .putDouble(yCoord) + .array(); + return ByteBuffer.wrap(wkb); } private void testDataWriter(Schema schema, VariantShreddingFunction variantShreddingFunc)