Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions api/src/main/java/org/apache/iceberg/variants/SerializedArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,46 @@ static SerializedArray from(VariantMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

@VisibleForTesting
static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header) {
return from(metadata, value, header, 0);
}

static SerializedArray from(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
Comment thread
nssalian marked this conversation as resolved.
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == BasicType.ARRAY, "Invalid array, basic type: " + basicType);
return new SerializedArray(metadata, value, header);
return new SerializedArray(metadata, value, header, depth);
}

private final VariantMetadata metadata;
private final ByteBuffer value;
private final int offsetSize;
private final int offsetListOffset;
private final int dataOffset;
private final int depth;
private final VariantValue[] array;

private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header) {
private SerializedArray(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
this.metadata = metadata;
this.value = value;
this.depth = depth;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
Preconditions.checkArgument(
value.remaining() >= HEADER_SIZE + numElementsSize,
"Invalid variant array: buffer too small for element count field");
int numElements = ByteBuffers.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
Preconditions.checkArgument(
numElements >= 0, "Invalid variant array: negative element count %s", numElements);
this.offsetListOffset = HEADER_SIZE + numElementsSize;
long offsetTableEnd = (long) offsetListOffset + ((long) numElements + 1L) * offsetSize;
Preconditions.checkArgument(
offsetTableEnd <= value.remaining(),
"Invalid variant array: element count %s exceeds buffer",
numElements);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just realised that this and the parquet-java hardening don't worry about leftover data. "don't do that" is implicit the policy there, being as it is useless.

I wonder what the rust parquet reader does.

this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataOffset here is assigned with plain-int arithmetic, but we just computed the same bound in long as offsetTableEnd and guarded against it.

For offsetSize = 4, (1 + numElements) * offsetSize overflows int around numElements ~536M — a valid 4-byte count. The long guard above still passes, so dataOffset wraps negative and we hit a ByteBuffer underflow at access time instead of the IllegalArgumentException this check is meant to produce.

I'd just reuse the value we already have: this.dataOffset = Math.toIntExact(offsetTableEnd). wdyt?

this.array = new VariantValue[numElements];
}
Expand All @@ -76,8 +93,16 @@ public VariantValue get(int index) {
int next =
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
long dataLen = value.remaining() - (long) dataOffset;
Preconditions.checkArgument(
offset >= 0 && next >= offset && next <= dataLen,
"Invalid variant array: offset range [%s, %s] out of data region [0, %s]",
offset,
next,
dataLen);
array[index] =
VariantValue.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
VariantUtil.fromBuffer(
metadata, VariantUtil.slice(value, dataOffset + offset, next - offset), depth + 1);
}
return array[index];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,32 @@ static SerializedMetadata from(ByteBuffer metadata) {
private SerializedMetadata(ByteBuffer metadata, int header) {
this.isSorted = (header & SORTED_STRINGS) == SORTED_STRINGS;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
Preconditions.checkArgument(
metadata.remaining() >= HEADER_SIZE + offsetSize,
"Invalid variant metadata: buffer too small for dictionary size field");
int dictSize = ByteBuffers.readLittleEndianUnsigned(metadata, HEADER_SIZE, offsetSize);
this.dict = new String[dictSize];
Preconditions.checkArgument(
dictSize >= 0, "Invalid variant metadata: negative dictionary size %s", dictSize);
this.offsetListOffset = HEADER_SIZE + offsetSize;
long offsetTableEnd = (long) offsetListOffset + ((long) dictSize + 1L) * offsetSize;
Preconditions.checkArgument(
offsetTableEnd <= metadata.remaining(),
"Invalid variant metadata: dictionary size %s exceeds buffer",
dictSize);
this.dict = new String[dictSize];
this.dataOffset = offsetListOffset + ((1 + dictSize) * offsetSize);
int endOffset =
dataOffset
+ ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
if (endOffset < metadata.limit()) {
int lastOffset =
ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * dictSize), offsetSize);
Preconditions.checkArgument(
lastOffset >= 0, "Invalid variant metadata: negative end offset %s", lastOffset);
long endOffsetLong = (long) dataOffset + lastOffset;
Preconditions.checkArgument(
endOffsetLong <= metadata.remaining(),
"Invalid variant metadata: end offset %s exceeds buffer",
endOffsetLong);
int endOffset = (int) endOffsetLong;
if (endOffset < metadata.remaining()) {
this.metadata = VariantUtil.slice(metadata, 0, endOffset);
} else {
this.metadata = metadata;
Expand Down Expand Up @@ -112,6 +129,12 @@ public String get(int index) {
int next =
ByteBuffers.readLittleEndianUnsigned(
metadata, offsetListOffset + (offsetSize * (1 + index)), offsetSize);
Preconditions.checkArgument(
offset >= 0 && next >= offset && (long) dataOffset + next <= metadata.remaining(),
"Invalid variant metadata: dict entry %s offset range [%s, %s] invalid",
index,
offset,
next);
dict[index] = VariantUtil.readString(metadata, dataOffset + offset, next - offset);
}
return dict[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ static SerializedObject from(VariantMetadata metadata, byte[] bytes) {
}

static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header) {
return from(metadata, value, header, 0);
}

static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
Preconditions.checkArgument(
value.order() == ByteOrder.LITTLE_ENDIAN, "Unsupported byte order: big endian");
BasicType basicType = VariantUtil.basicType(header);
Preconditions.checkArgument(
basicType == BasicType.OBJECT, "Invalid object, basic type: " + basicType);
return new SerializedObject(metadata, value, header);
return new SerializedObject(metadata, value, header, depth);
}

private final VariantMetadata metadata;
Expand All @@ -60,18 +64,33 @@ static SerializedObject from(VariantMetadata metadata, ByteBuffer value, int hea
private final int[] offsets;
private final int[] lengths;
private final int dataOffset;
private final int depth;
private final VariantValue[] values;

private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header) {
private SerializedObject(VariantMetadata metadata, ByteBuffer value, int header, int depth) {
this.metadata = metadata;
this.value = value;
this.depth = depth;
this.offsetSize = 1 + ((header & OFFSET_SIZE_MASK) >> OFFSET_SIZE_SHIFT);
this.fieldIdSize = 1 + ((header & FIELD_ID_SIZE_MASK) >> FIELD_ID_SIZE_SHIFT);
int numElementsSize = ((header & IS_LARGE) == IS_LARGE) ? 4 : 1;
Preconditions.checkArgument(
value.remaining() >= HEADER_SIZE + numElementsSize,
"Invalid variant object: buffer too small for element count field");
int numElements = ByteBuffers.readLittleEndianUnsigned(value, HEADER_SIZE, numElementsSize);
Preconditions.checkArgument(
numElements >= 0, "Invalid variant object: negative element count %s", numElements);
this.fieldIdListOffset = HEADER_SIZE + numElementsSize;
this.fieldIds = new Integer[numElements];
long dataStart =
(long) fieldIdListOffset
+ (long) numElements * fieldIdSize
+ ((long) numElements + 1L) * offsetSize;
Preconditions.checkArgument(
dataStart <= value.remaining(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same int-overflow as the array side: offsetListOffset and dataOffset below are assigned in plain int while dataStart here is long. numElements * fieldIdSize and (1 + numElements) * offsetSize can overflow before the long guard's bound is ever applied to the int fields, wrapping the data offset negative for a large-but-valid count.

I'd derive both offsets from the long intermediates and Math.toIntExact them so the guard actually protects the value we use.

"Invalid variant object: element count %s exceeds buffer",
numElements);
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
this.fieldIds = new Integer[numElements];
this.offsets = new int[numElements];
this.lengths = new int[numElements];
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
Expand All @@ -96,11 +115,26 @@ private void initOffsetsAndLengths(int numElements) {
int dataLength =
ByteBuffers.readLittleEndianUnsigned(
value, offsetListOffset + (numElements * offsetSize), offsetSize);
long dataLen = value.remaining() - (long) dataOffset;
Preconditions.checkArgument(
dataLength >= 0 && dataLength <= dataLen,
"Invalid variant object: data length %s out of data region [0, %s]",
dataLength,
dataLen);
for (int index = 0; index < numElements; index += 1) {
Preconditions.checkArgument(
offsets[index] >= 0 && offsets[index] <= dataLength,
"Invalid variant object: offset %s out of declared data length %s",
offsets[index],
dataLength);
}
offsetToLength.put(dataLength, 0);

// populate lengths list by sorting offsets
List<Integer> sortedOffsets =
offsetToLength.keySet().stream().sorted().collect(Collectors.toList());
Preconditions.checkArgument(
sortedOffsets.size() == numElements + 1, "Invalid variant object: duplicate field offsets");
for (int index = 0; index < numElements; index += 1) {
int offset = sortedOffsets.get(index);
int length = sortedOffsets.get(index + 1) - offset;
Expand Down Expand Up @@ -163,9 +197,16 @@ public String next() {

private int id(int index) {
if (null == fieldIds[index]) {
fieldIds[index] =
int dictSize = metadata.dictionarySize();
int id =
ByteBuffers.readLittleEndianUnsigned(
value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
Preconditions.checkArgument(
id >= 0 && id < dictSize,
"Invalid variant object: field id %s out of range [0, %s)",
id,
dictSize);
fieldIds[index] = id;
}

return fieldIds[index];
Expand All @@ -182,8 +223,10 @@ public VariantValue get(String name) {

if (null == values[index]) {
values[index] =
VariantValue.from(
metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
VariantUtil.fromBuffer(
metadata,
VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]),
depth + 1);
}

return values[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,56 @@ static SerializedPrimitive from(ByteBuffer value, int header) {
private SerializedPrimitive(ByteBuffer value, int header) {
this.value = value;
this.type = PhysicalType.from(header >> PRIMITIVE_TYPE_SHIFT);
long requiredBytes = PRIMITIVE_OFFSET + payloadSize(type, value);
Preconditions.checkArgument(
requiredBytes <= value.remaining(),
"Invalid variant primitive: %s payload extends past buffer",
type);
}

private static long payloadSize(PhysicalType type, ByteBuffer value) {
switch (type) {
case NULL:
case BOOLEAN_TRUE:
case BOOLEAN_FALSE:
return 0;
case INT8:
return 1;
case INT16:
return 2;
case INT32:
case DATE:
case FLOAT:
return 4;
case INT64:
case TIMESTAMPTZ:
case TIMESTAMPNTZ:
case TIME:
case TIMESTAMPTZ_NANOS:
case TIMESTAMPNTZ_NANOS:
case DOUBLE:
return 8;
case DECIMAL4:
return 5;
case DECIMAL8:
return 9;
case DECIMAL16:
return 17;
case UUID:
return 16;
case BINARY:
case STRING:
Preconditions.checkArgument(
PRIMITIVE_OFFSET + 4 <= value.remaining(),
"Invalid variant primitive: %s size field extends past buffer",
type);
int size = ByteBuffers.readLittleEndianInt32(value, PRIMITIVE_OFFSET);
Preconditions.checkArgument(
size >= 0, "Invalid variant primitive: negative %s size %s", type, size);
return 4L + size;
}

throw new UnsupportedOperationException("Unsupported primitive type: " + type);
}

private Object read() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ static SerializedShortString from(ByteBuffer value, int header) {
private SerializedShortString(ByteBuffer value, int header) {
this.value = value;
this.length = ((header & LENGTH_MASK) >> LENGTH_SHIFT);
Preconditions.checkArgument(
HEADER_SIZE + length <= value.remaining(),
"Invalid variant short string: length %s exceeds buffer",
length);
}

@Override
Expand Down
32 changes: 32 additions & 0 deletions api/src/main/java/org/apache/iceberg/variants/VariantUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;

class VariantUtil {
private static final int BASIC_TYPE_MASK = 0b11;
Expand All @@ -30,8 +32,38 @@ class VariantUtil {
private static final int BASIC_TYPE_OBJECT = 2;
private static final int BASIC_TYPE_ARRAY = 3;

/**
* Maximum permitted nesting depth of a Variant value. The top-level value is depth 0, so a
* Variant may contain up to {@code MAX_VARIANT_DEPTH} nested levels.
*/
static final int MAX_VARIANT_DEPTH = 500;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non-spec limit — the Parquet VariantEncoding spec doesn't cap nesting depth and Iceberg defers to it. A valid variant with >500 levels written by Spark or another client would read fine elsewhere but throw here at scan time, which reads as silent data loss rather than malformed-input rejection.

What did parquet-java#3562 settle on? If it picked a different value (or no cap), we'd diverge on file interop. I'd at minimum cross-reference that decision in a comment here and confirm 500 is high enough to never reject real data (it's below Jackson's default of 2000).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no real reason, except it was what was in org.apache.parquet.variant.VariantJsonParser

how about we discuss on parquet dev list?

I don't really care, just that there's some limit and that it's broadly consistent.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've started a discussion on the parquet dev list, please get involved. Looking at jackson, seems like 1000 is their limit right now, so copying that would be consistent. Let's see what the mailing list discussion outcome is though and we can all go with that, and add to the test dataset.
FasterXML/jackson-core#943

private VariantUtil() {}

/** Parses a variant value from {@code value} using {@code metadata} for field-name resolution. */
static VariantValue fromBuffer(VariantMetadata metadata, ByteBuffer value, int depth) {
Comment thread
nssalian marked this conversation as resolved.
Preconditions.checkArgument(
depth <= MAX_VARIANT_DEPTH,
"Invalid variant: nesting depth %s exceeds maximum %s",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc on MAX_VARIANT_DEPTH says a variant "may contain up to MAX_VARIANT_DEPTH nested levels," but depth <= MAX_VARIANT_DEPTH with the top-level at depth 0 actually admits 501 levels (depths 0 through 500). It's internally consistent, just off-by-one against the comment.

I'd either flip this to depth < MAX_VARIANT_DEPTH or update the Javadoc to state the counting model exactly. A depth >= 0 lower bound here would also close the direct-call path where depth + 1 could overflow past the cap.

depth,
MAX_VARIANT_DEPTH);
Preconditions.checkArgument(value.remaining() >= 1, "Invalid variant: empty value buffer");
int header = ByteBuffers.readByte(value, 0);
BasicType basicType = basicType(header);
switch (basicType) {
case PRIMITIVE:
return SerializedPrimitive.from(value, header);
case SHORT_STRING:
return SerializedShortString.from(value, header);
case OBJECT:
return SerializedObject.from(metadata, value, header, depth);
case ARRAY:
return SerializedArray.from(metadata, value, header, depth);
}

throw new UnsupportedOperationException("Unsupported basic type: " + basicType);
}

static float readFloat(ByteBuffer buffer, int offset) {
return buffer.getFloat(buffer.position() + offset);
}
Expand Down
16 changes: 1 addition & 15 deletions api/src/main/java/org/apache/iceberg/variants/VariantValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.iceberg.variants;

import java.nio.ByteBuffer;
import org.apache.iceberg.util.ByteBuffers;

/** A variant value. */
public interface VariantValue {
Expand Down Expand Up @@ -62,19 +61,6 @@ default VariantArray asArray() {
}

static VariantValue from(VariantMetadata metadata, ByteBuffer value) {
int header = ByteBuffers.readByte(value, 0);
BasicType basicType = VariantUtil.basicType(header);
switch (basicType) {
case PRIMITIVE:
return SerializedPrimitive.from(value, header);
case SHORT_STRING:
return SerializedShortString.from(value, header);
case OBJECT:
return SerializedObject.from(metadata, value, header);
case ARRAY:
return SerializedArray.from(metadata, value, header);
}

throw new UnsupportedOperationException("Unsupported basic type: " + basicType);
return VariantUtil.fromBuffer(metadata, value, 0);
}
}
Loading