Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/iceberg/ColumnFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.types.Types;

/** Information about a column file. */
interface ColumnFile {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Efficient Column Updates proposal had sequence_number at the column file level. Is that stale? I.e., are we dropping per-file granularity?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is stale: we decided not to have per-column-file granularity for sequence numbers, nor for snapshots. Tracking will track a single snapshot ID for the latest column file, which behaves similarly to dv_snapshot_id, and for the sequence number (e.g. to answer last_updated_sequence_number) we'll repurpose the data_sequence_number from Tracking. Note this requires us to bump that sequence number whenever a new column file is added.

// Schema fields of the column file struct. All three are declared as required.

// Field 161: list of integer field IDs (list element ID 162).
Types.NestedField FIELD_IDS =
Types.NestedField.required(
161,
"field_ids",
Types.ListType.ofRequired(162, Types.IntegerType.get()),
"Field IDs this column file contains");
// Field 163: location of the column file, stored as a string.
Types.NestedField LOCATION =
Types.NestedField.required(
163, "location", Types.StringType.get(), "Location of the column file");
// Field 164: total size of the column file in bytes, stored as a long.
Types.NestedField FILE_SIZE_IN_BYTES =
Types.NestedField.required(
164, "file_size_in_bytes", Types.LongType.get(), "Total column file size in bytes");

/**
 * Returns the struct type of a column file.
 *
 * <p>The fields appear in the order field IDs, location, file size in bytes.
 */
static Types.StructType schema() {
  Types.StructType columnFileType =
      Types.StructType.of(FIELD_IDS, LOCATION, FILE_SIZE_IN_BYTES);
  return columnFileType;
}

/** Returns the field IDs contained in this column file. */
List<Integer> fieldIds();

/** Returns the location of the column file. */
String location();

/** Returns the total size of the column file in bytes. */
long fileSizeInBytes();

/** Copies this column file. */
ColumnFile copy();
}
171 changes: 171 additions & 0 deletions core/src/main/java/org/apache/iceberg/ColumnFileStruct.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;

