-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Core: Basic fields and schemas for column files #16285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.util.List; | ||
| import org.apache.iceberg.types.Types; | ||
|
|
||
| /** Information about a column file. */ | ||
| interface ColumnFile { | ||
| Types.NestedField FIELD_IDS = | ||
| Types.NestedField.required( | ||
| 161, | ||
| "field_ids", | ||
| Types.ListType.ofRequired(162, Types.IntegerType.get()), | ||
| "Field IDs this column file contains"); | ||
| Types.NestedField LOCATION = | ||
| Types.NestedField.required( | ||
| 163, "location", Types.StringType.get(), "Location of the column file"); | ||
| Types.NestedField FILE_SIZE_IN_BYTES = | ||
| Types.NestedField.required( | ||
| 164, "file_size_in_bytes", Types.LongType.get(), "Total column file size in bytes"); | ||
|
|
||
| static Types.StructType schema() { | ||
| return Types.StructType.of(FIELD_IDS, LOCATION, FILE_SIZE_IN_BYTES); | ||
| } | ||
|
|
||
| /** Returns the field IDs contained in this column file. */ | ||
| List<Integer> fieldIds(); | ||
|
|
||
| /** Returns the location of the column file. */ | ||
| String location(); | ||
|
|
||
| /** Returns the total size of the column file in bytes. */ | ||
| long fileSizeInBytes(); | ||
|
|
||
| /** Copies this column file. */ | ||
| ColumnFile copy(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.io.Serializable; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import org.apache.iceberg.avro.SupportsIndexProjection; | ||
| import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.util.ArrayUtil; | ||
|
|
||
| /** Mutable {@link StructLike} implementation of {@link ColumnFile}. */ | ||
| class ColumnFileStruct extends SupportsIndexProjection implements ColumnFile, Serializable { | ||
| private static final Types.StructType BASE_TYPE = | ||
| Types.StructType.of(ColumnFile.FIELD_IDS, ColumnFile.LOCATION, ColumnFile.FILE_SIZE_IN_BYTES); | ||
|
|
||
| private int[] fieldIds = null; | ||
| private String location = null; | ||
| private long fileSizeInBytes = -1L; | ||
|
|
||
/**
 * Used by internal readers to instantiate this class with a projection schema.
 *
 * @param projection the projected struct type, handed to the parent together with the base type
 */
ColumnFileStruct(Types.StructType projection) {
  super(BASE_TYPE, projection);
}
|
|
||
/**
 * Creates a fully populated struct; the parent is sized to the number of base fields.
 *
 * @param fieldIds field IDs contained in the column file; the array is stored as passed
 * @param location location of the column file
 * @param fileSizeInBytes total size of the column file in bytes
 */
private ColumnFileStruct(int[] fieldIds, String location, long fileSizeInBytes) {
  super(BASE_TYPE.fields().size());
  this.fileSizeInBytes = fileSizeInBytes;
  this.location = location;
  this.fieldIds = fieldIds;
}
|
|
||
| /** Copy constructor. */ | ||
| private ColumnFileStruct(ColumnFileStruct toCopy) { | ||
| super(toCopy); | ||
| this.fieldIds = | ||
| toCopy.fieldIds != null ? Arrays.copyOf(toCopy.fieldIds, toCopy.fieldIds.length) : null; | ||
| this.location = toCopy.location; | ||
| this.fileSizeInBytes = toCopy.fileSizeInBytes; | ||
| } | ||
|
|
||
/** Constructor for Java serialization; sizes the parent to the number of base fields. */
ColumnFileStruct() {
  super(BASE_TYPE.fields().size());
}
|
|
||
| @Override | ||
| public List<Integer> fieldIds() { | ||
| return fieldIds != null ? ArrayUtil.toUnmodifiableIntList(fieldIds) : null; | ||
| } | ||
|
|
||
| @Override | ||
| public String location() { | ||
| return location; | ||
| } | ||
|
|
||
| @Override | ||
| public long fileSizeInBytes() { | ||
| return fileSizeInBytes; | ||
| } | ||
|
|
||
| @Override | ||
| public ColumnFile copy() { | ||
| return new ColumnFileStruct(this); | ||
| } | ||
|
|
||
| @Override | ||
| protected <T> T internalGet(int pos, Class<T> javaClass) { | ||
| return javaClass.cast(getByPos(pos)); | ||
| } | ||
|
|
||
| private Object getByPos(int pos) { | ||
| switch (pos) { | ||
| case 0: | ||
| return fieldIds(); | ||
| case 1: | ||
| return location; | ||
| case 2: | ||
| return fileSizeInBytes; | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown field ordinal: " + pos); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| protected <T> void internalSet(int pos, T value) { | ||
| switch (pos) { | ||
| case 0: | ||
| this.fieldIds = ArrayUtil.toIntArray((List<Integer>) value); | ||
| break; | ||
| case 1: | ||
| // always coerce to String for Serializable | ||
| this.location = value.toString(); | ||
| break; | ||
| case 2: | ||
| this.fileSizeInBytes = (long) value; | ||
| break; | ||
| default: | ||
| // ignore the object, it must be from a newer version of the format | ||
| } | ||
| } | ||
|
|
||
| static Builder builder() { | ||
| return new Builder(); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return MoreObjects.toStringHelper(this) | ||
| .add("field_ids", fieldIds) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: Does this print the list object instead of the values?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This does print the values in the list. Just checked the output:
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `ToStringHelper` should handle arrays just fine. No need to add a test for this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the confirmation, @anoopj! |
||
| .add("location", location) | ||
| .add("file_size_in_bytes", fileSizeInBytes) | ||
| .toString(); | ||
| } | ||
|
|
||
| static class Builder { | ||
| private int[] fieldIds = null; | ||
| private String location = null; | ||
| private Long fileSizeInBytes = null; | ||
|
|
||
| Builder fieldIds(List<Integer> newFieldIds) { | ||
| Preconditions.checkArgument(newFieldIds != null, "Invalid field IDs: null"); | ||
| Preconditions.checkArgument(!newFieldIds.isEmpty(), "Invalid field IDs: empty"); | ||
| this.fieldIds = ArrayUtil.toIntArray(newFieldIds); | ||
| return this; | ||
| } | ||
|
|
||
| Builder location(String newLocation) { | ||
| Preconditions.checkArgument(newLocation != null, "Invalid location: null"); | ||
| Preconditions.checkArgument(!newLocation.isEmpty(), "Invalid location: empty"); | ||
| this.location = newLocation; | ||
| return this; | ||
| } | ||
|
|
||
| Builder fileSizeInBytes(long newFileSizeInBytes) { | ||
| Preconditions.checkArgument( | ||
| newFileSizeInBytes >= 0, | ||
| "Invalid file size in bytes: %s (must be >= 0)", | ||
| newFileSizeInBytes); | ||
| this.fileSizeInBytes = newFileSizeInBytes; | ||
| return this; | ||
| } | ||
|
|
||
| ColumnFile build() { | ||
| Preconditions.checkArgument(fieldIds != null, "Missing required value: fieldIds"); | ||
| Preconditions.checkArgument(location != null, "Missing required value: location"); | ||
| Preconditions.checkArgument( | ||
| fileSizeInBytes != null, "Missing required value: fileSizeInBytes"); | ||
| return new ColumnFileStruct(fieldIds, location, fileSizeInBytes); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,8 +19,12 @@ | |
| package org.apache.iceberg; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
|
|
||
| class TrackedFileBuilder { | ||
| private final long snapshotId; | ||
|
|
@@ -43,12 +47,14 @@ class TrackedFileBuilder { | |
| private ByteBuffer keyMetadata = null; | ||
| private List<Long> splitOffsets = null; | ||
| private List<Integer> equalityIds = null; | ||
| private List<ColumnFile> columnFiles = null; | ||
|
|
||
| // tracking-related fields | ||
| private Tracking sourceTracking = null; | ||
| private boolean dvUpdated = false; | ||
| private ByteBuffer deletedPositions = null; | ||
| private ByteBuffer replacedPositions = null; | ||
| private boolean columnFilesUpdated = false; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note to self: When #16964 is merged, we can drop this and call |
||
|
|
||
| /** | ||
| * Creates a builder for a newly added data file entry. | ||
|
|
@@ -142,7 +148,8 @@ private static TrackedFile terminal(TrackedFile source, Tracking tracking) { | |
| source.manifestInfo(), | ||
| source.keyMetadata(), | ||
| source.splitOffsets(), | ||
| source.equalityIds()); | ||
| source.equalityIds(), | ||
| source.columnFiles()); | ||
| } | ||
|
|
||
| private TrackedFileBuilder(FileContent contentType, long snapshotId) { | ||
|
|
@@ -168,6 +175,7 @@ private TrackedFileBuilder(TrackedFile source, long snapshotId) { | |
| this.splitOffsets = source.splitOffsets(); | ||
| this.equalityIds = source.equalityIds(); | ||
| this.sourceTracking = source.tracking(); | ||
| this.columnFiles = source.columnFiles(); | ||
| } | ||
|
|
||
| TrackedFileBuilder formatVersion(int newFormatVersion) { | ||
|
|
@@ -304,6 +312,54 @@ TrackedFileBuilder replacedPositions(ByteBuffer newReplacedPositions) { | |
| return this; | ||
| } | ||
|
|
||
| TrackedFileBuilder columnFiles(List<ColumnFile> newColumnFiles) { | ||
| Preconditions.checkArgument(newColumnFiles != null, "Invalid column files: null"); | ||
| Preconditions.checkArgument(!newColumnFiles.isEmpty(), "Invalid column files: empty"); | ||
| Preconditions.checkArgument( | ||
| contentType == FileContent.DATA || contentType == FileContent.DATA_MANIFEST, | ||
| "Cannot add column files for file with content: %s", | ||
| contentType); | ||
| validateColumnFiles(newColumnFiles); | ||
| Preconditions.checkArgument( | ||
| !addingSameColumnFiles(newColumnFiles), "The same column files already added"); | ||
|
|
||
| this.columnFiles = newColumnFiles; | ||
| this.columnFilesUpdated = true; | ||
| return this; | ||
| } | ||
|
|
||
| private boolean addingSameColumnFiles(List<ColumnFile> newColumnFiles) { | ||
| if (columnFiles == null || columnFiles.size() != newColumnFiles.size()) { | ||
| return false; | ||
| } | ||
|
|
||
| Set<String> existingLocations = | ||
| columnFiles.stream().map(ColumnFile::location).collect(Collectors.toSet()); | ||
| return newColumnFiles.stream().map(ColumnFile::location).allMatch(existingLocations::contains); | ||
| } | ||
|
|
||
| private void validateColumnFiles(List<ColumnFile> newColumnFiles) { | ||
| Set<String> locations = Sets.newHashSet(); | ||
| Set<Integer> allFieldIds = Sets.newHashSet(); | ||
| for (ColumnFile columnFile : newColumnFiles) { | ||
| Preconditions.checkArgument(columnFile != null, "Invalid column file: null"); | ||
|
|
||
| String columnFileLocation = columnFile.location(); | ||
| List<Integer> fieldIds = columnFile.fieldIds(); | ||
|
|
||
| Preconditions.checkArgument( | ||
| !locations.contains(columnFileLocation), | ||
| "Invalid column files: duplicate column file, location: %s", | ||
| columnFileLocation); | ||
| Preconditions.checkArgument( | ||
| Collections.disjoint(allFieldIds, fieldIds), | ||
| "Invalid column files: overlapping field IDs across column files"); | ||
|
|
||
| allFieldIds.addAll(fieldIds); | ||
| locations.add(columnFileLocation); | ||
| } | ||
| } | ||
|
|
||
| private static boolean isLeafManifest(FileContent contentType) { | ||
| return contentType == FileContent.DATA_MANIFEST || contentType == FileContent.DELETE_MANIFEST; | ||
| } | ||
|
|
@@ -340,6 +396,10 @@ TrackedFile build() { | |
| trackingBuilder.replacedPositions(replacedPositions); | ||
| } | ||
|
|
||
| if (columnFilesUpdated) { | ||
| trackingBuilder.columnFilesUpdated(); | ||
| } | ||
|
|
||
| return new TrackedFileStruct( | ||
| trackingBuilder.build(), | ||
| contentType, | ||
|
|
@@ -356,6 +416,7 @@ TrackedFile build() { | |
| manifestInfo, | ||
| keyMetadata, | ||
| splitOffsets, | ||
| equalityIds); | ||
| equalityIds, | ||
| columnFiles); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Efficient Column Updates proposal had `sequence_number` at the column file level. Is that stale? I.e., are we dropping per-file granularity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is stale; we decided not to have per-column-file granularity for sequence numbers or snapshots.
`Tracking` will track a single snapshot ID for the latest column file, which behaves similarly to `dv_snapshot_id`, and for the sequence number (e.g. to answer `last_updated_sequence_number`) we'll repurpose the `data_sequence_number` from `Tracking`. Note that this requires us to bump that sequence number whenever a new column file is added.