Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -157,6 +159,12 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand
BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale());
return negate(choice) ? bigDecimal.negate() : bigDecimal;

case GEOMETRY:
case GEOGRAPHY:
// geometry and geography values are stored as WKB
return wkbPoint(
(random.nextDouble() * 360.0) - 180.0, (random.nextDouble() * 180.0) - 90.0);

default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
Expand Down Expand Up @@ -202,12 +210,27 @@ public static Object generateDictionaryEncodablePrimitive(
byte[] uuidBytes = new byte[16];
random.nextBytes(uuidBytes);
return uuidBytes;
case GEOMETRY:
case GEOGRAPHY:
// a small set of distinct points so the WKB column stays dictionary encodable
return wkbPoint(value, value);
default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
}
}

/** Encodes a point as little-endian WKB, the on-disk representation for geo values. */
private static byte[] wkbPoint(double xCoord, double yCoord) {
return ByteBuffer.allocate(21)
.order(ByteOrder.LITTLE_ENDIAN)
.put((byte) 1) // byte order: little endian
.putInt(1) // WKB geometry type: Point
.putDouble(xCoord)
.putDouble(yCoord)
.array();
}

private static final long FIFTY_YEARS_IN_MICROS =
(50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4;
private static final long ABOUT_TEN_YEARS_IN_NANOS = 10L * 365 * 24 * 60 * 60 * 1_000_000_000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ private static void assertEquals(Type type, Object expected, Object actual) {
case UUID:
case BINARY:
case DECIMAL:
case GEOMETRY:
case GEOGRAPHY:
assertThat(actual)
.as("Primitive value should be equal to expected for type " + type)
.isEqualTo(expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ public Object primitive(Type.PrimitiveType primitive) {
Object result = randomValue(primitive, random);
switch (primitive.typeId()) {
case BINARY:
case GEOMETRY:
case GEOGRAPHY:
return ByteBuffer.wrap((byte[]) result);
case UUID:
return UUID.nameUUIDFromBytes((byte[]) result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected boolean supportsRowLineage() {
return true;
}

@Override
protected boolean supportsGeospatial() {
return true;
}

@Override
protected void writeAndValidate(Schema schema) throws IOException {
writeAndValidate(schema, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,20 @@ public Optional<ParquetValueReader<?>> visit(
return Optional.of(ParquetValueReaders.byteBuffers(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) {
// geometry values are pure WKB stored in a BINARY column
return Optional.of(ParquetValueReaders.byteBuffers(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) {
// geography values are pure WKB stored in a BINARY column
return Optional.of(ParquetValueReaders.byteBuffers(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,15 @@ public Optional<ParquetValueWriter<?>> visit(
@Override
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryType) {
// reject geometry so it does not silently fall through to the generic binary writer; the
// geospatial value path is a separate follow-up
throw new UnsupportedOperationException("Cannot write geometry value to Parquet");
// geometry values are pure WKB stored in a BINARY column
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}

@Override
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyType) {
// reject geography so it does not silently fall through to the generic binary writer; the
// geospatial value path is a separate follow-up
throw new UnsupportedOperationException("Cannot write geography value to Parquet");
// geography values are pure WKB stored in a BINARY column
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -104,36 +104,71 @@ public void testDataWriter() throws IOException {
}

@Test
public void testGeospatialWriteIsRejected() {
Schema geometrySchema =
public void testGeospatialRoundTrip() throws IOException {
Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "geom", Types.GeometryType.crs84()));
assertThatThrownBy(
() ->
Parquet.writeData(Files.localOutput(createTempFile(temp)))
.schema(geometrySchema)
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot write geometry value to Parquet");

Schema geographySchema =
new Schema(
Types.NestedField.required(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "geog", Types.GeographyType.crs84()));
assertThatThrownBy(
() ->
Parquet.writeData(Files.localOutput(createTempFile(temp)))
.schema(geographySchema)
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build())
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot write geography value to Parquet");
Types.NestedField.optional(2, "geom", Types.GeometryType.crs84()),
Types.NestedField.optional(3, "geog", Types.GeographyType.crs84()));

GenericRecord record = GenericRecord.create(schema);
List<Record> geoRecords =
ImmutableList.of(
record.copy(
ImmutableMap.of("id", 1L, "geom", wkbPoint(30, 10), "geog", wkbPoint(-5, 40))),
// geog is left null
record.copy(ImmutableMap.of("id", 2L, "geom", wkbPoint(0, 0))),
// both geo columns are left null
record.copy(ImmutableMap.of("id", 3L)));

OutputFile file = Files.localOutput(createTempFile(temp));
DataWriter<Record> dataWriter =
Parquet.writeData(file)
.schema(schema)
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build();
try (dataWriter) {
for (Record geoRecord : geoRecords) {
dataWriter.write(geoRecord);
}
}

assertThat(dataWriter.toDataFile().recordCount()).isEqualTo(geoRecords.size());

List<Record> writtenRecords;
try (CloseableIterable<Record> reader =
Parquet.read(file.toInputFile())
.project(schema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
writtenRecords = Lists.newArrayList(reader);
}

assertThat(writtenRecords).hasSameSizeAs(geoRecords);
for (int i = 0; i < geoRecords.size(); i++) {
assertThat(writtenRecords.get(i).getField("id")).isEqualTo(geoRecords.get(i).getField("id"));
assertThat(writtenRecords.get(i).getField("geom"))
.as("geometry WKB should round-trip unchanged")
.isEqualTo(geoRecords.get(i).getField("geom"));
assertThat(writtenRecords.get(i).getField("geog"))
.as("geography WKB should round-trip unchanged")
.isEqualTo(geoRecords.get(i).getField("geog"));
}
}

private static ByteBuffer wkbPoint(double xCoord, double yCoord) {
// little-endian WKB encoding of a point
byte[] wkb =
ByteBuffer.allocate(21)
.order(ByteOrder.LITTLE_ENDIAN)
.put((byte) 1) // byte order: little endian
.putInt(1) // WKB geometry type: Point
.putDouble(xCoord)
.putDouble(yCoord)
.array();
return ByteBuffer.wrap(wkb);
}

private void testDataWriter(Schema schema, VariantShreddingFunction variantShreddingFunc)
Expand Down