/** Mutable {@link StructLike} implementation of {@link ColumnFile}. */
class ColumnFileStruct extends SupportsIndexProjection implements ColumnFile, Serializable {
// Reuse the schema declared by ColumnFile instead of repeating the field list, so the struct
// layout cannot drift from the interface. The positional accessors in this class rely on the
// order field_ids (0), location (1), file_size_in_bytes (2).
private static final Types.StructType BASE_TYPE = ColumnFile.schema();

private int[] fieldIds = null;
private String location = null;
private long fileSizeInBytes = -1L;

/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * @param projection the projected struct type, handed to the parent class together with the
 *     full base type of this struct
 */
ColumnFileStruct(Types.StructType projection) {
super(BASE_TYPE, projection);
}

/**
 * Creates a struct holding the given values, sized to the full (unprojected) base type.
 *
 * <p>The field ID array is stored as passed in; no copy is made here.
 */
private ColumnFileStruct(int[] fieldIds, String location, long fileSizeInBytes) {
super(BASE_TYPE.fields().size());
this.fieldIds = fieldIds;
this.location = location;
this.fileSizeInBytes = fileSizeInBytes;
}

/** Copy constructor. */
private ColumnFileStruct(ColumnFileStruct toCopy) {
super(toCopy);
this.fieldIds =
toCopy.fieldIds != null ? Arrays.copyOf(toCopy.fieldIds, toCopy.fieldIds.length) : null;
this.location = toCopy.location;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
}

/**
 * Constructor for Java serialization.
 *
 * <p>Leaves all fields at their initial values (null field IDs, null location, size -1).
 */
ColumnFileStruct() {
super(BASE_TYPE.fields().size());
}

/**
 * Returns the field IDs converted with {@code ArrayUtil.toUnmodifiableIntList}, or null when
 * no field IDs have been set.
 */
@Override
public List<Integer> fieldIds() {
  if (fieldIds == null) {
    return null;
  }

  return ArrayUtil.toUnmodifiableIntList(fieldIds);
}

/** Returns the location of the column file, or null if it has not been set. */
@Override
public String location() {
return location;
}

/** Returns the column file size in bytes; the field starts at -1 until a value is set. */
@Override
public long fileSizeInBytes() {
return fileSizeInBytes;
}

/** Returns a new {@code ColumnFileStruct} created from this one via the copy constructor. */
@Override
public ColumnFile copy() {
return new ColumnFileStruct(this);
}

/** Looks up the value at the given base-type position and casts it to the requested class. */
@Override
protected <T> T internalGet(int pos, Class<T> javaClass) {
  Object fieldValue = getByPos(pos);
  return javaClass.cast(fieldValue);
}

/**
 * Returns the value stored at a position of the base type: 0 is the field ID list, 1 the
 * location, 2 the file size in bytes.
 *
 * @throws UnsupportedOperationException if the position is not one of the known ordinals
 */
private Object getByPos(int pos) {
  if (pos == 0) {
    return fieldIds();
  } else if (pos == 1) {
    return location;
  } else if (pos == 2) {
    return fileSizeInBytes;
  }

  throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}

/**
 * Stores a value at a position of the base type: 0 is the field ID list, 1 the location, 2 the
 * file size in bytes. Values at any other position are dropped without error.
 */
@Override
@SuppressWarnings("unchecked")
protected <T> void internalSet(int pos, T value) {
  if (pos == 0) {
    this.fieldIds = ArrayUtil.toIntArray((List<Integer>) value);
  } else if (pos == 1) {
    // always coerce to String for Serializable
    this.location = value.toString();
  } else if (pos == 2) {
    this.fileSizeInBytes = (long) value;
  }
  // any other position is ignored: the object must be from a newer version of the format
}

/** Returns a new, empty {@link Builder} for creating a column file. */
static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("field_ids", fieldIds)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Does this print the list object instead of values?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does print the values in the list. Just checked the output:
ColumnFileStruct{field_ids=[1, 2, 3], location=s3://bucket/data/column.parquet, file_size_in_bytes=1024}
I was hesitant to add test coverage for this, since I haven't seen the output of a toString function tested anywhere else.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ToStringHelper() should handle arrays just fine. No need to add a test for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the confirmation, @anoopj !

.add("location", location)
.add("file_size_in_bytes", fileSizeInBytes)
.toString();
}

/**
 * Builder for {@link ColumnFile} instances.
 *
 * <p>Each setter validates its argument immediately; {@link #build()} additionally requires that
 * all three values have been set.
 */
static class Builder {
// all values start unset; fileSizeInBytes is boxed so that "not set" can be told apart from 0
private int[] fieldIds = null;
private String location = null;
private Long fileSizeInBytes = null;

/**
 * Sets the field IDs contained in the column file.
 *
 * @param newFieldIds a non-null, non-empty list of field IDs; it is converted to an int array
 * @return this builder
 * @throws IllegalArgumentException if the list is null or empty
 */
Builder fieldIds(List<Integer> newFieldIds) {
Preconditions.checkArgument(newFieldIds != null, "Invalid field IDs: null");
Preconditions.checkArgument(!newFieldIds.isEmpty(), "Invalid field IDs: empty");
this.fieldIds = ArrayUtil.toIntArray(newFieldIds);
return this;
}

/**
 * Sets the location of the column file.
 *
 * @param newLocation a non-null, non-empty location
 * @return this builder
 * @throws IllegalArgumentException if the location is null or empty
 */
Builder location(String newLocation) {
Preconditions.checkArgument(newLocation != null, "Invalid location: null");
Preconditions.checkArgument(!newLocation.isEmpty(), "Invalid location: empty");
this.location = newLocation;
return this;
}

/**
 * Sets the total size of the column file in bytes.
 *
 * @param newFileSizeInBytes the file size, which must be zero or positive
 * @return this builder
 * @throws IllegalArgumentException if the size is negative
 */
Builder fileSizeInBytes(long newFileSizeInBytes) {
Preconditions.checkArgument(
newFileSizeInBytes >= 0,
"Invalid file size in bytes: %s (must be >= 0)",
newFileSizeInBytes);
this.fileSizeInBytes = newFileSizeInBytes;
return this;
}

/**
 * Builds the column file from the values set so far.
 *
 * @return a new {@link ColumnFileStruct} holding the configured values
 * @throws IllegalArgumentException if field IDs, location, or file size has not been set
 */
ColumnFile build() {
Preconditions.checkArgument(fieldIds != null, "Missing required value: fieldIds");
Preconditions.checkArgument(location != null, "Missing required value: location");
Preconditions.checkArgument(
fileSizeInBytes != null, "Missing required value: fileSizeInBytes");
return new ColumnFileStruct(fieldIds, location, fileSizeInBytes);
}
}
}
12 changes: 11 additions & 1 deletion core/src/main/java/org/apache/iceberg/TrackedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ interface TrackedFile {
"equality_ids",
Types.ListType.ofRequired(136, Types.IntegerType.get()),
"Field ids used to determine row equality in equality delete files");
Types.NestedField COLUMN_FILES =
Types.NestedField.optional(
158,
"column_files",
Types.ListType.ofRequired(159, ColumnFile.schema()),
"Column update files");

