Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
dfea6a4
single commit
vaibhavk1992 Apr 25, 2025
b75bc7c
adding delta kernel
vaibhavk1992 May 16, 2025
16134b3
adding the test file
vaibhavk1992 May 19, 2025
3929e95
adding workable code for iteration over data
vaibhavk1992 May 26, 2025
c6379b5
adding Kernel 4.0 code
vaibhavk1992 Jun 11, 2025
6deb5f7
adding the working code with xtable that check getcurrenttable
vaibhavk1992 Jun 24, 2025
05d9984
Merge branch 'main' into test-4
vaibhavk1992 Jun 30, 2025
c7ba4b9
adding the dependencies
vaibhavk1992 Jun 30, 2025
0ff36a5
adding getcurrentsnapshot code
vaibhavk1992 Jul 19, 2025
18ab9d6
spotless fix
vaibhavk1992 Jul 19, 2025
e906091
spotless fix 2
vaibhavk1992 Jul 19, 2025
e00241c
spotless fix 2
vaibhavk1992 Jul 19, 2025
3fdfd31
fixed partitioned test case
vaibhavk1992 Jul 26, 2025
e0102e3
setting junit parallel execution to true
vaibhavk1992 Jul 28, 2025
381722a
testInsertsUpsertsAndDeletes test case addition,internal datatype add…
vaibhavk1992 Aug 5, 2025
809bfe8
added the fix for table basepath listing wrong paths
vaibhavk1992 Aug 7, 2025
40172f2
added the fix for table basepath listing wrong paths
vaibhavk1992 Aug 7, 2025
e0b7829
adding all tests
vaibhavk1992 Aug 27, 2025
9ac022a
adding refactored code
vaibhavk1992 Aug 27, 2025
73f33b6
spotless fix
vaibhavk1992 Aug 27, 2025
bee3e8a
fix change extraction
Oct 5, 2025
e75bb55
adding the commitbacklog test cases changes
vaibhavk1992 Oct 7, 2025
21044af
Merge branch 'apache:main' into test-4
vaibhavk1992 Oct 7, 2025
e212f52
adding a test case testConvertFromDeltaPartitionFormat
vaibhavk1992 Oct 13, 2025
988cda1
adding a test case testConvertFromDeltaPartitionFormat
vaibhavk1992 Oct 13, 2025
1705ce4
adding the KernelPartitionExtractor test under kernel
vaibhavk1992 Oct 23, 2025
8f81109
committing schema extractor and stats extractor
vaibhavk1992 Nov 11, 2025
49ebf21
adding unit test cases with the request changes on the PR
vaibhavk1992 Nov 17, 2025
70fe0e3
spotless fix
vaibhavk1992 Nov 17, 2025
fba7e0e
spotless fix
vaibhavk1992 Nov 17, 2025
bb4dc4e
Merge branch 'main' into test-4
vaibhavk1992 Nov 17, 2025
6b1be2d
adding hadoop-common in xtable service POM
vaibhavk1992 Nov 18, 2025
2f46699
changed map type to java and removed print commands
vaibhavk1992 Nov 22, 2025
70469fb
changed map type to java and removed print commands
vaibhavk1992 Nov 22, 2025
ae61a28
removing hadoop common from xtable service
vaibhavk1992 Nov 24, 2025
a6f86ac
fixing POM
vaibhavk1992 Nov 25, 2025
cd30bab
resolving some minor comments from review
vaibhavk1992 Nov 25, 2025
cecf300
changing constructor for Datafile extractor
vaibhavk1992 Nov 26, 2025
253de3f
add exclusion hadoop-client-runtime in POM
vaibhavk1992 Nov 27, 2025
019855b
removing unused code and string comparison method
vaibhavk1992 Dec 1, 2025
977df2f
removing while True condition
vaibhavk1992 Dec 2, 2025
3a1df45
Merge branch 'apache:main' into test-4
vaibhavk1992 Dec 29, 2025
2ea30a8
Add Delta Kernel integration with disabled tests
vaibhavk1992 Feb 6, 2026
5d695cc
Merge remote-tracking branch 'upstream/main' into test-4
vaibhavk1992 Feb 6, 2026
10f7fbe
adding the conversion target files
vaibhavk1992 Feb 9, 2026
de57c88
corrected the test cases
vaibhavk1992 Feb 13, 2026
0345140
spotless fix
vaibhavk1992 Feb 13, 2026
b9f27af
adding read write integration test case
vaibhavk1992 Feb 21, 2026
2af1236
addressed comments over PR
vaibhavk1992 Mar 2, 2026
2d0e16e
addressed comments over PR
vaibhavk1992 Mar 2, 2026
ab0417c
addressed comments over PR
vaibhavk1992 Mar 2, 2026
935d835
adding data types
vaibhavk1992 Mar 2, 2026
23f6321
dummy commit to trigger actions
vaibhavk1992 Mar 2, 2026
cf5029f
Merge upstream/main into test-4
vaibhavk1992 Mar 2, 2026
d622ae7
Apply spotless formatting to remove wildcard imports
vaibhavk1992 Mar 2, 2026
66eb9df
Fix missing imports after spotless wildcard removal
vaibhavk1992 Mar 2, 2026
d405870
adding read write integration test case
vaibhavk1992 Mar 2, 2026
37caddf
Fix exception handling, Scala/Java mixing, and test quality in Delta …
vaibhavk1992 Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.kernel;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import lombok.Builder;

