diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java new file mode 100644 index 000000000..e18f6c9f0 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import lombok.Getter; +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import com.google.common.annotations.VisibleForTesting; + +import io.delta.kernel.Operation; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.Transaction; +import io.delta.kernel.TransactionBuilder; +import io.delta.kernel.TransactionCommitResult; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.exception.UpdateException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.sync.ConversionTarget; + +/** + * Implementation of {@link ConversionTarget} for Delta Lake using the Delta Kernel API. + * + *

This implementation uses Delta Kernel (io.delta.kernel) instead of Delta Standalone for write + * operations, providing better compatibility with cloud storage (S3, GCS, Azure Blob Storage, HDFS) + * and improved support for Delta Lake 3.x features. + * + *

Initialization: This class supports two initialization patterns: + * + *

+ * + *

Important: Do not mix initialization patterns. If you use the parameterized + * constructor, do not call {@link #init(TargetTable, Configuration)} afterward, as it will + * overwrite the custom Engine. + * + *

Exception Handling: This implementation only catches {@link + * io.delta.kernel.exceptions.TableNotFoundException} when checking for table existence, allowing + * other exceptions (network errors, permission issues, corrupted metadata) to propagate rather than + * being silently masked. This ensures real errors are visible and fail fast. + * + *

Known Limitations: + * + *

+ * + *

Implementation Choice: Delta Kernel API was chosen over Delta Standalone to: + * + *

+ * + * @see ConversionTarget + * @see io.delta.kernel.Table + * @see io.delta.kernel.Transaction + */ +@Log4j2 +public class DeltaKernelConversionTarget implements ConversionTarget { + private DeltaKernelSchemaExtractor schemaExtractor; + private DeltaKernelPartitionExtractor partitionExtractor; + private DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor; + + private String basePath; + private long logRetentionInHours; + private DeltaKernelConversionTarget.TransactionState transactionState; + private Engine engine; + + /** + * No-arg constructor for ServiceLoader instantiation. Must call {@link #init(TargetTable, + * Configuration)} before use. + */ + public DeltaKernelConversionTarget() {} + + /** + * Creates a fully initialized DeltaKernelConversionTarget with custom Engine. Typically used in + * tests. Do not call {@link #init(TargetTable, Configuration)} after this. + * + * @param targetTable the target table configuration + * @param engine custom Delta Kernel engine instance + */ + public DeltaKernelConversionTarget(TargetTable targetTable, Engine engine) { + this( + targetTable.getBasePath(), + targetTable.getMetadataRetention().toHours(), + engine, + DeltaKernelSchemaExtractor.getInstance(), + DeltaKernelPartitionExtractor.getInstance(), + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(targetTable.getBasePath()) + // Column statistics are not needed for conversion operations + .includeColumnStats(false) + .build()); + } + + @VisibleForTesting + DeltaKernelConversionTarget( + String tableDataPath, + long logRetentionInHours, + Engine engine, + DeltaKernelSchemaExtractor schemaExtractor, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor) { + _init( + tableDataPath, + logRetentionInHours, + engine, + schemaExtractor, + partitionExtractor, + dataKernelFileUpdatesExtractor); + } + + /** + * Private initialization helper to avoid code duplication 
between constructor and init() paths. + */ + private void _init( + String tableDataPath, + long logRetentionInHours, + Engine engine, + DeltaKernelSchemaExtractor schemaExtractor, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor) { + this.basePath = tableDataPath; + this.schemaExtractor = schemaExtractor; + this.partitionExtractor = partitionExtractor; + this.dataKernelFileUpdatesExtractor = dataKernelFileUpdatesExtractor; + this.engine = engine; + this.logRetentionInHours = logRetentionInHours; + } + + @Override + public void init(TargetTable targetTable, Configuration configuration) { + Engine engine = DefaultEngine.create(configuration); + + _init( + targetTable.getBasePath(), + targetTable.getMetadataRetention().toHours(), + engine, + DeltaKernelSchemaExtractor.getInstance(), + DeltaKernelPartitionExtractor.getInstance(), + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(targetTable.getBasePath()) + // Column statistics are not needed for conversion operations + .includeColumnStats(false) + .build()); + } + + @Override + public void beginSync(InternalTable table) { + this.transactionState = + new DeltaKernelConversionTarget.TransactionState(engine, logRetentionInHours); + } + + @Override + public void syncSchema(InternalSchema schema) { + transactionState.setLatestSchema(schema); + } + + @Override + public void syncPartitionSpec(List partitionSpec) { + if (partitionSpec != null) { + Map spec = + partitionExtractor.convertToDeltaPartitionFormat(partitionSpec); + for (Map.Entry partitionEntry : spec.entrySet()) { + String partitionColumnName = partitionEntry.getKey(); + StructField partitionField = partitionEntry.getValue(); + + transactionState.addPartitionColumn(partitionColumnName); + if (partitionField != null + && transactionState.getLatestSchema().fields().stream() + .noneMatch(field -> field.getName().equals(partitionField.getName()))) { + // add generated 
columns to schema. + transactionState.addColumn(partitionField); + } + } + } + } + + @Override + public void syncMetadata(TableSyncMetadata metadata) { + transactionState.setMetadata(metadata); + } + + @Override + public void syncFilesForSnapshot(List partitionedDataFiles) { + Table table = Table.forPath(engine, basePath); + transactionState.setActions( + dataKernelFileUpdatesExtractor.applySnapshot( + table, partitionedDataFiles, transactionState.getLatestSchemaInternal())); + } + + @Override + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { + Table table = Table.forPath(engine, basePath); + transactionState.setActions( + dataKernelFileUpdatesExtractor.applyDiff( + internalFilesDiff, + transactionState.getLatestSchemaInternal(), + table.getPath(engine), + table.getLatestSnapshot(engine).getSchema())); + } + + @Override + public void completeSync() { + transactionState.commitTransaction(); + transactionState = null; + } + + @Override + public Optional getTableMetadata() { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = table.getLatestSnapshot(engine); + + // WORKAROUND: Cast to SnapshotImpl (internal class) to access metadata configuration. + // Delta Kernel 4.0.0 does not provide a public API to access table metadata/configuration. + // This cast is brittle and may break on Kernel version upgrades. + Metadata metadata = ((SnapshotImpl) snapshot).getMetadata(); + + // Get configuration from metadata + Map configuration = metadata.getConfiguration(); + String metadataJson = configuration.get(TableSyncMetadata.XTABLE_METADATA); + + return TableSyncMetadata.fromJson(metadataJson); + } + + @Override + public String getTableFormat() { + return TableFormat.DELTA; + } + + @Override + public Optional getTargetCommitIdentifier(String sourceIdentifier) { + // Delta Kernel 4.0.0 does not support commit tags in commitInfo, which are required for + // source-to-target commit identifier mapping. 
This limitation is documented in: + // https://github.com/delta-io/delta/issues/6167 + // + // Unlike DeltaConversionTarget (which uses Delta Standalone with commit tag support), + // DeltaKernelConversionTarget cannot retrieve commit tags from Delta Kernel's API. + // Rather than silently scanning all commits (O(n) performance cost) and always returning + // empty, we explicitly throw an exception to indicate this feature is unsupported. + // + // When Delta Kernel adds commit tag support, this method can be reimplemented to: + // 1. Scan commit history using tableImpl.getChanges(engine, 0, currentVersion, actionSet) + // 2. Extract tags from CommitInfo.tags MapValue + // 3. Parse XTABLE_METADATA from tags and match sourceIdentifier + throw new NotSupportedException( + "Source-to-target commit identifier mapping is not supported in DeltaKernelConversionTarget. " + + "Delta Kernel 4.0.0 does not support commit tags in commitInfo. " + + "See: https://github.com/delta-io/delta/issues/6167"); + } + + private class TransactionState { + private final Engine engine; + private final long retentionInHours; + private final List partitionColumns; + @Getter private StructType latestSchema; + @Getter private InternalSchema latestSchemaInternal; + private TableSyncMetadata metadata; + private List actions; + + private TransactionState(Engine engine, long retentionInHours) { + this.engine = engine; + this.partitionColumns = new ArrayList<>(); + this.retentionInHours = retentionInHours; + + try { + Table table = Table.forPath(engine, basePath); + this.latestSchema = table.getLatestSnapshot(engine).getSchema(); + } catch (TableNotFoundException e) { + // Expected: table doesn't exist yet on first sync + this.latestSchema = null; + } + // Let other exceptions propagate (network issues, permissions, corrupted metadata, etc.) + } + + /** + * Adds a partition column name to the list. Package-private to allow access from outer class. 
+ */ + void addPartitionColumn(String columnName) { + partitionColumns.add(columnName); + } + + void setMetadata(TableSyncMetadata metadata) { + this.metadata = metadata; + } + + /** + * Sets the actions to be committed. Converts from Scala Seq to Java List for internal storage. + */ + void setActions(Seq scalaActions) { + this.actions = JavaConverters.seqAsJavaList(scalaActions); + } + + private void addColumn(StructField field) { + latestSchema = latestSchema.add(field); + latestSchemaInternal = schemaExtractor.toInternalSchema(latestSchema); + } + + private void setLatestSchema(InternalSchema schema) { + this.latestSchemaInternal = schema; + this.latestSchema = schemaExtractor.fromInternalSchema(schema); + } + + private void commitTransaction() { + boolean tableExists = checkTableExists(); + + Operation operation = tableExists ? Operation.WRITE : Operation.CREATE_TABLE; + + if (!tableExists) { + File tableDir = new File(basePath); + if (!tableDir.exists()) { + tableDir.mkdirs(); + } + } + + Table table = Table.forPath(engine, basePath); + TransactionBuilder txnBuilder = + table.createTransactionBuilder(engine, "XTable Delta Sync", operation); + + // Schema evolution for existing tables is handled via Metadata actions manually + // as Delta Kernel 4.0.0 doesn't support schema evolution via withSchema + if (!tableExists) { + txnBuilder = txnBuilder.withSchema(engine, latestSchema); + + if (!partitionColumns.isEmpty()) { + txnBuilder = txnBuilder.withPartitionColumns(engine, partitionColumns); + } + } + + Map tableProperties = getConfigurationsForDeltaSync(); + txnBuilder = txnBuilder.withTableProperties(engine, tableProperties); + + Transaction txn = txnBuilder.build(engine); + List allActionRows = new ArrayList<>(); + + // Iterate through actions (Java List) and convert to Row format + for (RowBackedAction action : actions) { + + if (action instanceof io.delta.kernel.internal.actions.AddFile) { + io.delta.kernel.internal.actions.AddFile addFile = + 
(io.delta.kernel.internal.actions.AddFile) action; + Row wrappedRow = + io.delta.kernel.internal.actions.SingleAction.createAddFileSingleAction( + addFile.toRow()); + allActionRows.add(wrappedRow); + } else if (action instanceof io.delta.kernel.internal.actions.RemoveFile) { + io.delta.kernel.internal.actions.RemoveFile removeFile = + (io.delta.kernel.internal.actions.RemoveFile) action; + Row wrappedRow = + io.delta.kernel.internal.actions.SingleAction.createRemoveFileSingleAction( + removeFile.toRow()); + allActionRows.add(wrappedRow); + } + } + + CloseableIterator allActionsIterator = + new CloseableIterator() { + private int currentIndex = 0; + + @Override + public boolean hasNext() { + return currentIndex < allActionRows.size(); + } + + @Override + public Row next() { + return allActionRows.get(currentIndex++); + } + + @Override + public void close() {} + }; + + CloseableIterable dataActions = + io.delta.kernel.utils.CloseableIterable.inMemoryIterable(allActionsIterator); + + try { + TransactionCommitResult result = txn.commit(engine, dataActions); + + // Execute PostCommitHooks to create checkpoints and _last_checkpoint metadata file + List hooks = result.getPostCommitHooks(); + if (hooks != null && !hooks.isEmpty()) { + for (PostCommitHook hook : hooks) { + try { + hook.threadSafeInvoke(engine); + } catch (Exception hookEx) { + // Post-commit hooks are optimizations; log but don't fail the transaction + log.warn("Post-commit hook failed but transaction succeeded", hookEx); + } + } + } + } catch (Exception e) { + throw new UpdateException("Failed to commit Delta Kernel transaction", e); + } + + // NOTE: Delta Kernel API limitations compared to Delta Standalone: + // - Commit tags (like XTABLE_METADATA in commitInfo.tags) are not yet supported + // - Operation type metadata (like DeltaOperations.Update) is simplified to + // Operation.WRITE/CREATE_TABLE + // - The commit timestamp is managed by Delta Kernel automatically + } + + private boolean 
checkTableExists() { + return DeltaKernelUtils.tableExists(engine, basePath); + } + + private Map getConfigurationsForDeltaSync() { + Map configMap = new HashMap<>(); + + configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); + configMap.put( + "delta.logRetentionDuration", String.format("interval %d hours", retentionInHours)); + + return configMap; + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java new file mode 100644 index 000000000..c5e5630e7 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import lombok.Builder; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.FilesDiff; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.paths.PathUtils; +import org.apache.xtable.spi.extractor.DataFileIterator; + +@Builder +public class DeltaKernelDataFileUpdatesExtractor { + @Builder.Default + private final DeltaKernelStatsExtractor deltaStatsExtractor = + DeltaKernelStatsExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelPartitionExtractor deltaKernelPartitionExtractor = + DeltaKernelPartitionExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelDataFileExtractor dataFileExtractor = + DeltaKernelDataFileExtractor.builder().build(); + + private final Engine engine; + private final String basePath; + private final boolean includeColumnStats; + + public Seq applySnapshot( + Table table, List partitionedDataFiles, InternalSchema tableSchema) { + + 
// all files in the current delta snapshot are potential candidates for remove actions, i.e. if + // the file is not present in the new snapshot (addedFiles) then the file is considered removed + Map previousFiles = new HashMap<>(); + StructType physicalSchema; + + // Check if table exists by checking if _delta_log directory exists + boolean tableExists = checkTableExists(table); + + if (tableExists) { + Snapshot snapshot = table.getLatestSnapshot(engine); + + // Reuse DeltaKernelDataFileExtractor to iterate through existing files + // This avoids duplicating the scan logic for reading Delta files + try (DataFileIterator fileIterator = + dataFileExtractor.iterator(snapshot, table, engine, tableSchema)) { + + while (fileIterator.hasNext()) { + InternalDataFile internalFile = fileIterator.next(); + + // Convert InternalDataFile back to AddFile to create RemoveFile action + AddFile addFile = createAddFileFromInternalDataFile(internalFile, snapshot.getSchema()); + RemoveFile removeFile = + new RemoveFile(addFile.toRemoveFileRow(false, Optional.of(snapshot.getVersion()))); + String fullPath = + DeltaKernelActionsConverter.getFullPathToFile(removeFile.getPath(), table); + previousFiles.put(fullPath, removeFile); + } + } catch (Exception e) { + throw new ReadException("Failed to scan existing Delta files", e); + } + + physicalSchema = snapshot.getSchema(); + + } else { + + // Table doesn't exist yet - no previous files to remove + // Convert InternalSchema to StructType for physical schema + DeltaKernelSchemaExtractor schemaExtractor = DeltaKernelSchemaExtractor.getInstance(); + physicalSchema = schemaExtractor.fromInternalSchema(tableSchema); + } + + FilesDiff diff = + InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + + return applyDiff( + diff.getFilesAdded(), + diff.getFilesRemoved(), + tableSchema, + table.getPath(engine), + physicalSchema); + } + + private boolean checkTableExists(Table table) { + return 
DeltaKernelUtils.tableExists(engine, table.getPath(engine)); + } + + /** + * Converts an InternalDataFile back to Delta Kernel's AddFile action. This is needed to create + * RemoveFile actions from existing files. + */ + private AddFile createAddFileFromInternalDataFile( + InternalDataFile internalFile, StructType physicalSchema) { + // Extract partition values from InternalDataFile using existing logic + Map partitionValuesMap = + deltaKernelPartitionExtractor.partitionValueSerialization(internalFile); + MapValue partitionValues = convertToMapValue(partitionValuesMap); + + // Create AddFile Row using the same pattern as createAddFileAction + Row addFileRow = + AddFile.createAddFileRow( + physicalSchema, + PathUtils.getRelativePath(internalFile.getPhysicalPath(), basePath), + partitionValues, + internalFile.getFileSizeBytes(), + internalFile.getLastModified(), + true, // dataChange - assume true for existing files + Optional.empty(), // deletionVector + Optional.empty(), // tags + Optional.empty(), // baseRowId + Optional.empty(), // defaultRowCommitVersion + Optional.empty()); // stats - set to empty since we're creating RemoveFile + + // Wrap the Row back into an AddFile object + return new AddFile(addFileRow); + } + + public Seq applyDiff( + InternalFilesDiff internalFilesDiff, + InternalSchema tableSchema, + String tableBasePath, + StructType physicalSchema) { + List removeActions = + internalFilesDiff.dataFilesRemoved().stream() + .map(dFile -> createAddFileAction(dFile, tableBasePath, physicalSchema)) + .map(addFile -> new RemoveFile(addFile.toRemoveFileRow(false, Optional.empty()))) + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size())); + return applyDiff( + internalFilesDiff.dataFilesAdded(), + removeActions, + tableSchema, + tableBasePath, + physicalSchema); + } + + private Seq applyDiff( + Set filesAdded, + Collection removeFileActions, + InternalSchema tableSchema, + String tableBasePath, + StructType physicalSchema) { + Stream 
addActions = + filesAdded.stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .map(dFile -> createAddFileAction(dFile, tableBasePath, physicalSchema)); + int totalActions = filesAdded.size() + removeFileActions.size(); + List allActions = + Stream.concat(addActions, removeFileActions.stream()) + .collect(CustomCollectors.toList(totalActions)); + return JavaConverters.asScalaBuffer(allActions).toSeq(); + } + + private AddFile createAddFileAction( + InternalDataFile dataFile, String tableBasePath, StructType physicalSchema) { + // Convert partition values from Map to MapValue + Map partitionValuesMap = + deltaKernelPartitionExtractor.partitionValueSerialization(dataFile); + MapValue partitionValues = convertToMapValue(partitionValuesMap); + + Row addFileRow = + AddFile.createAddFileRow( + physicalSchema, + // Delta Lake supports relative and absolute paths in theory but relative paths seem + // more commonly supported by query engines in our testing + PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath), + partitionValues, + dataFile.getFileSizeBytes(), + dataFile.getLastModified(), + true, // dataChange + Optional.empty(), // deletionVector + Optional.empty(), // tags + Optional.empty(), // baseRowId + Optional.empty(), // defaultRowCommitVersion + Optional.empty() // stats - TODO: convert column stats to DataFileStatistics + ); + + // Wrap the Row back into an AddFile object so we can use its methods + return new AddFile(addFileRow); + } + + private MapValue convertToMapValue(Map map) { + return VectorUtils.stringStringMapValue(map); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java index e3da2e7d2..0f0364ba6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java +++ 
b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java @@ -37,12 +37,14 @@ import io.delta.kernel.types.LongType; import io.delta.kernel.types.MapType; import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.types.TimestampNTZType; import io.delta.kernel.types.TimestampType; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.delta.DeltaPartitionExtractor; +import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; @@ -66,9 +68,6 @@ public InternalSchema toInternalSchema(StructType structType) { return toInternalSchema(structType, null, false, null, null); } - String trimmedTypeName = ""; - InternalType type = null; - private InternalSchema toInternalSchema( DataType dataType, String parentPath, @@ -76,6 +75,8 @@ private InternalSchema toInternalSchema( String comment, FieldMetadata originalMetadata) { + String trimmedTypeName = ""; + InternalType type = null; Map metadata = null; List fields = null; @@ -229,4 +230,118 @@ private InternalSchema toInternalSchema( .fields(fields) .build(); } + + /** + * Converts an InternalSchema to Delta Kernel StructType. + * + * @param internalSchema the internal schema representation + * @return Delta Kernel StructType + */ + public StructType fromInternalSchema(InternalSchema internalSchema) { + List fields = + internalSchema.getFields().stream() + .map( + field -> + new StructField( + field.getName(), + convertFieldType(field), + field.getSchema().isNullable(), + getFieldMetadata(field.getSchema()))) + .collect(CustomCollectors.toList(internalSchema.getFields().size())); + return new StructType(fields); + } + + /** + * Converts an InternalField to Delta Kernel DataType. 
+ * + * @param field the internal field + * @return Delta Kernel DataType + */ + private DataType convertFieldType(InternalField field) { + switch (field.getSchema().getDataType()) { + case STRING: + case ENUM: + return StringType.STRING; + case INT: + return IntegerType.INTEGER; + case LONG: + return LongType.LONG; + case BYTES: + case FIXED: + case UUID: + return BinaryType.BINARY; + case BOOLEAN: + return BooleanType.BOOLEAN; + case FLOAT: + return FloatType.FLOAT; + case DATE: + return DateType.DATE; + case TIMESTAMP: + return TimestampType.TIMESTAMP; + case TIMESTAMP_NTZ: + return TimestampNTZType.TIMESTAMP_NTZ; + case DOUBLE: + return DoubleType.DOUBLE; + case DECIMAL: + int precision = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + int scale = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); + return new DecimalType(precision, scale); + case RECORD: + return fromInternalSchema(field.getSchema()); + case MAP: + InternalField key = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid map schema")); + InternalField value = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid map schema")); + return new MapType( + convertFieldType(key), convertFieldType(value), value.getSchema().isNullable()); + case LIST: + InternalField element = + field.getSchema().getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid array schema")); + return new ArrayType(convertFieldType(element), element.getSchema().isNullable()); + default: + throw 
new NotSupportedException("Unsupported type: " + field.getSchema().getDataType()); + } + } + + /** + * Creates Delta Kernel FieldMetadata from InternalSchema. + * + * @param schema the internal schema + * @return Delta Kernel FieldMetadata + */ + private FieldMetadata getFieldMetadata(InternalSchema schema) { + FieldMetadata.Builder metadataBuilder = FieldMetadata.builder(); + + // Handle UUID type + InternalType type = schema.getDataType(); + if (type == InternalType.UUID) { + metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid"); + } + + // Handle comment + if (schema.getComment() != null) { + metadataBuilder.putString("comment", schema.getComment()); + } + + return metadataBuilder.build(); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java index a1ff2b599..50ec9269e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java @@ -265,22 +265,27 @@ private void collectUnsupportedStats(Map additionalStats) { */ private Map flattenStatMap(Map statMap) { Map result = new HashMap<>(); + // Return empty map if input is null + if (statMap == null) { + return result; + } Queue statFieldQueue = new ArrayDeque<>(); statFieldQueue.add(StatField.of("", statMap)); while (!statFieldQueue.isEmpty()) { StatField statField = statFieldQueue.poll(); String prefix = statField.getParentPath().isEmpty() ? 
"" : statField.getParentPath() + "."; - statField - .getValues() - .forEach( - (fieldName, value) -> { - String fullName = prefix + fieldName; - if (value instanceof Map) { - statFieldQueue.add(StatField.of(fullName, (Map) value)); - } else { - result.put(fullName, value); - } - }); + Map values = statField.getValues(); + if (values != null) { + values.forEach( + (fieldName, value) -> { + String fullName = prefix + fieldName; + if (value instanceof Map) { + statFieldQueue.add(StatField.of(fullName, (Map) value)); + } else { + result.put(fullName, value); + } + }); + } } return result; } diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java new file mode 100644 index 000000000..2939bc18d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import lombok.experimental.UtilityClass; + +import io.delta.kernel.Table; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; + +/** + * Utility methods for working with Delta Kernel API. + * + *

This class provides common helper methods used across Delta Kernel integration components to + * avoid code duplication and ensure consistent behavior. + */ +@UtilityClass +public class DeltaKernelUtils { + + /** + * Checks if a Delta table exists at the specified path. + * + *

This method only catches {@link TableNotFoundException}, allowing other exceptions (network + * errors, permission issues, corrupted metadata) to propagate. This ensures real errors are + * visible rather than being silently masked. + * + * @param engine the Delta Kernel engine to use + * @param basePath the path to the Delta table + * @return true if the table exists, false if it doesn't exist + * @throws RuntimeException if there's an error other than table not found (e.g., network issues, + * permissions) + */ + public static boolean tableExists(Engine engine, String basePath) { + try { + Table table = Table.forPath(engine, basePath); + table.getLatestSnapshot(engine); + return true; + } catch (TableNotFoundException e) { + // Expected: table doesn't exist yet + return false; + } + // Let other exceptions propagate (network issues, permissions, corrupted metadata, etc.) + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java new file mode 100644 index 000000000..46ffab752 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.kernel;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import scala.collection.JavaConverters;

import io.delta.kernel.Table;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.actions.RowBackedAction;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;

/**
 * Unit tests for {@code DeltaKernelDataFileUpdatesExtractor}: verifies that snapshot application
 * produces the expected AddFile/RemoveFile actions for unpartitioned, partitioned, and
 * differential-sync scenarios against a hand-built Delta log.
 */
public class TestDeltaKernelDataFileUpdatesExtractor {

  @TempDir private Path tempDir;

  private Engine engine;
  private DeltaKernelDataFileUpdatesExtractor extractor;
  private InternalSchema testSchema;
  private StructType physicalSchema;

  /** Builds a fresh engine, a two-column internal/physical schema pair, and the extractor. */
  @BeforeEach
  public void setup() {
    Configuration hadoopConf = new Configuration();
    engine = DefaultEngine.create(hadoopConf);

    // Create test schema: non-null int `id`, nullable string `name`.
    testSchema =
        InternalSchema.builder()
            .name("record")
            .dataType(InternalType.RECORD)
            .fields(
                Arrays.asList(
                    InternalField.builder()
                        .name("id")
                        .schema(
                            InternalSchema.builder()
                                .name("integer")
                                .dataType(InternalType.INT)
                                .isNullable(false)
                                .build())
                        .build(),
                    InternalField.builder()
                        .name("name")
                        .schema(
                            InternalSchema.builder()
                                .name("string")
                                .dataType(InternalType.STRING)
                                .isNullable(true)
                                .build())
                        .build()))
            .build();

    // Create the matching Delta Kernel physical schema.
    physicalSchema =
        new StructType()
            .add(new StructField("id", IntegerType.INTEGER, false))
            .add(new StructField("name", StringType.STRING, true));

    // Initialize extractor rooted at the temp directory, without column stats.
    extractor =
        DeltaKernelDataFileUpdatesExtractor.builder()
            .engine(engine)
            .basePath(tempDir.toString())
            .includeColumnStats(false)
            .build();
  }

  /** Applying a snapshot with one new file should yield at least one AddFile action. */
  @Test
  public void testCreateAddFileAction() throws IOException {
    // Create a test data file (empty placeholder; only metadata is inspected).
    String testFilePath = tempDir.resolve("test_data.parquet").toString();
    Files.createFile(Paths.get(testFilePath));

    InternalDataFile dataFile =
        InternalDataFile.builder()
            .physicalPath(testFilePath)
            .fileSizeBytes(1024L)
            .lastModified(Instant.now().toEpochMilli())
            .recordCount(100L)
            .partitionValues(Collections.emptyList())
            .columnStats(Collections.emptyList())
            .build();

    // Create a simple Delta table for testing
    Table table = createSimpleDeltaTable();

    List<PartitionFileGroup> partitionedDataFiles =
        Collections.singletonList(
            PartitionFileGroup.builder()
                .files(Collections.singletonList(dataFile))
                .partitionValues(Collections.emptyList())
                .build());

    // Execute applySnapshot
    scala.collection.Seq<RowBackedAction> actions =
        extractor.applySnapshot(table, partitionedDataFiles, testSchema);

    // Verify actions are created
    assertNotNull(actions);
    List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
    assertFalse(actionList.isEmpty(), "Should have at least one action");

    // Verify we have AddFile actions
    boolean hasAddFile = actionList.stream().anyMatch(action -> action instanceof AddFile);
    assertTrue(hasAddFile, "Should contain AddFile actions");
  }

  /** Partitioned data files should each produce an AddFile carrying their partition values. */
  @Test
  public void testApplySnapshotWithPartitionedData() throws IOException {
    // Create test data files with partitions
    String testFilePath1 = tempDir.resolve("partition1/test_data1.parquet").toString();
    String testFilePath2 = tempDir.resolve("partition2/test_data2.parquet").toString();
    Files.createDirectories(Paths.get(testFilePath1).getParent());
    Files.createDirectories(Paths.get(testFilePath2).getParent());
    Files.createFile(Paths.get(testFilePath1));
    Files.createFile(Paths.get(testFilePath2));

    InternalPartitionField partitionField =
        InternalPartitionField.builder()
            .sourceField(
                InternalField.builder()
                    .name("partition_col")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .build())
                    .build())
            .transformType(PartitionTransformType.VALUE)
            .build();

    PartitionValue partitionValue1 =
        PartitionValue.builder()
            .partitionField(partitionField)
            .range(Range.scalar("partition1"))
            .build();

    PartitionValue partitionValue2 =
        PartitionValue.builder()
            .partitionField(partitionField)
            .range(Range.scalar("partition2"))
            .build();

    InternalDataFile dataFile1 =
        InternalDataFile.builder()
            .physicalPath(testFilePath1)
            .fileSizeBytes(1024L)
            .lastModified(Instant.now().toEpochMilli())
            .recordCount(50L)
            .partitionValues(Collections.singletonList(partitionValue1))
            .columnStats(Collections.emptyList())
            .build();

    InternalDataFile dataFile2 =
        InternalDataFile.builder()
            .physicalPath(testFilePath2)
            .fileSizeBytes(2048L)
            .lastModified(Instant.now().toEpochMilli())
            .recordCount(75L)
            .partitionValues(Collections.singletonList(partitionValue2))
            .columnStats(Collections.emptyList())
            .build();

    Table table = createSimpleDeltaTable();

    List<PartitionFileGroup> partitionedDataFiles =
        Arrays.asList(
            PartitionFileGroup.builder()
                .files(Collections.singletonList(dataFile1))
                .partitionValues(Collections.singletonList(partitionValue1))
                .build(),
            PartitionFileGroup.builder()
                .files(Collections.singletonList(dataFile2))
                .partitionValues(Collections.singletonList(partitionValue2))
                .build());

    // Execute applySnapshot
    scala.collection.Seq<RowBackedAction> actions =
        extractor.applySnapshot(table, partitionedDataFiles, testSchema);

    // Verify
    assertNotNull(actions);
    List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
    assertFalse(actionList.isEmpty(), "Should have actions for partitioned data");

    // Should have AddFile actions for new files
    long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count();
    assertTrue(addFileCount >= 2, "Should have at least 2 AddFile actions");
  }

  /**
   * Differential sync: an existing table holds file1+file2; the new snapshot holds file2+file3.
   * Expects AddFile for file3 and RemoveFile for file1 (file2 unchanged).
   */
  @Test
  public void testDifferentialSyncWithExistingData() throws IOException {
    // This test simulates a real differential sync scenario:
    // 1. Delta table has existing files: file1.parquet, file2.parquet
    // 2. New sync brings: file2.parquet (unchanged), file3.parquet (new)
    // 3. Expected result: AddFile for file3, RemoveFile for file1

    // Step 1: Create a Delta table with existing data
    Path tablePath = tempDir.resolve("delta_table_with_data");
    Files.createDirectories(tablePath);
    Path deltaLogPath = tablePath.resolve("_delta_log");
    Files.createDirectories(deltaLogPath);

    // Create existing data files
    Path existingFile1 = tablePath.resolve("file1.parquet");
    Path existingFile2 = tablePath.resolve("file2.parquet");
    Files.createFile(existingFile1);
    Files.createFile(existingFile2);

    // Create initial commit with file1 and file2 (hand-written Delta log JSON; exact bytes matter)
    Path initialCommit = deltaLogPath.resolve("00000000000000000000.json");
    String initialCommitJson =
        "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
            + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\""
            + physicalSchema.toJson().replace("\"", "\\\"")
            + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":"
            + System.currentTimeMillis()
            + "}}\n"
            + "{\"add\":{\"path\":\"file1.parquet\",\"partitionValues\":{},\"size\":1024,\"modificationTime\":"
            + Instant.now().toEpochMilli()
            + ",\"dataChange\":true,\"stats\":\"{}\"}}\n"
            + "{\"add\":{\"path\":\"file2.parquet\",\"partitionValues\":{},\"size\":2048,\"modificationTime\":"
            + Instant.now().toEpochMilli()
            + ",\"dataChange\":true,\"stats\":\"{}\"}}\n";
    Files.write(initialCommit, initialCommitJson.getBytes(StandardCharsets.UTF_8));

    // Create the table
    Table table = Table.forPath(engine, tablePath.toString());
    assertNotNull(table);

    // Step 2: Prepare new sync data - file2 (unchanged) + file3 (new)
    Path newFile3 = tablePath.resolve("file3.parquet");
    Files.createFile(newFile3);

    InternalDataFile dataFile2 =
        InternalDataFile.builder()
            .physicalPath(existingFile2.toString())
            .fileSizeBytes(2048L)
            .lastModified(Instant.now().toEpochMilli())
            .recordCount(100L)
            .partitionValues(Collections.emptyList())
            .columnStats(Collections.emptyList())
            .build();

    InternalDataFile dataFile3 =
        InternalDataFile.builder()
            .physicalPath(newFile3.toString())
            .fileSizeBytes(3072L)
            .lastModified(Instant.now().toEpochMilli())
            .recordCount(150L)
            .partitionValues(Collections.emptyList())
            .columnStats(Collections.emptyList())
            .build();

    List<PartitionFileGroup> newPartitionedDataFiles =
        Collections.singletonList(
            PartitionFileGroup.builder()
                .files(Arrays.asList(dataFile2, dataFile3))
                .partitionValues(Collections.emptyList())
                .build());

    // Step 3: Apply snapshot (differential sync)
    DeltaKernelDataFileUpdatesExtractor syncExtractor =
        DeltaKernelDataFileUpdatesExtractor.builder()
            .engine(engine)
            .basePath(tablePath.toString())
            .includeColumnStats(false)
            .build();

    scala.collection.Seq<RowBackedAction> actions =
        syncExtractor.applySnapshot(table, newPartitionedDataFiles, testSchema);

    // Step 4: Verify the differential sync results
    assertNotNull(actions, "Actions should not be null");
    List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
    assertFalse(actionList.isEmpty(), "Should have actions for differential sync");

    // Count AddFile and RemoveFile actions
    long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count();
    long removeFileCount =
        actionList.stream().filter(action -> action instanceof RemoveFile).count();

    // Verify: Should have AddFile for file3 (new file)
    assertTrue(addFileCount >= 1, "Should have at least 1 AddFile action for new file (file3)");

    // Verify: Should have RemoveFile for file1 (removed from new sync)
    assertTrue(
        removeFileCount >= 1,
        "Should have at least 1 RemoveFile action for file1 that's not in new sync");

    // Verify specific files in actions
    boolean hasFile3Add =
        actionList.stream()
            .filter(action -> action instanceof AddFile)
            .map(action -> (AddFile) action)
            .anyMatch(addFile -> addFile.getPath().contains("file3.parquet"));

    assertTrue(hasFile3Add, "Should have AddFile action for file3.parquet");

    // Note: file2 should not appear in actions as it's unchanged
    // file1 should appear as RemoveFile as it's not in the new sync
    System.out.println(
        "Differential sync completed: "
            + addFileCount
            + " files added, "
            + removeFileCount
            + " files removed");
  }

  /** Writes a minimal valid Delta log (protocol + metaData) and opens it as a kernel Table. */
  private Table createSimpleDeltaTable() {
    try {
      // Create a simple Delta table directory structure
      Path tablePath = tempDir.resolve("delta_table");
      Files.createDirectories(tablePath);
      Path deltaLogPath = tablePath.resolve("_delta_log");
      Files.createDirectories(deltaLogPath);

      // Create an empty commit file to make it a valid Delta table
      Path commitFile = deltaLogPath.resolve("00000000000000000000.json");
      String commitJson =
          "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
              + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\""
              + physicalSchema.toJson().replace("\"", "\\\"")
              + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":"
              + System.currentTimeMillis()
              + "}}\n";
      Files.write(commitFile, commitJson.getBytes(StandardCharsets.UTF_8));

      return Table.forPath(engine, tablePath.toString());
    } catch (IOException e) {
      throw new RuntimeException("Failed to create test Delta table", e);
    }
  }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
 * The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.kernel;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.SnapshotImpl;

import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.TableFormatSync;

/**
 * Comprehensive end-to-end integration test for Delta Kernel read and write operations.
 *
 * <p>This test validates: 1. Writing data to Delta tables using DeltaKernelConversionTarget 2.
 * Reading data from Delta tables using DeltaKernelConversionSource 3. Round-trip data integrity
 * (write → read → validate) 4. Partitioned tables 5. Incremental updates (add/remove files) 6. Time
 * travel (version-based reads) 7. Empty table handling
 */
public class TestDeltaKernelReadWriteIntegration {
  // NOTE(review): RANDOM appears unused in the visible tests — confirm before removing.
  private static final Random RANDOM = new Random();
  private static final Instant LAST_COMMIT_TIME = Instant.now();

  @TempDir public Path tempDir;
  private Engine engine;

  /** Creates a fresh default Delta Kernel engine for each test. */
  @BeforeEach
  public void setup() {
    Configuration hadoopConf = new Configuration();
    engine = DefaultEngine.create(hadoopConf);
  }

  /**
   * Test 1: Basic Write and Read Validates that data written to Delta can be read back correctly.
   */
  @Test
  public void testBasicWriteAndRead() throws Exception {
    String tableName = "test_basic_" + UUID.randomUUID();
    Path basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    // === WRITE PHASE ===
    InternalSchema schema = createSimpleSchema();
    DeltaKernelConversionTarget writer = createWriter(tableName, basePath);

    // Create test data files
    InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
    InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);

    // Write data to Delta table
    InternalTable writeTable = createInternalTable(tableName, basePath, schema, null);
    InternalSnapshot snapshot = buildSnapshot(writeTable, "0", file1, file2);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot);

    // Verify Delta table was created
    assertTrue(Files.exists(basePath.resolve("_delta_log")), "Delta log directory should exist");

    // === READ PHASE ===
    DeltaKernelConversionSource reader = createReader(tableName, basePath);

    // Read current table metadata
    InternalTable readTable = reader.getCurrentTable();
    assertNotNull(readTable, "Should be able to read table");
    assertEquals(tableName, readTable.getName());

    // Normalize paths for comparison (handle file:// prefix differences)
    String expectedPath = basePath.toString();
    String actualPath = readTable.getBasePath().replace("file://", "").replace("file:", "");
    assertTrue(
        actualPath.endsWith(expectedPath) || actualPath.equals(expectedPath),
        "Base path should match. Expected: " + expectedPath + ", Actual: " + actualPath);

    // Verify schema
    InternalSchema readSchema = readTable.getReadSchema();
    assertNotNull(readSchema);
    assertEquals(schema.getFields().size(), readSchema.getFields().size());

    // Read current snapshot
    InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
    assertNotNull(readSnapshot);

    // Extract data files from partition groups (files with same partition values are grouped)
    List<InternalDataFile> dataFiles = extractDataFiles(readSnapshot);
    assertEquals(2, dataFiles.size(), "Should have 2 files in snapshot");

    // Compare by physical path to uniquely identify files (not by size which could be duplicated)
    assertTrue(
        dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
        "Should contain file1 (data_1.parquet)");
    assertTrue(
        dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")),
        "Should contain file2 (data_2.parquet)");
  }

  /**
   * Test 2: Partitioned Table Write and Read Validates partition handling in both write and read
   * operations.
   */
  @Test
  public void testPartitionedTableRoundTrip() throws Exception {
    String tableName = "test_partitioned_" + UUID.randomUUID();
    Path basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
    DeltaKernelConversionSource reader = createReader(tableName, basePath);

    // Define partition field
    InternalPartitionField partitionField =
        InternalPartitionField.builder()
            .sourceField(
                InternalField.builder()
                    .name("string_field")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .build())
                    .build())
            .transformType(PartitionTransformType.VALUE)
            .build();

    // === WRITE PHASE ===
    InternalSchema schema = createSimpleSchema();
    InternalTable table =
        createInternalTable(tableName, basePath, schema, Collections.singletonList(partitionField));

    // Create partitioned data files: two in category_a, one in category_b.
    List<PartitionValue> partition1 =
        Collections.singletonList(
            PartitionValue.builder()
                .partitionField(partitionField)
                .range(Range.scalar("category_a"))
                .build());
    List<PartitionValue> partition2 =
        Collections.singletonList(
            PartitionValue.builder()
                .partitionField(partitionField)
                .range(Range.scalar("category_b"))
                .build());

    InternalDataFile file1 = createDataFile(1, partition1, basePath);
    InternalDataFile file2 = createDataFile(2, partition1, basePath);
    InternalDataFile file3 = createDataFile(3, partition2, basePath);

    InternalSnapshot snapshot = buildSnapshot(table, "0", file1, file2, file3);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot);

    // === READ PHASE ===
    InternalTable readTable = reader.getCurrentTable();

    // Verify partitioning
    assertNotNull(readTable.getPartitioningFields());
    assertEquals(1, readTable.getPartitioningFields().size());
    assertEquals(
        "string_field", readTable.getPartitioningFields().get(0).getSourceField().getName());

    // Verify all files are present
    InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
    List<InternalDataFile> dataFiles = extractDataFiles(readSnapshot);
    assertEquals(3, dataFiles.size(), "Should have all 3 partitioned files");

    // Verify partition columns in Delta metadata
    Table deltaTable = Table.forPath(engine, basePath.toString());
    Snapshot deltaSnapshot = deltaTable.getLatestSnapshot(engine);
    SnapshotImpl snapshotImpl = (SnapshotImpl) deltaSnapshot;
    Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
    assertEquals(1, partitionColumns.size());
    assertTrue(partitionColumns.contains("string_field"));
  }

  /**
   * Test 3: Incremental Updates (Add/Remove Files) Validates that incremental changes are properly
   * handled.
   */
  @Test
  public void testIncrementalUpdates() throws Exception {
    String tableName = "test_incremental_" + UUID.randomUUID();
    Path basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
    DeltaKernelConversionSource reader = createReader(tableName, basePath);

    InternalSchema schema = createSimpleSchema();
    InternalTable table = createInternalTable(tableName, basePath, schema, null);

    // === SNAPSHOT 1: Initial files ===
    InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
    InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);
    InternalSnapshot snapshot1 = buildSnapshot(table, "0", file1, file2);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1);

    InternalSnapshot read1 = reader.getCurrentSnapshot();
    assertEquals(2, extractDataFiles(read1).size(), "Should have 2 files after first snapshot");

    // === SNAPSHOT 2: Remove file1, keep file2, add file3 ===
    InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath);
    InternalSnapshot snapshot2 = buildSnapshot(table, "1", file2, file3);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2);

    InternalSnapshot read2 = reader.getCurrentSnapshot();
    List<InternalDataFile> files2 = extractDataFiles(read2);
    assertEquals(2, files2.size(), "Should have 2 files after second snapshot");

    // Verify correct files are present (compare by path, not size)
    assertTrue(
        files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")),
        "file2 should be present");
    assertTrue(
        files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_3.parquet")),
        "file3 should be present");
    assertFalse(
        files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
        "file1 should be removed");

    // === SNAPSHOT 3: Replace all files ===
    InternalDataFile file4 = createDataFile(4, Collections.emptyList(), basePath);
    InternalSnapshot snapshot3 = buildSnapshot(table, "2", file4);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot3);

    InternalSnapshot read3 = reader.getCurrentSnapshot();
    List<InternalDataFile> files3 = extractDataFiles(read3);
    assertEquals(1, files3.size(), "Should have only 1 file after third snapshot");
    assertTrue(
        files3.get(0).getPhysicalPath().contains("data_4.parquet"),
        "Should contain file4 (data_4.parquet)");
  }

  /** Test 4: Read at Specific Version (Time Travel) Validates version-based reading. */
  @Test
  public void testReadAtVersion() throws Exception {
    String tableName = "test_versioned_" + UUID.randomUUID();
    Path basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
    DeltaKernelConversionSource reader = createReader(tableName, basePath);

    InternalSchema schema = createSimpleSchema();
    InternalTable table = createInternalTable(tableName, basePath, schema, null);

    // Write version 0
    InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
    InternalSnapshot snapshot0 = buildSnapshot(table, "0", file1);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot0);

    // Write version 1
    InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);
    InternalSnapshot snapshot1 = buildSnapshot(table, "1", file1, file2);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1);

    // Write version 2
    InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath);
    InternalSnapshot snapshot2 = buildSnapshot(table, "2", file2, file3);
    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2);

    // Read at version 0 (should have only file1)
    InternalTable tableV0 = reader.getTable(0L);
    assertNotNull(tableV0);

    // Read at version 1 (should have file1 and file2)
    InternalTable tableV1 = reader.getTable(1L);
    assertNotNull(tableV1);

    // Read latest version (should have file2 and file3)
    InternalSnapshot latestSnapshot = reader.getCurrentSnapshot();
    List<InternalDataFile> latestFiles = extractDataFiles(latestSnapshot);
    assertEquals(2, latestFiles.size());

    // Verify latest version doesn't have file1 (compare by path, not size)
    assertFalse(
        latestFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
        "Latest version should not have file1");
  }

  /** Test 5: Empty Table Creation and Read Validates handling of empty tables. */
  @Test
  public void testEmptyTableRoundTrip() throws Exception {
    String tableName = "test_empty_" + UUID.randomUUID();
    Path basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
    DeltaKernelConversionSource reader = createReader(tableName, basePath);

    // Write empty table with just schema
    InternalSchema schema = createSimpleSchema();
    InternalTable table = createInternalTable(tableName, basePath, schema, null);
    InternalSnapshot emptySnapshot = buildSnapshot(table, "0"); // No files

    TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), emptySnapshot);

    // Read back
    InternalTable readTable = reader.getCurrentTable();
    assertNotNull(readTable);
    assertEquals(schema.getFields().size(), readTable.getReadSchema().getFields().size());

    InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
    assertNotNull(readSnapshot);
    assertEquals(0, readSnapshot.getPartitionedDataFiles().size(), "Should have no files");
  }

  // ==================== Helper Methods ====================

  /** Builds a conversion target (writer) for a Delta table at {@code basePath}. */
  private DeltaKernelConversionTarget createWriter(String tableName, Path basePath) {
    return new DeltaKernelConversionTarget(
        TargetTable.builder()
            .name(tableName)
            .basePath(basePath.toString())
            .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
            .formatName(TableFormat.DELTA)
            .build(),
        engine);
  }

  /** Builds a conversion source (reader) for the same table. */
  private DeltaKernelConversionSource createReader(String tableName, Path basePath) {
    return DeltaKernelConversionSource.builder()
        .basePath(basePath.toString())
        .tableName(tableName)
        .engine(engine)
        .build();
  }

  /** Four-column schema: non-null long id, nullable string/int, millis-precision timestamp. */
  private InternalSchema createSimpleSchema() {
    Map<InternalSchema.MetadataKey, InternalSchema.MetadataValue> timestampMetadata =
        new HashMap<>();
    timestampMetadata.put(
        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);

    return InternalSchema.builder()
        .dataType(InternalType.RECORD)
        .name("test_schema")
        .fields(
            Arrays.asList(
                InternalField.builder()
                    .name("id")
                    .schema(
                        InternalSchema.builder()
                            .name("long")
                            .dataType(InternalType.LONG)
                            .isNullable(false)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("string_field")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .isNullable(true)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("int_field")
                    .schema(
                        InternalSchema.builder()
                            .name("int")
                            .dataType(InternalType.INT)
                            .isNullable(true)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("timestamp_field")
                    .schema(
                        InternalSchema.builder()
                            .name("timestamp")
                            .dataType(InternalType.TIMESTAMP)
                            .isNullable(true)
                            .metadata(timestampMetadata)
                            .build())
                    .build()))
        .isNullable(false)
        .build();
  }

  /**
   * Builds the source-side table descriptor. Source format is HUDI so the sync path exercises a
   * cross-format (HUDI → DELTA) conversion.
   */
  private InternalTable createInternalTable(
      String tableName,
      Path basePath,
      InternalSchema schema,
      List<InternalPartitionField> partitionFields) {
    return InternalTable.builder()
        .name(tableName)
        .basePath(basePath.toUri().toString())
        .layoutStrategy(DataLayoutStrategy.FLAT)
        .tableFormat(TableFormat.HUDI)
        .readSchema(schema)
        .partitioningFields(partitionFields)
        .latestCommitTime(LAST_COMMIT_TIME)
        .build();
  }

  /** Wraps the given files into a snapshot with the supplied source identifier. */
  private InternalSnapshot buildSnapshot(
      InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
    return InternalSnapshot.builder()
        .table(table)
        .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
        .sourceIdentifier(sourceIdentifier)
        .build();
  }

  /** Creates an empty placeholder parquet file named data_{index}.parquet and its descriptor. */
  private InternalDataFile createDataFile(
      int index, List<PartitionValue> partitionValues, Path basePath) {
    try {
      Path filePath = basePath.resolve("data_" + index + ".parquet");
      Files.createFile(filePath);

      String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString();

      return InternalDataFile.builder()
          .fileFormat(FileFormat.APACHE_PARQUET)
          .fileSizeBytes(1000 + index)
          .physicalPath(physicalPath)
          .recordCount(100)
          .partitionValues(partitionValues)
          .columnStats(Collections.emptyList())
          .lastModified(Instant.now().toEpochMilli())
          .build();
    } catch (IOException e) {
      throw new RuntimeException("Failed to create test data file", e);
    }
  }

  /** Flattens all partition groups in a snapshot into a single list of data files. */
  private List<InternalDataFile> extractDataFiles(InternalSnapshot snapshot) {
    List<InternalDataFile> files = new ArrayList<>();
    for (PartitionFileGroup group : snapshot.getPartitionedDataFiles()) {
      files.addAll(group.getDataFiles());
    }
    return files;
  }
}

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import 
io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.types.TimestampNTZType; import io.delta.kernel.types.TimestampType; @@ -47,6 +52,11 @@ import org.apache.xtable.model.schema.InternalType; public class TestDeltaKernelSchemaExtractor { + + private final DeltaKernelSchemaExtractor extractor = DeltaKernelSchemaExtractor.getInstance(); + + // ========== Tests for toInternalSchema() ========== + @Test public void testPrimitiveTypes() { Map decimalMetadata = new HashMap<>(); @@ -873,4 +883,142 @@ public void testIcebergToDeltaUUIDSupport() { internalSchema, DeltaKernelSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); } + + // ========== Tests for fromInternalSchema() - New Tests ========== + + @Test + public void testFromInternalSchemaSimpleTypes() { + // Create an InternalSchema with simple types + InternalField idField = + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(); + + InternalField nameField = + InternalField.builder() + .name("name") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(); + + InternalField activeField = + InternalField.builder() + .name("active") + .schema( + InternalSchema.builder() + .name("boolean") + .dataType(InternalType.BOOLEAN) + .isNullable(false) + .build()) + .build(); + + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Arrays.asList(idField, nameField, activeField)) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify + assertNotNull(deltaSchema); + assertEquals(3, deltaSchema.fields().size()); + + // Check id field + StructField idDeltaField = deltaSchema.fields().get(0); + assertEquals("id", 
idDeltaField.getName()); + assertEquals(IntegerType.INTEGER, idDeltaField.getDataType()); + assertEquals(false, idDeltaField.isNullable()); + + // Check name field + StructField nameDeltaField = deltaSchema.fields().get(1); + assertEquals("name", nameDeltaField.getName()); + assertEquals(StringType.STRING, nameDeltaField.getDataType()); + assertEquals(true, nameDeltaField.isNullable()); + + // Check active field + StructField activeDeltaField = deltaSchema.fields().get(2); + assertEquals("active", activeDeltaField.getName()); + assertEquals(BooleanType.BOOLEAN, activeDeltaField.getDataType()); + assertEquals(false, activeDeltaField.isNullable()); + } + + @Test + public void testFromInternalSchemaWithUUID() { + // Create an InternalSchema with UUID type + InternalField uuidField = + InternalField.builder() + .name("userId") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(); + + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Collections.singletonList(uuidField)) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify + assertNotNull(deltaSchema); + assertEquals(1, deltaSchema.fields().size()); + + StructField uuidDeltaField = deltaSchema.fields().get(0); + assertEquals("userId", uuidDeltaField.getName()); + assertTrue(uuidDeltaField.getDataType() instanceof BinaryType); + assertEquals(false, uuidDeltaField.isNullable()); + + // Check metadata contains UUID marker + FieldMetadata metadata = uuidDeltaField.getMetadata(); + assertTrue(metadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)); + assertEquals("uuid", metadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE)); + } + + @Test + public void testRoundTripConversion() { + // Create a Delta Kernel StructType + StructType originalDeltaSchema = + new StructType( + Arrays.asList( + new 
StructField("id", IntegerType.INTEGER, false), + new StructField("name", StringType.STRING, true), + new StructField("score", DoubleType.DOUBLE, false))); + + // Convert to InternalSchema + InternalSchema internalSchema = extractor.toInternalSchema(originalDeltaSchema); + // Convert back to Delta Kernel StructType + StructType convertedDeltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure matches + assertEquals(originalDeltaSchema.fields().size(), convertedDeltaSchema.fields().size()); + + for (int i = 0; i < originalDeltaSchema.fields().size(); i++) { + StructField original = originalDeltaSchema.fields().get(i); + StructField converted = convertedDeltaSchema.fields().get(i); + + assertEquals(original.getName(), converted.getName()); + assertEquals(original.getDataType(), converted.getDataType()); + assertEquals(original.isNullable(), converted.isNullable()); + } + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java new file mode 100644 index 000000000..dec2daa04 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; +import org.apache.xtable.model.schema.InternalField; +import 
org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.TableFormatSync;