static Types.StructType schemaWithContentStats(
Types.StructType partitionType, Types.StructType contentStatsType) {
Expand All @@ -114,7 +120,8 @@ static Types.StructType schemaWithContentStats(
MANIFEST_INFO,
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS);
EQUALITY_IDS,
COLUMN_FILES);
}

/** Returns the tracking information for this entry. */
Expand Down Expand Up @@ -165,6 +172,9 @@ static Types.StructType schemaWithContentStats(
/** Returns the set of field IDs used for equality comparison in equality delete files. */
List<Integer> equalityIds();

/** Returns the column files for this file. */
List<ColumnFile> columnFiles();

/** Copies this tracked file. */
TrackedFile copy();

Expand Down
65 changes: 63 additions & 2 deletions core/src/main/java/org/apache/iceberg/TrackedFileBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class TrackedFileBuilder {
private final long snapshotId;
Expand All @@ -43,12 +47,14 @@ class TrackedFileBuilder {
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private List<Integer> equalityIds = null;
private List<ColumnFile> columnFiles = null;

// tracking-related fields
private Tracking sourceTracking = null;
private boolean dvUpdated = false;
private ByteBuffer deletedPositions = null;
private ByteBuffer replacedPositions = null;
private boolean columnFilesUpdated = false;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: When #16964 is merged, we can drop this and call TrackingBuilder.columnFilesUpdates() right away.


/**
* Creates a builder for a newly added data file entry.
Expand Down Expand Up @@ -142,7 +148,8 @@ private static TrackedFile terminal(TrackedFile source, Tracking tracking) {
source.manifestInfo(),
source.keyMetadata(),
source.splitOffsets(),
source.equalityIds());
source.equalityIds(),
source.columnFiles());
}

private TrackedFileBuilder(FileContent contentType, long snapshotId) {
Expand All @@ -168,6 +175,7 @@ private TrackedFileBuilder(TrackedFile source, long snapshotId) {
this.splitOffsets = source.splitOffsets();
this.equalityIds = source.equalityIds();
this.sourceTracking = source.tracking();
this.columnFiles = source.columnFiles();
}

TrackedFileBuilder formatVersion(int newFormatVersion) {
Expand Down Expand Up @@ -304,6 +312,54 @@ TrackedFileBuilder replacedPositions(ByteBuffer newReplacedPositions) {
return this;
}

/**
 * Sets the column files for this entry and marks the column files as updated.
 *
 * <p>The given list is snapshotted into an unmodifiable copy before it is validated and stored,
 * so later changes to the caller's list cannot bypass the validation done here.
 *
 * @param newColumnFiles a non-null, non-empty list of column files
 * @return this builder
 * @throws IllegalArgumentException if the list is null or empty, if the content type is neither
 *     DATA nor DATA_MANIFEST, if the column files fail validation, or if the same column files
 *     (compared by location) are already set
 */
TrackedFileBuilder columnFiles(List<ColumnFile> newColumnFiles) {
  Preconditions.checkArgument(newColumnFiles != null, "Invalid column files: null");
  Preconditions.checkArgument(!newColumnFiles.isEmpty(), "Invalid column files: empty");
  Preconditions.checkArgument(
      contentType == FileContent.DATA || contentType == FileContent.DATA_MANIFEST,
      "Cannot add column files for file with content: %s",
      contentType);

  // copy first, then validate the copy: what is checked is exactly what is stored
  List<ColumnFile> snapshot =
      Collections.unmodifiableList(newColumnFiles.stream().collect(Collectors.toList()));
  validateColumnFiles(snapshot);
  Preconditions.checkArgument(
      !addingSameColumnFiles(snapshot), "The same column files already added");

  this.columnFiles = snapshot;
  this.columnFilesUpdated = true;
  return this;
}

/**
 * Checks whether the given column files match the ones already set on this builder.
 *
 * <p>Two lists match when they have the same size and every new location is among the existing
 * locations. Only locations are compared.
 */
private boolean addingSameColumnFiles(List<ColumnFile> newColumnFiles) {
  boolean sameCount = columnFiles != null && columnFiles.size() == newColumnFiles.size();
  if (!sameCount) {
    return false;
  }

  Set<String> knownLocations = Sets.newHashSet();
  for (ColumnFile existing : columnFiles) {
    knownLocations.add(existing.location());
  }

  for (ColumnFile candidate : newColumnFiles) {
    if (!knownLocations.contains(candidate.location())) {
      return false;
    }
  }

  return true;
}

/**
 * Validates a list of column files before it is stored on the builder.
 *
 * <p>Every entry must be non-null, have a non-null location and non-null field IDs, use a
 * location that no earlier entry in the list uses, and contain no field ID that an earlier entry
 * already contains.
 *
 * @throws IllegalArgumentException if any of these conditions is violated
 */
private void validateColumnFiles(List<ColumnFile> newColumnFiles) {
  Set<String> locations = Sets.newHashSet();
  Set<Integer> allFieldIds = Sets.newHashSet();
  for (ColumnFile columnFile : newColumnFiles) {
    Preconditions.checkArgument(columnFile != null, "Invalid column file: null");

    String columnFileLocation = columnFile.location();
    List<Integer> fieldIds = columnFile.fieldIds();

    // location and field IDs are required by the column file schema, but an implementation
    // may still return null for them (e.g. when the value was never set), so reject that here
    // with a clear message instead of failing later with a bare NullPointerException
    Preconditions.checkArgument(
        columnFileLocation != null, "Invalid column file: null location");
    Preconditions.checkArgument(
        fieldIds != null,
        "Invalid column file: null field IDs, location: %s",
        columnFileLocation);

    Preconditions.checkArgument(
        !locations.contains(columnFileLocation),
        "Invalid column files: duplicate column file, location: %s",
        columnFileLocation);
    Preconditions.checkArgument(
        Collections.disjoint(allFieldIds, fieldIds),
        "Invalid column files: overlapping field IDs across column files");

    allFieldIds.addAll(fieldIds);
    locations.add(columnFileLocation);
  }
}

/** Returns true when the content type is a data manifest or a delete manifest. */
private static boolean isLeafManifest(FileContent contentType) {
  if (contentType == FileContent.DATA_MANIFEST) {
    return true;
  }

  return contentType == FileContent.DELETE_MANIFEST;
}
Expand Down Expand Up @@ -340,6 +396,10 @@ TrackedFile build() {
trackingBuilder.replacedPositions(replacedPositions);
}

if (columnFilesUpdated) {
trackingBuilder.columnFilesUpdated();
}

return new TrackedFileStruct(
trackingBuilder.build(),
contentType,
Expand All @@ -356,6 +416,7 @@ TrackedFile build() {
manifestInfo,
keyMetadata,
splitOffsets,
equalityIds);
equalityIds,
columnFiles);
}
}
Loading