Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,16 @@ acceptedBreaks:
old: "method void org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan::<init>(org.apache.iceberg.Table,\
\ org.apache.iceberg.Schema, org.apache.iceberg.TableScanContext)"
justification: "Removing deprecated code"
"1.5.2.2":
com.linkedin.iceberg:iceberg-core:
- code: "java.method.addedToInterface"
new: "method org.apache.hadoop.conf.Configuration org.apache.hadoop.conf.Configurable::getConf()\
\ @ org.apache.iceberg.hadoop.HadoopConfigurable"
justification: "This break is ok"
- code: "java.method.addedToInterface"
new: "method void org.apache.hadoop.conf.Configurable::setConf(org.apache.hadoop.conf.Configuration)\
\ @ org.apache.iceberg.hadoop.HadoopConfigurable"
justification: "This break is ok"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ default InputFile newInputFile(ManifestFile manifest) {
/** Get a {@link OutputFile} instance to write bytes to the file at the given path. */
OutputFile newOutputFile(String path);

/**
* Get a {@link OutputFile} instance to write bytes to the file at the given path with specific
* properties.
*
* @param path the file path
* @param properties the properties for the output file
* @return an OutputFile for writing data
*/
default OutputFile newOutputFile(String path, Map<String, String> properties) {
throw new UnsupportedOperationException(
String.format(
"Creating output file at: %s with properties: %s is not " + "supported",
path, properties));
}
Comment thread
shanthoosh marked this conversation as resolved.

/** Delete the file at the given path. */
void deleteFile(String path);

Expand Down
5 changes: 5 additions & 0 deletions baseline.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ subprojects {
quiet = false
}
}

// Apply revapi plugin to specific projects for API compatibility checking
if (project.name in ['iceberg-api', 'iceberg-core', 'iceberg-parquet', 'iceberg-orc', 'iceberg-common', 'iceberg-data']) {
apply plugin: 'com.palantir.revapi'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public OutputFile newOutputFile(String path) {
return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
}

@Override
public OutputFile newOutputFile(String path, Map<String, String> fileProperties) {
return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get(), fileProperties);
}