/**
 * Validates that Delta Kernel tables are properly created/updated using
 * DeltaKernelConversionTarget. Tests partitioning, schema evolution, and metadata sync without
 * Spark SQL dependencies.
 */
public class TestDeltaKernelSync {
  private static final Instant LAST_COMMIT_TIME = Instant.ofEpochSecond(1000);

  @TempDir public Path tempDir;
  private DeltaKernelConversionTarget conversionTarget;
  private Path basePath;
  private String tableName;
  private Engine engine;

  /** Creates a fresh table directory, engine, and conversion target for every test. */
  @BeforeEach
  public void setup() throws IOException {
    tableName = "test-" + UUID.randomUUID();
    basePath = tempDir.resolve(tableName);
    Files.createDirectories(basePath);

    Configuration hadoopConf = new Configuration();
    engine = DefaultEngine.create(hadoopConf);

    conversionTarget =
        new DeltaKernelConversionTarget(
            TargetTable.builder()
                .name(tableName)
                .basePath(basePath.toString())
                .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
                .formatName(TableFormat.DELTA)
                .build(),
            engine);
  }

  /** Two successive snapshot syncs: schema evolution plus add/remove of data files. */
  @Test
  public void testCreateSnapshotControlFlow() throws Exception {
    InternalSchema schema1 = getInternalSchema();
    // schema2 = schema1 plus an extra nullable float column.
    List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
    fields2.add(
        InternalField.builder()
            .name("float_field")
            .schema(
                InternalSchema.builder()
                    .name("float")
                    .dataType(InternalType.FLOAT)
                    .isNullable(true)
                    .build())
            .build());
    InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();
    InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
    InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);

    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
    InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
    InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);

    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
  }

  @Test
  public void testFileRemovalWithCheckpoint() throws Exception {
    // This test does 11 syncs to trigger checkpoint creation (happens at 10th commit)
    // and verifies that file removal works correctly after checkpoint exists
    String checkpointTableName = "test_table_checkpoint_" + UUID.randomUUID();
    Path checkpointTestPath = tempDir.resolve(checkpointTableName);
    Files.createDirectories(checkpointTestPath);

    InternalSchema schema = getInternalSchema();
    InternalTable checkpointTable =
        getInternalTable(checkpointTableName, checkpointTestPath, schema, null, LAST_COMMIT_TIME);

    DeltaKernelConversionTarget checkpointTarget =
        new DeltaKernelConversionTarget(
            TargetTable.builder()
                .name(checkpointTableName)
                .basePath(checkpointTestPath.toString())
                .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
                .formatName(TableFormat.DELTA)
                .build(),
            engine);

    // Do 10 syncs to trigger checkpoint creation
    for (int i = 0; i < 10; i++) {
      InternalDataFile file1 = getDataFile(i * 2 + 1, Collections.emptyList(), checkpointTestPath);
      InternalDataFile file2 = getDataFile(i * 2 + 2, Collections.emptyList(), checkpointTestPath);

      InternalSnapshot snapshot = buildSnapshot(checkpointTable, String.valueOf(i), file1, file2);
      TableFormatSync.getInstance()
          .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot);
    }

    // 11th sync: This triggers checkpoint creation at version 10
    InternalDataFile file21 = getDataFile(21, Collections.emptyList(), checkpointTestPath);
    InternalDataFile file22 = getDataFile(22, Collections.emptyList(), checkpointTestPath);
    InternalSnapshot snapshot11 = buildSnapshot(checkpointTable, "10", file21, file22);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot11);

    // Checkpoint is created synchronously via post-commit hooks
    Path checkpointFile =
        checkpointTestPath.resolve("_delta_log/00000000000000000010.checkpoint.parquet");
    assertTrue(Files.exists(checkpointFile), "Checkpoint file should exist after 10 commits");

    // 12th sync: NOW checkpoint exists and can be used to detect file removals
    InternalDataFile file23 = getDataFile(23, Collections.emptyList(), checkpointTestPath);
    InternalDataFile file24 = getDataFile(24, Collections.emptyList(), checkpointTestPath);
    InternalSnapshot snapshot12 = buildSnapshot(checkpointTable, "11", file23, file24);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot12);

    // Validate: Should only have file23 and file24 (file21/file22 should be removed)
    validateDeltaTable(checkpointTestPath, new HashSet<>(Arrays.asList(file23, file24)));
  }

  /** Single VALUE-transform partition column on a string field. */
  @Test
  public void testPrimitiveFieldPartitioning() throws Exception {
    InternalSchema schema = getInternalSchema();
    InternalPartitionField internalPartitionField =
        InternalPartitionField.builder()
            .sourceField(
                InternalField.builder()
                    .name("string_field")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .build())
                    .build())
            .transformType(PartitionTransformType.VALUE)
            .build();
    InternalTable table =
        getInternalTable(
            tableName,
            basePath,
            schema,
            Collections.singletonList(internalPartitionField),
            LAST_COMMIT_TIME);

    List<PartitionValue> partitionValues1 =
        Collections.singletonList(
            PartitionValue.builder()
                .partitionField(internalPartitionField)
                .range(Range.scalar("level"))
                .build());
    List<PartitionValue> partitionValues2 =
        Collections.singletonList(
            PartitionValue.builder()
                .partitionField(internalPartitionField)
                .range(Range.scalar("warning"))
                .build());
    InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
    InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
    InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);

    InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);

    // Validate all files are present
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3)));

    // Verify partition columns are set
    Table deltaTable = Table.forPath(engine, basePath.toString());
    Snapshot snapshot = deltaTable.getLatestSnapshot(engine);
    SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
    Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
    assertEquals(1, partitionColumns.size());
    assertTrue(partitionColumns.contains("string_field"));
  }

  /** Two VALUE-transform partition columns (string + int). */
  @Test
  public void testMultipleFieldPartitioning() throws Exception {
    InternalSchema schema = getInternalSchema();
    InternalPartitionField internalPartitionField1 =
        InternalPartitionField.builder()
            .sourceField(
                InternalField.builder()
                    .name("string_field")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .build())
                    .build())
            .transformType(PartitionTransformType.VALUE)
            .build();
    InternalPartitionField internalPartitionField2 =
        InternalPartitionField.builder()
            .sourceField(
                InternalField.builder()
                    .name("int_field")
                    .schema(InternalSchema.builder().name("int").dataType(InternalType.INT).build())
                    .build())
            .transformType(PartitionTransformType.VALUE)
            .build();
    InternalTable table =
        getInternalTable(
            tableName,
            basePath,
            schema,
            Arrays.asList(internalPartitionField1, internalPartitionField2),
            LAST_COMMIT_TIME);

    List<PartitionValue> partitionValues1 =
        Arrays.asList(
            PartitionValue.builder()
                .partitionField(internalPartitionField1)
                .range(Range.scalar("level"))
                .build(),
            PartitionValue.builder()
                .partitionField(internalPartitionField2)
                .range(Range.scalar(10))
                .build());
    List<PartitionValue> partitionValues2 =
        Arrays.asList(
            PartitionValue.builder()
                .partitionField(internalPartitionField1)
                .range(Range.scalar("level"))
                .build(),
            PartitionValue.builder()
                .partitionField(internalPartitionField2)
                .range(Range.scalar(20))
                .build());
    List<PartitionValue> partitionValues3 =
        Arrays.asList(
            PartitionValue.builder()
                .partitionField(internalPartitionField1)
                .range(Range.scalar("warning"))
                .build(),
            PartitionValue.builder()
                .partitionField(internalPartitionField2)
                .range(Range.scalar(20))
                .build());

    InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
    InternalDataFile dataFile2 = getDataFile(2, partitionValues2, basePath);
    InternalDataFile dataFile3 = getDataFile(3, partitionValues3, basePath);

    InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);
    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3)));

    // Verify partition columns
    Table deltaTable = Table.forPath(engine, basePath.toString());
    Snapshot snapshot = deltaTable.getLatestSnapshot(engine);
    SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
    Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
    assertEquals(2, partitionColumns.size());
    assertTrue(partitionColumns.contains("string_field"));
    assertTrue(partitionColumns.contains("int_field"));
  }

  @Test
  @Disabled(
      "Disabled due to tags not present in commitinfo - https://github.com/delta-io/delta/issues/6167")
  public void testSourceTargetIdMapping() throws Exception {
    InternalSchema baseSchema = getInternalSchema();
    InternalTable sourceTable =
        getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);

    InternalDataFile sourceDataFile1 = getDataFile(101, Collections.emptyList(), basePath);
    InternalDataFile sourceDataFile2 = getDataFile(102, Collections.emptyList(), basePath);
    InternalDataFile sourceDataFile3 = getDataFile(103, Collections.emptyList(), basePath);

    InternalSnapshot sourceSnapshot1 =
        buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2);
    InternalSnapshot sourceSnapshot2 =
        buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot1);
    Optional<String> mappedTargetId1 =
        conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier());
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile1, sourceDataFile2)));
    assertTrue(mappedTargetId1.isPresent());
    assertEquals("0", mappedTargetId1.get());

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot2);
    Optional<String> mappedTargetId2 =
        conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier());
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile2, sourceDataFile3)));
    assertTrue(mappedTargetId2.isPresent());
    assertEquals("1", mappedTargetId2.get());

    Optional<String> unmappedTargetId = conversionTarget.getTargetCommitIdentifier("s3");
    assertFalse(unmappedTargetId.isPresent());
  }

  // NOTE(review): despite the name, this test passes "0" (not null) and asserts the lookup is
  // unsupported, which contradicts the (disabled) testSourceTargetIdMapping above — confirm the
  // intended contract once delta-io/delta#6167 is resolved.
  @Test
  public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Exception {
    InternalSchema baseSchema = getInternalSchema();
    InternalTable internalTable =
        getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);
    InternalDataFile sourceDataFile = getDataFile(101, Collections.emptyList(), basePath);
    InternalSnapshot snapshot = buildSnapshot(internalTable, "0", sourceDataFile);

    // Mock the snapshot sync process
    conversionTarget.beginSync(internalTable);
    TableSyncMetadata tableSyncMetadata =
        TableSyncMetadata.of(
            internalTable.getLatestCommitTime(), new ArrayList<>(snapshot.getPendingCommits()));
    conversionTarget.syncMetadata(tableSyncMetadata);
    conversionTarget.syncSchema(internalTable.getReadSchema());
    conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
    conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
    conversionTarget.completeSync();

    // getTargetCommitIdentifier is not supported in DeltaKernelConversionTarget
    // because Delta Kernel 4.0.0 does not support commit tags
    NotSupportedException exception =
        assertThrows(
            NotSupportedException.class, () -> conversionTarget.getTargetCommitIdentifier("0"));
    assertTrue(
        exception
            .getMessage()
            .contains("Source-to-target commit identifier mapping is not supported"));
  }

  @Test
  public void testGetTableMetadata() throws Exception {
    InternalSchema schema = getInternalSchema();
    InternalTable table = getInternalTable(tableName, basePath, schema, null, LAST_COMMIT_TIME);
    InternalDataFile dataFile = getDataFile(1, Collections.emptyList(), basePath);
    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot);

    Optional<TableSyncMetadata> metadata = conversionTarget.getTableMetadata();
    assertTrue(metadata.isPresent(), "Metadata should be present after sync");
    TableSyncMetadata syncMetadata = metadata.get();
    assertNotNull(syncMetadata.getLastInstantSynced(), "Last instant synced should not be null");
  }

  /**
   * Asserts the Delta table at {@code basePath} contains exactly {@code expectedFiles} (matched
   * by physical path, with size verified).
   */
  private void validateDeltaTable(Path basePath, Set<InternalDataFile> expectedFiles)
      throws IOException {
    Table table = Table.forPath(engine, basePath.toString());
    assertNotNull(table);

    Snapshot snapshot = table.getLatestSnapshot(engine);
    assertNotNull(snapshot);

    // Scan all files
    ScanImpl scan = (ScanImpl) snapshot.getScanBuilder().build();

    Map<String, InternalDataFile> pathToFile =
        expectedFiles.stream()
            .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));

    int count = 0;
    // CloseableIterator is Closeable; close both iterators to avoid leaking scan resources.
    try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine, false)) {
      while (scanFiles.hasNext()) {
        FilteredColumnarBatch batch = scanFiles.next();
        try (CloseableIterator<Row> rows = batch.getRows()) {
          while (rows.hasNext()) {
            Row scanFileRow = rows.next();
            AddFile addFile =
                new AddFile(scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")));

            String fullPath =
                new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri())
                    .toString();
            InternalDataFile expected = pathToFile.get(fullPath);
            assertNotNull(expected, "Unexpected file in Delta table: " + fullPath);
            // JUnit convention: expected value first, actual second (was swapped).
            assertEquals(expected.getFileSizeBytes(), addFile.getSize());
            count++;
          }
        }
      }
    }

    assertEquals(
        expectedFiles.size(), count, "Number of files from Delta scan don't match expectation");
  }

  /** Wraps the given files into an {@link InternalSnapshot} for {@code table}. */
  private InternalSnapshot buildSnapshot(
      InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
    return InternalSnapshot.builder()
        .table(table)
        .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
        .sourceIdentifier(sourceIdentifier)
        .build();
  }

  /** Builds an {@link InternalTable} with HUDI as the source format; partitions may be null. */
  private InternalTable getInternalTable(
      String tableName,
      Path basePath,
      InternalSchema schema,
      List<InternalPartitionField> partitionFields,
      Instant lastCommitTime) {
    return InternalTable.builder()
        .name(tableName)
        .basePath(basePath.toUri().toString())
        .layoutStrategy(DataLayoutStrategy.FLAT)
        .tableFormat(TableFormat.HUDI)
        .readSchema(schema)
        .partitioningFields(partitionFields)
        .latestCommitTime(lastCommitTime)
        .build();
  }

  private InternalDataFile getDataFile(
      int index, List<PartitionValue> partitionValues, Path basePath) {
    // Create actual physical file so Delta Kernel can reference it
    try {
      Path filePath = basePath.resolve("physical" + index + ".parquet");
      Files.createFile(filePath);

      String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString();

      return InternalDataFile.builder()
          .fileFormat(FileFormat.APACHE_PARQUET)
          .fileSizeBytes(1000L + (index * 100L)) // Deterministic size based on index
          .physicalPath(physicalPath)
          .recordCount(100L + (index * 10L)) // Deterministic record count based on index
          .partitionValues(partitionValues)
          .columnStats(Collections.emptyList())
          .lastModified(1000000000L + (index * 1000L)) // Deterministic timestamp based on index
          .build();
    } catch (IOException e) {
      throw new RuntimeException("Failed to create test data file", e);
    }
  }

  /** Base schema: long/string/int columns plus a millisecond-precision timestamp. */
  private InternalSchema getInternalSchema() {
    Map<InternalSchema.MetadataKey, InternalSchema.MetadataValue> timestampMetadata =
        new HashMap<>();
    timestampMetadata.put(
        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
    return InternalSchema.builder()
        .dataType(InternalType.RECORD)
        .name("top_level_schema")
        .fields(
            Arrays.asList(
                InternalField.builder()
                    .name("long_field")
                    .schema(
                        InternalSchema.builder()
                            .name("long")
                            .dataType(InternalType.LONG)
                            .isNullable(true)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("string_field")
                    .schema(
                        InternalSchema.builder()
                            .name("string")
                            .dataType(InternalType.STRING)
                            .isNullable(true)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("int_field")
                    .schema(
                        InternalSchema.builder()
                            .name("int")
                            .dataType(InternalType.INT)
                            .isNullable(true)
                            .build())
                    .build(),
                InternalField.builder()
                    .name("timestamp_field")
                    .schema(
                        InternalSchema.builder()
                            .name("time")
                            .dataType(InternalType.TIMESTAMP)
                            .isNullable(true)
                            .metadata(timestampMetadata)
                            .build())
                    .build()))
        .isNullable(false)
        .build();
  }

  /** Same control flow as testCreateSnapshotControlFlow but starting from a TIMESTAMP_NTZ schema. */
  @Test
  public void testTimestampNtz() throws Exception {
    InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
    List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
    fields2.add(
        InternalField.builder()
            .name("float_field")
            .schema(
                InternalSchema.builder()
                    .name("float")
                    .dataType(InternalType.FLOAT)
                    .isNullable(true)
                    .build())
            .build());
    // fields2 derives from the ntz schema, so build schema2 from the same base
    // (previously built from getInternalSchema(); equivalent since fields are fully
    // replaced, but misleading to read).
    InternalSchema schema2 =
        getInternalSchemaWithTimestampNtz().toBuilder().fields(fields2).build();
    InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
    InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);

    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
    InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
    InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);

    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));

    TableFormatSync.getInstance()
        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
  }

  /** Base schema extended with a microsecond-precision TIMESTAMP_NTZ column. */
  private InternalSchema getInternalSchemaWithTimestampNtz() {
    Map<InternalSchema.MetadataKey, InternalSchema.MetadataValue> timestampMetadata =
        new HashMap<>();
    timestampMetadata.put(
        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
    List<InternalField> fields = new ArrayList<>(getInternalSchema().getFields());
    fields.add(
        InternalField.builder()
            .name("timestamp_ntz_field")
            .schema(
                InternalSchema.builder()
                    .name("time_ntz")
                    .dataType(InternalType.TIMESTAMP_NTZ)
                    .isNullable(true)
                    .metadata(timestampMetadata)
                    .build())
            .build());
    return getInternalSchema().toBuilder().fields(fields).build();
  }
}