import scala.collection.JavaConverters;
import scala.collection.Seq;

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.actions.RowBackedAction;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.StructType;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.FilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.paths.PathUtils;
import org.apache.xtable.spi.extractor.DataFileIterator;

@Builder
public class DeltaKernelDataFileUpdatesExtractor {
@Builder.Default
private final DeltaKernelStatsExtractor deltaStatsExtractor =
DeltaKernelStatsExtractor.getInstance();

@Builder.Default
private final DeltaKernelPartitionExtractor deltaKernelPartitionExtractor =
DeltaKernelPartitionExtractor.getInstance();

@Builder.Default
private final DeltaKernelDataFileExtractor dataFileExtractor =
DeltaKernelDataFileExtractor.builder().build();

private final Engine engine;
private final String basePath;
private final boolean includeColumnStats;

public Seq<RowBackedAction> applySnapshot(
Table table, List<PartitionFileGroup> partitionedDataFiles, InternalSchema tableSchema) {

// all files in the current delta snapshot are potential candidates for remove actions, i.e. if
// the file is not present in the new snapshot (addedFiles) then the file is considered removed
Map<String, RowBackedAction> previousFiles = new HashMap<>();
StructType physicalSchema;

// Check if table exists by checking if _delta_log directory exists
boolean tableExists = checkTableExists(table);

if (tableExists) {
Snapshot snapshot = table.getLatestSnapshot(engine);

// Reuse DeltaKernelDataFileExtractor to iterate through existing files
// This avoids duplicating the scan logic for reading Delta files
try (DataFileIterator fileIterator =
dataFileExtractor.iterator(snapshot, table, engine, tableSchema)) {

while (fileIterator.hasNext()) {
InternalDataFile internalFile = fileIterator.next();

// Convert InternalDataFile back to AddFile to create RemoveFile action
AddFile addFile = createAddFileFromInternalDataFile(internalFile, snapshot.getSchema());
RemoveFile removeFile =
new RemoveFile(addFile.toRemoveFileRow(false, Optional.of(snapshot.getVersion())));
String fullPath =
DeltaKernelActionsConverter.getFullPathToFile(removeFile.getPath(), table);
previousFiles.put(fullPath, removeFile);
}
} catch (Exception e) {
throw new ReadException("Failed to scan existing Delta files", e);
}

physicalSchema = snapshot.getSchema();

} else {

// Table doesn't exist yet - no previous files to remove
// Convert InternalSchema to StructType for physical schema
DeltaKernelSchemaExtractor schemaExtractor = DeltaKernelSchemaExtractor.getInstance();
physicalSchema = schemaExtractor.fromInternalSchema(tableSchema);
}

FilesDiff<InternalFile, RowBackedAction> diff =
InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);

return applyDiff(
diff.getFilesAdded(),
diff.getFilesRemoved(),
tableSchema,
table.getPath(engine),
physicalSchema);
}