@Override
public void deleteFile(String path) {
Path toDelete = new Path(path);
Expand Down
65 changes: 61 additions & 4 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.iceberg.hadoop;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -29,14 +31,20 @@
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PositionOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** {@link OutputFile} implementation using the Hadoop {@link FileSystem} API. */
public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile {

private static final Logger LOG = LoggerFactory.getLogger(HadoopOutputFile.class);
private static final short DEFAULT_REPLICATION_FACTOR = 3;
private final FileSystem fs;
private final Path path;
private final Configuration conf;
private final short replication;
private NativeFileCryptoParameters nativeEncryptionParameters;

public static OutputFile fromLocation(CharSequence location, Configuration conf) {
Expand All @@ -54,24 +62,61 @@ public static OutputFile fromPath(Path path, Configuration conf) {
return fromPath(path, fs, conf);
}

public static OutputFile fromPath(Path path, Configuration conf, Map<String, String> properties) {
short replicationFactor = DEFAULT_REPLICATION_FACTOR;
if (properties != null) {
String replicationFactorAsString = properties.get(OutputFileFactory.FILE_REPLICATION_FACTOR);
if (replicationFactorAsString != null) {
try {
replicationFactor = Short.parseShort(replicationFactorAsString);
} catch (NumberFormatException e) {
LOG.warn(
"Failed to parse replication factor: {}, defaulting to {}",
replicationFactorAsString,
DEFAULT_REPLICATION_FACTOR,
e);
}
}
}
return fromPath(path, conf, replicationFactor);
}

public static OutputFile fromPath(Path path, Configuration conf, short replication) {
FileSystem fs = Util.getFs(path, conf);
return new HadoopOutputFile(fs, path, conf, replication);
}

public static OutputFile fromPath(Path path, FileSystem fs) {
return fromPath(path, fs, fs.getConf());
}

public static OutputFile fromPath(Path path, FileSystem fs, Configuration conf) {
return new HadoopOutputFile(fs, path, conf);
return new HadoopOutputFile(fs, path, conf, (short) -1);
}

private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
private HadoopOutputFile(FileSystem fs, Path path, Configuration conf, short replication) {
this.fs = fs;
this.path = path;
this.conf = conf;
this.replication = replication;
}

@Override
public PositionOutputStream create() {
try {
return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */));
if (replication > 0) {
return HadoopStreams.wrap(
fs.create(
path,
false /* overwrite */,
conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Comment thread
sumedhsakdeo marked this conversation as resolved.
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
replication,
fs.getDefaultBlockSize(path)));
Comment thread
sumedhsakdeo marked this conversation as resolved.
} else {
return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */));
}
} catch (FileAlreadyExistsException e) {
throw new AlreadyExistsException(e, "Path already exists: %s", path);
} catch (IOException e) {
Expand All @@ -82,7 +127,19 @@ public PositionOutputStream create() {
@Override
public PositionOutputStream createOrOverwrite() {
try {
return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */));
if (replication > 0) {
return HadoopStreams.wrap(
fs.create(
path,
true /* overwrite */,
conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Comment thread
sumedhsakdeo marked this conversation as resolved.
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
replication,
fs.getDefaultBlockSize(path)));
} else {
return HadoopStreams.wrap(fs.create(path, true /* createOrOverwrite */));
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create file: %s", path);
}
Expand Down
45 changes: 41 additions & 4 deletions core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -30,9 +32,11 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** Factory responsible for generating unique but recognizable data/delete file names. */
public class OutputFileFactory {
public static final String FILE_REPLICATION_FACTOR = "file-replication-factor";
private final PartitionSpec defaultSpec;
private final FileFormat format;
private final LocationProvider locations;
Expand All @@ -48,6 +52,7 @@ public class OutputFileFactory {
private final String operationId;
private final AtomicInteger fileCount = new AtomicInteger(0);
private final String suffix;
private final Optional<Short> replicationFactor;

/**
* Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be
Expand All @@ -63,6 +68,7 @@ public class OutputFileFactory {
* @param taskId Second part of the file name
* @param operationId Third part of the file name
* @param suffix Suffix part of the file name
* @param replicationFactor the replication factor of output file
*/
private OutputFileFactory(
PartitionSpec spec,
Expand All @@ -73,7 +79,8 @@ private OutputFileFactory(
int partitionId,
long taskId,
String operationId,
String suffix) {
String suffix,
Optional<Short> replicationFactor) {
Comment thread
shanthoosh marked this conversation as resolved.
this.defaultSpec = spec;
this.format = format;
this.locations = locations;
Expand All @@ -83,6 +90,7 @@ private OutputFileFactory(
this.taskId = taskId;
this.operationId = operationId;
this.suffix = suffix;
this.replicationFactor = replicationFactor;
}

public static Builder builderFor(Table table, int partitionId, long taskId) {
Expand All @@ -102,10 +110,25 @@ private String generateFilename() {

/** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */
public EncryptedOutputFile newOutputFile() {
OutputFile file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename()));
OutputFile file;
if (replicationFactor.isPresent()) {
file =
ioSupplier
.get()
.newOutputFile(locations.newDataLocation(generateFilename()), getProperties());
} else {
file = ioSupplier.get().newOutputFile(locations.newDataLocation(generateFilename()));
}
return encryptionManager.encrypt(file);
}

private Map<String, String> getProperties() {
Map<String, String> properties = Maps.newHashMap();
replicationFactor.ifPresent(
replication -> properties.put(FILE_REPLICATION_FACTOR, String.valueOf(replication)));
return properties;
}

/** Generates an {@link EncryptedOutputFile} for partitioned writes in the default spec. */
public EncryptedOutputFile newOutputFile(StructLike partition) {
return newOutputFile(defaultSpec, partition);
Expand All @@ -114,7 +137,12 @@ public EncryptedOutputFile newOutputFile(StructLike partition) {
/** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */
public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
OutputFile rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation);
OutputFile rawOutputFile;
if (replicationFactor.isPresent()) {
rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation, getProperties());
} else {
rawOutputFile = ioSupplier.get().newOutputFile(newDataLocation);
}
return encryptionManager.encrypt(rawOutputFile);
}

Expand All @@ -127,6 +155,7 @@ public static class Builder {
private FileFormat format;
private String suffix;
private Supplier<FileIO> ioSupplier;
private Optional<Short> replicationFactorOptional = Optional.empty();

private Builder(Table table, int partitionId, long taskId) {
this.table = table;
Expand All @@ -151,6 +180,13 @@ public Builder operationId(String newOperationId) {
return this;
}

public Builder replicationFactor(short replicationFactor) {
if (replicationFactor > 0) {
this.replicationFactorOptional = Optional.of(replicationFactor);
}
return this;
}

public Builder format(FileFormat newFormat) {
this.format = newFormat;
return this;
Expand Down Expand Up @@ -185,7 +221,8 @@ public OutputFileFactory build() {
partitionId,
taskId,
operationId,
suffix);
suffix,
replicationFactorOptional);
}
}
}
Loading