private boolean checkTableExists(Table table) {
return DeltaKernelUtils.tableExists(engine, table.getPath(engine));
}

/**
* Converts an InternalDataFile back to Delta Kernel's AddFile action. This is needed to create
* RemoveFile actions from existing files.
*/
private AddFile createAddFileFromInternalDataFile(
InternalDataFile internalFile, StructType physicalSchema) {
// Extract partition values from InternalDataFile using existing logic
Map<String, String> partitionValuesMap =
deltaKernelPartitionExtractor.partitionValueSerialization(internalFile);
MapValue partitionValues = convertToMapValue(partitionValuesMap);

// Create AddFile Row using the same pattern as createAddFileAction
Row addFileRow =
AddFile.createAddFileRow(
physicalSchema,
PathUtils.getRelativePath(internalFile.getPhysicalPath(), basePath),
partitionValues,
internalFile.getFileSizeBytes(),
internalFile.getLastModified(),
true, // dataChange - assume true for existing files
Optional.empty(), // deletionVector
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
Optional.empty()); // stats - set to empty since we're creating RemoveFile

// Wrap the Row back into an AddFile object
return new AddFile(addFileRow);
}

public Seq<RowBackedAction> applyDiff(
InternalFilesDiff internalFilesDiff,
InternalSchema tableSchema,
String tableBasePath,
StructType physicalSchema) {
List<RowBackedAction> removeActions =
internalFilesDiff.dataFilesRemoved().stream()
.map(dFile -> createAddFileAction(dFile, tableBasePath, physicalSchema))
.map(addFile -> new RemoveFile(addFile.toRemoveFileRow(false, Optional.empty())))
.collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size()));
return applyDiff(
internalFilesDiff.dataFilesAdded(),
removeActions,
tableSchema,
tableBasePath,
physicalSchema);
}

private Seq<RowBackedAction> applyDiff(
Set<? extends InternalFile> filesAdded,
Collection<RowBackedAction> removeFileActions,
InternalSchema tableSchema,
String tableBasePath,
StructType physicalSchema) {
Stream<RowBackedAction> addActions =
filesAdded.stream()
.filter(InternalDataFile.class::isInstance)
.map(file -> (InternalDataFile) file)
.map(dFile -> createAddFileAction(dFile, tableBasePath, physicalSchema));
int totalActions = filesAdded.size() + removeFileActions.size();
List<RowBackedAction> allActions =
Stream.concat(addActions, removeFileActions.stream())
.collect(CustomCollectors.toList(totalActions));
return JavaConverters.asScalaBuffer(allActions).toSeq();
}

private AddFile createAddFileAction(
InternalDataFile dataFile, String tableBasePath, StructType physicalSchema) {
// Convert partition values from Map<String, String> to MapValue
Map<String, String> partitionValuesMap =
deltaKernelPartitionExtractor.partitionValueSerialization(dataFile);
MapValue partitionValues = convertToMapValue(partitionValuesMap);

Row addFileRow =
AddFile.createAddFileRow(
physicalSchema,
// Delta Lake supports relative and absolute paths in theory but relative paths seem
// more commonly supported by query engines in our testing
PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath),
partitionValues,
dataFile.getFileSizeBytes(),
dataFile.getLastModified(),
true, // dataChange
Optional.empty(), // deletionVector
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
Optional.empty() // stats - TODO: convert column stats to DataFileStatistics
);

// Wrap the Row back into an AddFile object so we can use its methods
return new AddFile(addFileRow);
}

private MapValue convertToMapValue(Map<String, String> map) {
return VectorUtils.stringStringMapValue(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import io.delta.kernel.types.TimestampType;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.delta.DeltaPartitionExtractor;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
Expand All @@ -66,16 +68,15 @@ public InternalSchema toInternalSchema(StructType structType) {
return toInternalSchema(structType, null, false, null, null);
}

String trimmedTypeName = "";
InternalType type = null;

private InternalSchema toInternalSchema(
DataType dataType,
String parentPath,
boolean nullable,
String comment,
FieldMetadata originalMetadata) {

String trimmedTypeName = "";
InternalType type = null;
Map<InternalSchema.MetadataKey, Object> metadata = null;
List<InternalField> fields = null;

Expand Down Expand Up @@ -229,4 +230,118 @@ private InternalSchema toInternalSchema(
.fields(fields)
.build();
}

/**
* Converts an InternalSchema to Delta Kernel StructType.
*
* @param internalSchema the internal schema representation
* @return Delta Kernel StructType
*/
public StructType fromInternalSchema(InternalSchema internalSchema) {
List<StructField> fields =
internalSchema.getFields().stream()
.map(
field ->
new StructField(
field.getName(),
convertFieldType(field),
field.getSchema().isNullable(),
getFieldMetadata(field.getSchema())))
.collect(CustomCollectors.toList(internalSchema.getFields().size()));
return new StructType(fields);
}

/**
* Converts an InternalField to Delta Kernel DataType.
*
* @param field the internal field
* @return Delta Kernel DataType
*/
private DataType convertFieldType(InternalField field) {
switch (field.getSchema().getDataType()) {
case STRING:
case ENUM:
return StringType.STRING;
case INT:
return IntegerType.INTEGER;
case LONG:
return LongType.LONG;
case BYTES:
case FIXED:
case UUID:
return BinaryType.BINARY;
case BOOLEAN:
return BooleanType.BOOLEAN;
case FLOAT:
return FloatType.FLOAT;
case DATE:
return DateType.DATE;
case TIMESTAMP:
return TimestampType.TIMESTAMP;
case TIMESTAMP_NTZ:
return TimestampNTZType.TIMESTAMP_NTZ;
case DOUBLE:
return DoubleType.DOUBLE;
case DECIMAL:
int precision =
(int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
int scale =
(int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
return new DecimalType(precision, scale);
case RECORD:
return fromInternalSchema(field.getSchema());
case MAP:
InternalField key =
field.getSchema().getFields().stream()
.filter(
mapField ->
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Invalid map schema"));
InternalField value =
field.getSchema().getFields().stream()
.filter(
mapField ->
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Invalid map schema"));
return new MapType(
convertFieldType(key), convertFieldType(value), value.getSchema().isNullable());
case LIST:
InternalField element =
field.getSchema().getFields().stream()
.filter(
arrayField ->
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
arrayField.getName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Invalid array schema"));
return new ArrayType(convertFieldType(element), element.getSchema().isNullable());
default:
throw new NotSupportedException("Unsupported type: " + field.getSchema().getDataType());
}
}

/**
* Creates Delta Kernel FieldMetadata from InternalSchema.
*
* @param schema the internal schema
* @return Delta Kernel FieldMetadata
*/
private FieldMetadata getFieldMetadata(InternalSchema schema) {
FieldMetadata.Builder metadataBuilder = FieldMetadata.builder();

// Handle UUID type
InternalType type = schema.getDataType();
if (type == InternalType.UUID) {
metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
}

// Handle comment
if (schema.getComment() != null) {
metadataBuilder.putString("comment", schema.getComment());
}

return metadataBuilder.build();
}
}
Loading