) value));
+ } else {
+ result.put(fullName, value);
+ }
+ });
+ }
}
return result;
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java
new file mode 100644
index 000000000..2939bc18d
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.kernel;
+
+import lombok.experimental.UtilityClass;
+
+import io.delta.kernel.Table;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.TableNotFoundException;
+
+/**
+ * Utility methods for working with Delta Kernel API.
+ *
+ * <p>This class provides common helper methods used across Delta Kernel integration components to
+ * avoid code duplication and ensure consistent behavior.
+ */
+@UtilityClass
+public class DeltaKernelUtils {
+
+ /**
+ * Checks if a Delta table exists at the specified path.
+ *
+ * <p>This method only catches {@link TableNotFoundException}, allowing other exceptions (network
+ * errors, permission issues, corrupted metadata) to propagate. This ensures real errors are
+ * visible rather than being silently masked.
+ *
+ * @param engine the Delta Kernel engine to use
+ * @param basePath the path to the Delta table
+ * @return true if the table exists, false if it doesn't exist
+ * @throws RuntimeException if there's an error other than table not found (e.g., network issues,
+ * permissions)
+ */
+ public static boolean tableExists(Engine engine, String basePath) {
+ try {
+ Table table = Table.forPath(engine, basePath);
+ table.getLatestSnapshot(engine);
+ return true;
+ } catch (TableNotFoundException e) {
+ // Expected: table doesn't exist yet
+ return false;
+ }
+ // Let other exceptions propagate (network issues, permissions, corrupted metadata, etc.)
+ }
+}
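The narrow-catch idiom in `tableExists` above (return `false` only for the one expected "not found" exception, let everything else propagate) can be sketched outside the Delta Kernel API with plain `java.nio`. The `entryExists` helper below is a hypothetical stand-in for illustration, not part of this change:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class ExistenceCheckSketch {
  // Mirrors the tableExists contract: catch only the "not found" case,
  // rethrow every other I/O failure so real errors stay visible.
  static boolean entryExists(Path path) {
    try {
      Files.readAttributes(path, BasicFileAttributes.class);
      return true;
    } catch (NoSuchFileException e) {
      return false; // expected: the entry simply is not there
    } catch (IOException e) {
      throw new UncheckedIOException(e); // anything else is a real error
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("exists", ".tmp");
    System.out.println(entryExists(tmp));
    System.out.println(entryExists(tmp.resolveSibling("no_such_file_xyz")));
    Files.deleteIfExists(tmp);
  }
}
```

The point of the pattern is that a broad `catch (Exception e) { return false; }` would silently turn permission problems or corrupted metadata into "table does not exist", which is much harder to debug.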
diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java
new file mode 100644
index 000000000..46ffab752
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.kernel;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import scala.collection.JavaConverters;
+
+import io.delta.kernel.Table;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.RemoveFile;
+import io.delta.kernel.internal.actions.RowBackedAction;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+
+public class TestDeltaKernelDataFileUpdatesExtractor {
+
+ @TempDir Path tempDir;
+
+ private Engine engine;
+ private DeltaKernelDataFileUpdatesExtractor extractor;
+ private InternalSchema testSchema;
+ private StructType physicalSchema;
+
+ @BeforeEach
+ public void setup() {
+ Configuration hadoopConf = new Configuration();
+ engine = DefaultEngine.create(hadoopConf);
+
+ // Create test schema
+ testSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("id")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("name")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build()))
+ .build();
+
+ // Create physical schema
+ physicalSchema =
+ new StructType()
+ .add(new StructField("id", IntegerType.INTEGER, false))
+ .add(new StructField("name", StringType.STRING, true));
+
+ // Initialize extractor
+ extractor =
+ DeltaKernelDataFileUpdatesExtractor.builder()
+ .engine(engine)
+ .basePath(tempDir.toString())
+ .includeColumnStats(false)
+ .build();
+ }
+
+ @Test
+ public void testCreateAddFileAction() throws IOException {
+ // Create a test data file
+ String testFilePath = tempDir.resolve("test_data.parquet").toString();
+ Files.createFile(Paths.get(testFilePath));
+
+ InternalDataFile dataFile =
+ InternalDataFile.builder()
+ .physicalPath(testFilePath)
+ .fileSizeBytes(1024L)
+ .lastModified(Instant.now().toEpochMilli())
+ .recordCount(100L)
+ .partitionValues(Collections.emptyList())
+ .columnStats(Collections.emptyList())
+ .build();
+
+ // Create a simple Delta table for testing
+ Table table = createSimpleDeltaTable();
+
+ List<PartitionFileGroup> partitionedDataFiles =
+ Collections.singletonList(
+ PartitionFileGroup.builder()
+ .files(Collections.singletonList(dataFile))
+ .partitionValues(Collections.emptyList())
+ .build());
+
+ // Execute applySnapshot
+ scala.collection.Seq<RowBackedAction> actions =
+ extractor.applySnapshot(table, partitionedDataFiles, testSchema);
+
+ // Verify actions are created
+ assertNotNull(actions);
+ List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
+ assertFalse(actionList.isEmpty(), "Should have at least one action");
+
+ // Verify we have AddFile actions
+ boolean hasAddFile = actionList.stream().anyMatch(action -> action instanceof AddFile);
+ assertTrue(hasAddFile, "Should contain AddFile actions");
+ }
+
+ @Test
+ public void testApplySnapshotWithPartitionedData() throws IOException {
+ // Create test data files with partitions
+ String testFilePath1 = tempDir.resolve("partition1/test_data1.parquet").toString();
+ String testFilePath2 = tempDir.resolve("partition2/test_data2.parquet").toString();
+ Files.createDirectories(Paths.get(testFilePath1).getParent());
+ Files.createDirectories(Paths.get(testFilePath2).getParent());
+ Files.createFile(Paths.get(testFilePath1));
+ Files.createFile(Paths.get(testFilePath2));
+
+ InternalPartitionField partitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("partition_col")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+
+ PartitionValue partitionValue1 =
+ PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar("partition1"))
+ .build();
+
+ PartitionValue partitionValue2 =
+ PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar("partition2"))
+ .build();
+
+ InternalDataFile dataFile1 =
+ InternalDataFile.builder()
+ .physicalPath(testFilePath1)
+ .fileSizeBytes(1024L)
+ .lastModified(Instant.now().toEpochMilli())
+ .recordCount(50L)
+ .partitionValues(Collections.singletonList(partitionValue1))
+ .columnStats(Collections.emptyList())
+ .build();
+
+ InternalDataFile dataFile2 =
+ InternalDataFile.builder()
+ .physicalPath(testFilePath2)
+ .fileSizeBytes(2048L)
+ .lastModified(Instant.now().toEpochMilli())
+ .recordCount(75L)
+ .partitionValues(Collections.singletonList(partitionValue2))
+ .columnStats(Collections.emptyList())
+ .build();
+
+ Table table = createSimpleDeltaTable();
+
+ List<PartitionFileGroup> partitionedDataFiles =
+ Arrays.asList(
+ PartitionFileGroup.builder()
+ .files(Collections.singletonList(dataFile1))
+ .partitionValues(Collections.singletonList(partitionValue1))
+ .build(),
+ PartitionFileGroup.builder()
+ .files(Collections.singletonList(dataFile2))
+ .partitionValues(Collections.singletonList(partitionValue2))
+ .build());
+
+ // Execute applySnapshot
+ scala.collection.Seq<RowBackedAction> actions =
+ extractor.applySnapshot(table, partitionedDataFiles, testSchema);
+
+ // Verify
+ assertNotNull(actions);
+ List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
+ assertFalse(actionList.isEmpty(), "Should have actions for partitioned data");
+
+ // Should have AddFile actions for new files
+ long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count();
+ assertTrue(addFileCount >= 2, "Should have at least 2 AddFile actions");
+ }
+
+ @Test
+ public void testDifferentialSyncWithExistingData() throws IOException {
+ // This test simulates a real differential sync scenario:
+ // 1. Delta table has existing files: file1.parquet, file2.parquet
+ // 2. New sync brings: file2.parquet (unchanged), file3.parquet (new)
+ // 3. Expected result: AddFile for file3, RemoveFile for file1
+
+ // Step 1: Create a Delta table with existing data
+ Path tablePath = tempDir.resolve("delta_table_with_data");
+ Files.createDirectories(tablePath);
+ Path deltaLogPath = tablePath.resolve("_delta_log");
+ Files.createDirectories(deltaLogPath);
+
+ // Create existing data files
+ Path existingFile1 = tablePath.resolve("file1.parquet");
+ Path existingFile2 = tablePath.resolve("file2.parquet");
+ Files.createFile(existingFile1);
+ Files.createFile(existingFile2);
+
+ // Create initial commit with file1 and file2
+ Path initialCommit = deltaLogPath.resolve("00000000000000000000.json");
+ String initialCommitJson =
+ "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
+ + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\""
+ + physicalSchema.toJson().replace("\"", "\\\"")
+ + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":"
+ + System.currentTimeMillis()
+ + "}}\n"
+ + "{\"add\":{\"path\":\"file1.parquet\",\"partitionValues\":{},\"size\":1024,\"modificationTime\":"
+ + Instant.now().toEpochMilli()
+ + ",\"dataChange\":true,\"stats\":\"{}\"}}\n"
+ + "{\"add\":{\"path\":\"file2.parquet\",\"partitionValues\":{},\"size\":2048,\"modificationTime\":"
+ + Instant.now().toEpochMilli()
+ + ",\"dataChange\":true,\"stats\":\"{}\"}}\n";
+ Files.write(initialCommit, initialCommitJson.getBytes(StandardCharsets.UTF_8));
+
+ // Create the table
+ Table table = Table.forPath(engine, tablePath.toString());
+ assertNotNull(table);
+
+ // Step 2: Prepare new sync data - file2 (unchanged) + file3 (new)
+ Path newFile3 = tablePath.resolve("file3.parquet");
+ Files.createFile(newFile3);
+
+ InternalDataFile dataFile2 =
+ InternalDataFile.builder()
+ .physicalPath(existingFile2.toString())
+ .fileSizeBytes(2048L)
+ .lastModified(Instant.now().toEpochMilli())
+ .recordCount(100L)
+ .partitionValues(Collections.emptyList())
+ .columnStats(Collections.emptyList())
+ .build();
+
+ InternalDataFile dataFile3 =
+ InternalDataFile.builder()
+ .physicalPath(newFile3.toString())
+ .fileSizeBytes(3072L)
+ .lastModified(Instant.now().toEpochMilli())
+ .recordCount(150L)
+ .partitionValues(Collections.emptyList())
+ .columnStats(Collections.emptyList())
+ .build();
+
+ List<PartitionFileGroup> newPartitionedDataFiles =
+ Collections.singletonList(
+ PartitionFileGroup.builder()
+ .files(Arrays.asList(dataFile2, dataFile3))
+ .partitionValues(Collections.emptyList())
+ .build());
+
+ // Step 3: Apply snapshot (differential sync)
+ DeltaKernelDataFileUpdatesExtractor syncExtractor =
+ DeltaKernelDataFileUpdatesExtractor.builder()
+ .engine(engine)
+ .basePath(tablePath.toString())
+ .includeColumnStats(false)
+ .build();
+
+ scala.collection.Seq<RowBackedAction> actions =
+ syncExtractor.applySnapshot(table, newPartitionedDataFiles, testSchema);
+
+ // Step 4: Verify the differential sync results
+ assertNotNull(actions, "Actions should not be null");
+ List<RowBackedAction> actionList = JavaConverters.seqAsJavaList(actions);
+ assertFalse(actionList.isEmpty(), "Should have actions for differential sync");
+
+ // Count AddFile and RemoveFile actions
+ long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count();
+ long removeFileCount =
+ actionList.stream().filter(action -> action instanceof RemoveFile).count();
+
+ // Verify: Should have AddFile for file3 (new file)
+ assertTrue(addFileCount >= 1, "Should have at least 1 AddFile action for new file (file3)");
+
+ // Verify: Should have RemoveFile for file1 (removed from new sync)
+ assertTrue(
+ removeFileCount >= 1,
+ "Should have at least 1 RemoveFile action for file1 that's not in new sync");
+
+ // Verify specific files in actions
+ boolean hasFile3Add =
+ actionList.stream()
+ .filter(action -> action instanceof AddFile)
+ .map(action -> (AddFile) action)
+ .anyMatch(addFile -> addFile.getPath().contains("file3.parquet"));
+
+ assertTrue(hasFile3Add, "Should have AddFile action for file3.parquet");
+
+ // Note: file2 should not appear in actions as it's unchanged
+ // file1 should appear as RemoveFile as it's not in the new sync
+ System.out.println(
+ "Differential sync completed: "
+ + addFileCount
+ + " files added, "
+ + removeFileCount
+ + " files removed");
+ }
+
+ private Table createSimpleDeltaTable() {
+ try {
+ // Create a simple Delta table directory structure
+ Path tablePath = tempDir.resolve("delta_table");
+ Files.createDirectories(tablePath);
+ Path deltaLogPath = tablePath.resolve("_delta_log");
+ Files.createDirectories(deltaLogPath);
+
+ // Create an empty commit file to make it a valid Delta table
+ Path commitFile = deltaLogPath.resolve("00000000000000000000.json");
+ String commitJson =
+ "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n"
+ + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\""
+ + physicalSchema.toJson().replace("\"", "\\\"")
+ + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":"
+ + System.currentTimeMillis()
+ + "}}\n";
+ Files.write(commitFile, commitJson.getBytes(StandardCharsets.UTF_8));
+
+ return Table.forPath(engine, tablePath.toString());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create test Delta table", e);
+ }
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java
new file mode 100644
index 000000000..9a24fc009
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.kernel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.SnapshotImpl;
+
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.sync.TableFormatSync;
+
+/**
+ * Comprehensive end-to-end integration test for Delta Kernel read and write operations.
+ *
+ * <p>This test validates: 1. Writing data to Delta tables using DeltaKernelConversionTarget 2.
+ * Reading data from Delta tables using DeltaKernelConversionSource 3. Round-trip data integrity
+ * (write → read → validate) 4. Partitioned tables 5. Incremental updates (add/remove files) 6. Time
+ * travel (version-based reads) 7. Empty table handling
+ */
+public class TestDeltaKernelReadWriteIntegration {
+ private static final Random RANDOM = new Random();
+ private static final Instant LAST_COMMIT_TIME = Instant.now();
+
+ @TempDir public Path tempDir;
+ private Engine engine;
+
+ @BeforeEach
+ public void setup() {
+ Configuration hadoopConf = new Configuration();
+ engine = DefaultEngine.create(hadoopConf);
+ }
+
+ /**
+ * Test 1: Basic Write and Read Validates that data written to Delta can be read back correctly.
+ */
+ @Test
+ public void testBasicWriteAndRead() throws Exception {
+ String tableName = "test_basic_" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ // === WRITE PHASE ===
+ InternalSchema schema = createSimpleSchema();
+ DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
+
+ // Create test data files
+ InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);
+
+ // Write data to Delta table
+ InternalTable writeTable = createInternalTable(tableName, basePath, schema, null);
+ InternalSnapshot snapshot = buildSnapshot(writeTable, "0", file1, file2);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot);
+
+ // Verify Delta table was created
+ assertTrue(Files.exists(basePath.resolve("_delta_log")), "Delta log directory should exist");
+
+ // === READ PHASE ===
+ DeltaKernelConversionSource reader = createReader(tableName, basePath);
+
+ // Read current table metadata
+ InternalTable readTable = reader.getCurrentTable();
+ assertNotNull(readTable, "Should be able to read table");
+ assertEquals(tableName, readTable.getName());
+
+ // Normalize paths for comparison (handle file:// prefix differences)
+ String expectedPath = basePath.toString();
+ String actualPath = readTable.getBasePath().replace("file://", "").replace("file:", "");
+ assertTrue(
+ actualPath.endsWith(expectedPath) || actualPath.equals(expectedPath),
+ "Base path should match. Expected: " + expectedPath + ", Actual: " + actualPath);
+
+ // Verify schema
+ InternalSchema readSchema = readTable.getReadSchema();
+ assertNotNull(readSchema);
+ assertEquals(schema.getFields().size(), readSchema.getFields().size());
+
+ // Read current snapshot
+ InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
+ assertNotNull(readSnapshot);
+
+ // Extract data files from partition groups (files with same partition values are grouped)
+ List<InternalDataFile> dataFiles = extractDataFiles(readSnapshot);
+ assertEquals(2, dataFiles.size(), "Should have 2 files in snapshot");
+
+ // Compare by physical path to uniquely identify files (not by size which could be duplicated)
+ assertTrue(
+ dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
+ "Should contain file1 (data_1.parquet)");
+ assertTrue(
+ dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")),
+ "Should contain file2 (data_2.parquet)");
+ }
+
+ /**
+ * Test 2: Partitioned Table Write and Read Validates partition handling in both write and read
+ * operations.
+ */
+ @Test
+ public void testPartitionedTableRoundTrip() throws Exception {
+ String tableName = "test_partitioned_" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
+ DeltaKernelConversionSource reader = createReader(tableName, basePath);
+
+ // Define partition field
+ InternalPartitionField partitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("string_field")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+
+ // === WRITE PHASE ===
+ InternalSchema schema = createSimpleSchema();
+ InternalTable table =
+ createInternalTable(tableName, basePath, schema, Collections.singletonList(partitionField));
+
+ // Create partitioned data files
+ List<PartitionValue> partition1 =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar("category_a"))
+ .build());
+ List<PartitionValue> partition2 =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(partitionField)
+ .range(Range.scalar("category_b"))
+ .build());
+
+ InternalDataFile file1 = createDataFile(1, partition1, basePath);
+ InternalDataFile file2 = createDataFile(2, partition1, basePath);
+ InternalDataFile file3 = createDataFile(3, partition2, basePath);
+
+ InternalSnapshot snapshot = buildSnapshot(table, "0", file1, file2, file3);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot);
+
+ // === READ PHASE ===
+ InternalTable readTable = reader.getCurrentTable();
+
+ // Verify partitioning
+ assertNotNull(readTable.getPartitioningFields());
+ assertEquals(1, readTable.getPartitioningFields().size());
+ assertEquals(
+ "string_field", readTable.getPartitioningFields().get(0).getSourceField().getName());
+
+ // Verify all files are present
+ InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
+ List<InternalDataFile> dataFiles = extractDataFiles(readSnapshot);
+ assertEquals(3, dataFiles.size(), "Should have all 3 partitioned files");
+
+ // Verify partition columns in Delta metadata
+ Table deltaTable = Table.forPath(engine, basePath.toString());
+ Snapshot deltaSnapshot = deltaTable.getLatestSnapshot(engine);
+ SnapshotImpl snapshotImpl = (SnapshotImpl) deltaSnapshot;
+ Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
+ assertEquals(1, partitionColumns.size());
+ assertTrue(partitionColumns.contains("string_field"));
+ }
+
+ /**
+ * Test 3: Incremental Updates (Add/Remove Files) Validates that incremental changes are properly
+ * handled.
+ */
+ @Test
+ public void testIncrementalUpdates() throws Exception {
+ String tableName = "test_incremental_" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
+ DeltaKernelConversionSource reader = createReader(tableName, basePath);
+
+ InternalSchema schema = createSimpleSchema();
+ InternalTable table = createInternalTable(tableName, basePath, schema, null);
+
+ // === SNAPSHOT 1: Initial files ===
+ InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", file1, file2);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1);
+
+ InternalSnapshot read1 = reader.getCurrentSnapshot();
+ assertEquals(2, extractDataFiles(read1).size(), "Should have 2 files after first snapshot");
+
+ // === SNAPSHOT 2: Remove file1, keep file2, add file3 ===
+ InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot2 = buildSnapshot(table, "1", file2, file3);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2);
+
+ InternalSnapshot read2 = reader.getCurrentSnapshot();
+ List<InternalDataFile> files2 = extractDataFiles(read2);
+ assertEquals(2, files2.size(), "Should have 2 files after second snapshot");
+
+ // Verify correct files are present (compare by path, not size)
+ assertTrue(
+ files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")),
+ "file2 should be present");
+ assertTrue(
+ files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_3.parquet")),
+ "file3 should be present");
+ assertFalse(
+ files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
+ "file1 should be removed");
+
+ // === SNAPSHOT 3: Replace all files ===
+ InternalDataFile file4 = createDataFile(4, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot3 = buildSnapshot(table, "2", file4);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot3);
+
+ InternalSnapshot read3 = reader.getCurrentSnapshot();
+ List<InternalDataFile> files3 = extractDataFiles(read3);
+ assertEquals(1, files3.size(), "Should have only 1 file after third snapshot");
+ assertTrue(
+ files3.get(0).getPhysicalPath().contains("data_4.parquet"),
+ "Should contain file4 (data_4.parquet)");
+ }
+
+ /** Test 4: Read at Specific Version (Time Travel) Validates version-based reading. */
+ @Test
+ public void testReadAtVersion() throws Exception {
+ String tableName = "test_versioned_" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
+ DeltaKernelConversionSource reader = createReader(tableName, basePath);
+
+ InternalSchema schema = createSimpleSchema();
+ InternalTable table = createInternalTable(tableName, basePath, schema, null);
+
+ // Write version 0
+ InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot0 = buildSnapshot(table, "0", file1);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot0);
+
+ // Write version 1
+ InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot1 = buildSnapshot(table, "1", file1, file2);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1);
+
+ // Write version 2
+ InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot2 = buildSnapshot(table, "2", file2, file3);
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2);
+
+ // Read at version 0 (should have only file1)
+ InternalTable tableV0 = reader.getTable(0L);
+ assertNotNull(tableV0);
+
+ // Read at version 1 (should have file1 and file2)
+ InternalTable tableV1 = reader.getTable(1L);
+ assertNotNull(tableV1);
+
+ // Read latest version (should have file2 and file3)
+ InternalSnapshot latestSnapshot = reader.getCurrentSnapshot();
+ List<InternalDataFile> latestFiles = extractDataFiles(latestSnapshot);
+ assertEquals(2, latestFiles.size());
+
+ // Verify latest version doesn't have file1 (compare by path, not size)
+ assertFalse(
+ latestFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")),
+ "Latest version should not have file1");
+ }
+
+ /** Test 5: Empty Table Creation and Read Validates handling of empty tables. */
+ @Test
+ public void testEmptyTableRoundTrip() throws Exception {
+ String tableName = "test_empty_" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ DeltaKernelConversionTarget writer = createWriter(tableName, basePath);
+ DeltaKernelConversionSource reader = createReader(tableName, basePath);
+
+ // Write empty table with just schema
+ InternalSchema schema = createSimpleSchema();
+ InternalTable table = createInternalTable(tableName, basePath, schema, null);
+ InternalSnapshot emptySnapshot = buildSnapshot(table, "0"); // No files
+
+ TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), emptySnapshot);
+
+ // Read back
+ InternalTable readTable = reader.getCurrentTable();
+ assertNotNull(readTable);
+ assertEquals(schema.getFields().size(), readTable.getReadSchema().getFields().size());
+
+ InternalSnapshot readSnapshot = reader.getCurrentSnapshot();
+ assertNotNull(readSnapshot);
+ assertEquals(0, readSnapshot.getPartitionedDataFiles().size(), "Should have no files");
+ }
+
+ // ==================== Helper Methods ====================
+
+ private DeltaKernelConversionTarget createWriter(String tableName, Path basePath) {
+ return new DeltaKernelConversionTarget(
+ TargetTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+ .formatName(TableFormat.DELTA)
+ .build(),
+ engine);
+ }
+
+ private DeltaKernelConversionSource createReader(String tableName, Path basePath) {
+ return DeltaKernelConversionSource.builder()
+ .basePath(basePath.toString())
+ .tableName(tableName)
+ .engine(engine)
+ .build();
+ }
+
+ private InternalSchema createSimpleSchema() {
+ Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
+ timestampMetadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
+
+ return InternalSchema.builder()
+ .dataType(InternalType.RECORD)
+ .name("test_schema")
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("id")
+ .schema(
+ InternalSchema.builder()
+ .name("long")
+ .dataType(InternalType.LONG)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("string_field")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("int_field")
+ .schema(
+ InternalSchema.builder()
+ .name("int")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("timestamp_field")
+ .schema(
+ InternalSchema.builder()
+ .name("timestamp")
+ .dataType(InternalType.TIMESTAMP)
+ .isNullable(true)
+ .metadata(timestampMetadata)
+ .build())
+ .build()))
+ .isNullable(false)
+ .build();
+ }
+
+ private InternalTable createInternalTable(
+ String tableName,
+ Path basePath,
+ InternalSchema schema,
+ List<InternalPartitionField> partitionFields) {
+ return InternalTable.builder()
+ .name(tableName)
+ .basePath(basePath.toUri().toString())
+ .layoutStrategy(DataLayoutStrategy.FLAT)
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(schema)
+ .partitioningFields(partitionFields)
+ .latestCommitTime(LAST_COMMIT_TIME)
+ .build();
+ }
+
+ private InternalSnapshot buildSnapshot(
+ InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
+ return InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+ .sourceIdentifier(sourceIdentifier)
+ .build();
+ }
+
+ private InternalDataFile createDataFile(
+ int index, List<PartitionValue> partitionValues, Path basePath) {
+ try {
+ Path filePath = basePath.resolve("data_" + index + ".parquet");
+ Files.createFile(filePath);
+
+ String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString();
+
+ return InternalDataFile.builder()
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .fileSizeBytes(1000 + index)
+ .physicalPath(physicalPath)
+ .recordCount(100)
+ .partitionValues(partitionValues)
+ .columnStats(Collections.emptyList())
+ .lastModified(Instant.now().toEpochMilli())
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create test data file", e);
+ }
+ }
+
+ private List<InternalDataFile> extractDataFiles(InternalSnapshot snapshot) {
+ List<InternalDataFile> files = new ArrayList<>();
+ for (PartitionFileGroup group : snapshot.getPartitionedDataFiles()) {
+ files.addAll(group.getDataFiles());
+ }
+ return files;
+ }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java
index 4e242da1d..bb3146ca5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java
@@ -18,6 +18,10 @@
package org.apache.xtable.kernel;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +42,7 @@
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampNTZType;
import io.delta.kernel.types.TimestampType;
@@ -47,6 +52,11 @@
import org.apache.xtable.model.schema.InternalType;
public class TestDeltaKernelSchemaExtractor {
+
+ private final DeltaKernelSchemaExtractor extractor = DeltaKernelSchemaExtractor.getInstance();
+
+ // ========== Tests for toInternalSchema() ==========
+
@Test
public void testPrimitiveTypes() {
Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
@@ -873,4 +883,142 @@ public void testIcebergToDeltaUUIDSupport() {
internalSchema,
DeltaKernelSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
+
+ // ========== Tests for fromInternalSchema() - New Tests ==========
+
+ @Test
+ public void testFromInternalSchemaSimpleTypes() {
+ // Create an InternalSchema with simple types
+ InternalField idField =
+ InternalField.builder()
+ .name("id")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build();
+
+ InternalField nameField =
+ InternalField.builder()
+ .name("name")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build();
+
+ InternalField activeField =
+ InternalField.builder()
+ .name("active")
+ .schema(
+ InternalSchema.builder()
+ .name("boolean")
+ .dataType(InternalType.BOOLEAN)
+ .isNullable(false)
+ .build())
+ .build();
+
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(Arrays.asList(idField, nameField, activeField))
+ .build();
+
+ // Convert to Delta Kernel StructType
+ StructType deltaSchema = extractor.fromInternalSchema(internalSchema);
+
+ // Verify
+ assertNotNull(deltaSchema);
+ assertEquals(3, deltaSchema.fields().size());
+
+ // Check id field
+ StructField idDeltaField = deltaSchema.fields().get(0);
+ assertEquals("id", idDeltaField.getName());
+ assertEquals(IntegerType.INTEGER, idDeltaField.getDataType());
+ assertEquals(false, idDeltaField.isNullable());
+
+ // Check name field
+ StructField nameDeltaField = deltaSchema.fields().get(1);
+ assertEquals("name", nameDeltaField.getName());
+ assertEquals(StringType.STRING, nameDeltaField.getDataType());
+ assertEquals(true, nameDeltaField.isNullable());
+
+ // Check active field
+ StructField activeDeltaField = deltaSchema.fields().get(2);
+ assertEquals("active", activeDeltaField.getName());
+ assertEquals(BooleanType.BOOLEAN, activeDeltaField.getDataType());
+ assertEquals(false, activeDeltaField.isNullable());
+ }
+
+ @Test
+ public void testFromInternalSchemaWithUUID() {
+ // Create an InternalSchema with UUID type
+ InternalField uuidField =
+ InternalField.builder()
+ .name("userId")
+ .schema(
+ InternalSchema.builder()
+ .name("binary")
+ .dataType(InternalType.UUID)
+ .isNullable(false)
+ .build())
+ .build();
+
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(Collections.singletonList(uuidField))
+ .build();
+
+ // Convert to Delta Kernel StructType
+ StructType deltaSchema = extractor.fromInternalSchema(internalSchema);
+
+ // Verify
+ assertNotNull(deltaSchema);
+ assertEquals(1, deltaSchema.fields().size());
+
+ StructField uuidDeltaField = deltaSchema.fields().get(0);
+ assertEquals("userId", uuidDeltaField.getName());
+ assertTrue(uuidDeltaField.getDataType() instanceof BinaryType);
+ assertEquals(false, uuidDeltaField.isNullable());
+
+ // Check metadata contains UUID marker
+ FieldMetadata metadata = uuidDeltaField.getMetadata();
+ assertTrue(metadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE));
+ assertEquals("uuid", metadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE));
+ }
+
+ @Test
+ public void testRoundTripConversion() {
+ // Create a Delta Kernel StructType
+ StructType originalDeltaSchema =
+ new StructType(
+ Arrays.asList(
+ new StructField("id", IntegerType.INTEGER, false),
+ new StructField("name", StringType.STRING, true),
+ new StructField("score", DoubleType.DOUBLE, false)));
+
+ // Convert to InternalSchema
+ InternalSchema internalSchema = extractor.toInternalSchema(originalDeltaSchema);
+ // Convert back to Delta Kernel StructType
+ StructType convertedDeltaSchema = extractor.fromInternalSchema(internalSchema);
+
+ // Verify structure matches
+ assertEquals(originalDeltaSchema.fields().size(), convertedDeltaSchema.fields().size());
+
+ for (int i = 0; i < originalDeltaSchema.fields().size(); i++) {
+ StructField original = originalDeltaSchema.fields().get(i);
+ StructField converted = convertedDeltaSchema.fields().get(i);
+
+ assertEquals(original.getName(), converted.getName());
+ assertEquals(original.getDataType(), converted.getDataType());
+ assertEquals(original.isNullable(), converted.isNullable());
+ }
+ }
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java
new file mode 100644
index 000000000..dec2daa04
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.kernel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.ScanImpl;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.utils.CloseableIterator;
+
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.sync.TableFormatSync;
+
+/**
+ * Validates that Delta Kernel tables are properly created/updated using
+ * DeltaKernelConversionTarget. Tests partitioning, schema evolution, and metadata sync without
+ * Spark SQL dependencies.
+ */
+public class TestDeltaKernelSync {
+ private static final Instant LAST_COMMIT_TIME = Instant.ofEpochSecond(1000);
+
+ @TempDir public Path tempDir;
+ private DeltaKernelConversionTarget conversionTarget;
+ private Path basePath;
+ private String tableName;
+ private Engine engine;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ tableName = "test-" + UUID.randomUUID();
+ basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ Configuration hadoopConf = new Configuration();
+ engine = DefaultEngine.create(hadoopConf);
+
+ conversionTarget =
+ new DeltaKernelConversionTarget(
+ TargetTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+ .formatName(TableFormat.DELTA)
+ .build(),
+ engine);
+ }
+
+ @Test
+ public void testCreateSnapshotControlFlow() throws Exception {
+ InternalSchema schema1 = getInternalSchema();
+ List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
+ fields2.add(
+ InternalField.builder()
+ .name("float_field")
+ .schema(
+ InternalSchema.builder()
+ .name("float")
+ .dataType(InternalType.FLOAT)
+ .isNullable(true)
+ .build())
+ .build());
+ InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();
+ InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
+ InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
+
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
+
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
+ }
+
+ @Test
+ public void testFileRemovalWithCheckpoint() throws Exception {
+ // This test performs 12 syncs: the first 11 trigger checkpoint creation (at version 10),
+ // and the 12th verifies that file removal works correctly once a checkpoint exists
+ String checkpointTableName = "test_table_checkpoint_" + UUID.randomUUID();
+ Path checkpointTestPath = tempDir.resolve(checkpointTableName);
+ Files.createDirectories(checkpointTestPath);
+
+ InternalSchema schema = getInternalSchema();
+ InternalTable checkpointTable =
+ getInternalTable(checkpointTableName, checkpointTestPath, schema, null, LAST_COMMIT_TIME);
+
+ DeltaKernelConversionTarget checkpointTarget =
+ new DeltaKernelConversionTarget(
+ TargetTable.builder()
+ .name(checkpointTableName)
+ .basePath(checkpointTestPath.toString())
+ .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+ .formatName(TableFormat.DELTA)
+ .build(),
+ engine);
+
+ // First 10 syncs create versions 0-9; the checkpoint is created at version 10
+ for (int i = 0; i < 10; i++) {
+ InternalDataFile file1 = getDataFile(i * 2 + 1, Collections.emptyList(), checkpointTestPath);
+ InternalDataFile file2 = getDataFile(i * 2 + 2, Collections.emptyList(), checkpointTestPath);
+
+ InternalSnapshot snapshot = buildSnapshot(checkpointTable, String.valueOf(i), file1, file2);
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot);
+ }
+
+ // 11th sync: This triggers checkpoint creation at version 10
+ InternalDataFile file21 = getDataFile(21, Collections.emptyList(), checkpointTestPath);
+ InternalDataFile file22 = getDataFile(22, Collections.emptyList(), checkpointTestPath);
+ InternalSnapshot snapshot11 = buildSnapshot(checkpointTable, "10", file21, file22);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot11);
+
+ // Checkpoint is created synchronously via post-commit hooks
+ Path checkpointFile =
+ checkpointTestPath.resolve("_delta_log/00000000000000000010.checkpoint.parquet");
+ assertTrue(Files.exists(checkpointFile), "Checkpoint file should exist after 10 commits");
+
+ // 12th sync: NOW checkpoint exists and can be used to detect file removals
+ InternalDataFile file23 = getDataFile(23, Collections.emptyList(), checkpointTestPath);
+ InternalDataFile file24 = getDataFile(24, Collections.emptyList(), checkpointTestPath);
+ InternalSnapshot snapshot12 = buildSnapshot(checkpointTable, "11", file23, file24);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot12);
+
+ // Validate: Should only have file23 and file24 (file21/file22 should be removed)
+ validateDeltaTable(checkpointTestPath, new HashSet<>(Arrays.asList(file23, file24)));
+ }
+
+ @Test
+ public void testPrimitiveFieldPartitioning() throws Exception {
+ InternalSchema schema = getInternalSchema();
+ InternalPartitionField internalPartitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("string_field")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ InternalTable table =
+ getInternalTable(
+ tableName,
+ basePath,
+ schema,
+ Collections.singletonList(internalPartitionField),
+ LAST_COMMIT_TIME);
+
+ List<PartitionValue> partitionValues1 =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(internalPartitionField)
+ .range(Range.scalar("level"))
+ .build());
+ List<PartitionValue> partitionValues2 =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(internalPartitionField)
+ .range(Range.scalar("warning"))
+ .build());
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);
+
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+
+ // Validate all files are present
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3)));
+
+ // Verify partition columns are set
+ Table deltaTable = Table.forPath(engine, basePath.toString());
+ Snapshot snapshot = deltaTable.getLatestSnapshot(engine);
+ SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
+ Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
+ assertEquals(1, partitionColumns.size());
+ assertTrue(partitionColumns.contains("string_field"));
+ }
+
+ @Test
+ public void testMultipleFieldPartitioning() throws Exception {
+ InternalSchema schema = getInternalSchema();
+ InternalPartitionField internalPartitionField1 =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("string_field")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ InternalPartitionField internalPartitionField2 =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("int_field")
+ .schema(InternalSchema.builder().name("int").dataType(InternalType.INT).build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ InternalTable table =
+ getInternalTable(
+ tableName,
+ basePath,
+ schema,
+ Arrays.asList(internalPartitionField1, internalPartitionField2),
+ LAST_COMMIT_TIME);
+
+ List<PartitionValue> partitionValues1 =
+ Arrays.asList(
+ PartitionValue.builder()
+ .partitionField(internalPartitionField1)
+ .range(Range.scalar("level"))
+ .build(),
+ PartitionValue.builder()
+ .partitionField(internalPartitionField2)
+ .range(Range.scalar(10))
+ .build());
+ List<PartitionValue> partitionValues2 =
+ Arrays.asList(
+ PartitionValue.builder()
+ .partitionField(internalPartitionField1)
+ .range(Range.scalar("level"))
+ .build(),
+ PartitionValue.builder()
+ .partitionField(internalPartitionField2)
+ .range(Range.scalar(20))
+ .build());
+ List<PartitionValue> partitionValues3 =
+ Arrays.asList(
+ PartitionValue.builder()
+ .partitionField(internalPartitionField1)
+ .range(Range.scalar("warning"))
+ .build(),
+ PartitionValue.builder()
+ .partitionField(internalPartitionField2)
+ .range(Range.scalar(20))
+ .build());
+
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues2, basePath);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues3, basePath);
+
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3)));
+
+ // Verify partition columns
+ Table deltaTable = Table.forPath(engine, basePath.toString());
+ Snapshot snapshot = deltaTable.getLatestSnapshot(engine);
+ SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
+ Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
+ assertEquals(2, partitionColumns.size());
+ assertTrue(partitionColumns.contains("string_field"));
+ assertTrue(partitionColumns.contains("int_field"));
+ }
+
+ @Test
+ @Disabled(
+ "Disabled due to tags not present in commitinfo - https://github.com/delta-io/delta/issues/6167")
+ public void testSourceTargetIdMapping() throws Exception {
+ InternalSchema baseSchema = getInternalSchema();
+ InternalTable sourceTable =
+ getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);
+
+ InternalDataFile sourceDataFile1 = getDataFile(101, Collections.emptyList(), basePath);
+ InternalDataFile sourceDataFile2 = getDataFile(102, Collections.emptyList(), basePath);
+ InternalDataFile sourceDataFile3 = getDataFile(103, Collections.emptyList(), basePath);
+
+ InternalSnapshot sourceSnapshot1 =
+ buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2);
+ InternalSnapshot sourceSnapshot2 =
+ buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot1);
+ Optional<String> mappedTargetId1 =
+ conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier());
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile1, sourceDataFile2)));
+ assertTrue(mappedTargetId1.isPresent());
+ assertEquals("0", mappedTargetId1.get());
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot2);
+ Optional<String> mappedTargetId2 =
+ conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier());
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile2, sourceDataFile3)));
+ assertTrue(mappedTargetId2.isPresent());
+ assertEquals("1", mappedTargetId2.get());
+
+ Optional<String> unmappedTargetId = conversionTarget.getTargetCommitIdentifier("s3");
+ assertFalse(unmappedTargetId.isPresent());
+ }
+
+ @Test
+ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Exception {
+ InternalSchema baseSchema = getInternalSchema();
+ InternalTable internalTable =
+ getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);
+ InternalDataFile sourceDataFile = getDataFile(101, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot = buildSnapshot(internalTable, "0", sourceDataFile);
+
+ // Mock the snapshot sync process
+ conversionTarget.beginSync(internalTable);
+ TableSyncMetadata tableSyncMetadata =
+ TableSyncMetadata.of(
+ internalTable.getLatestCommitTime(), new ArrayList<>(snapshot.getPendingCommits()));
+ conversionTarget.syncMetadata(tableSyncMetadata);
+ conversionTarget.syncSchema(internalTable.getReadSchema());
+ conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+ conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+ conversionTarget.completeSync();
+
+ // getTargetCommitIdentifier is not supported in DeltaKernelConversionTarget
+ // because Delta Kernel 4.0.0 does not support commit tags
+ NotSupportedException exception =
+ assertThrows(
+ NotSupportedException.class, () -> conversionTarget.getTargetCommitIdentifier("0"));
+ assertTrue(
+ exception
+ .getMessage()
+ .contains("Source-to-target commit identifier mapping is not supported"));
+ }
+
+ @Test
+ public void testGetTableMetadata() throws Exception {
+ InternalSchema schema = getInternalSchema();
+ InternalTable table = getInternalTable(tableName, basePath, schema, null, LAST_COMMIT_TIME);
+ InternalDataFile dataFile = getDataFile(1, Collections.emptyList(), basePath);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot);
+
+ Optional<TableSyncMetadata> metadata = conversionTarget.getTableMetadata();
+ assertTrue(metadata.isPresent(), "Metadata should be present after sync");
+ TableSyncMetadata syncMetadata = metadata.get();
+ assertNotNull(syncMetadata.getLastInstantSynced(), "Last instant synced should not be null");
+ }
+
+ private void validateDeltaTable(Path basePath, Set<InternalDataFile> expectedFiles)
+ throws IOException {
+ Table table = Table.forPath(engine, basePath.toString());
+ assertNotNull(table);
+
+ Snapshot snapshot = table.getLatestSnapshot(engine);
+ assertNotNull(snapshot);
+
+ // Scan all files
+ ScanImpl scan = (ScanImpl) snapshot.getScanBuilder().build();
+ CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine, false);
+
+ Map<String, InternalDataFile> pathToFile =
+ expectedFiles.stream()
+ .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
+
+ int count = 0;
+ while (scanFiles.hasNext()) {
+ FilteredColumnarBatch batch = scanFiles.next();
+ CloseableIterator<Row> rows = batch.getRows();
+
+ while (rows.hasNext()) {
+ Row scanFileRow = rows.next();
+ AddFile addFile =
+ new AddFile(scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")));
+
+ String fullPath =
+ new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri()).toString();
+ InternalDataFile expected = pathToFile.get(fullPath);
+ assertNotNull(expected, "Unexpected file in Delta table: " + fullPath);
+ assertEquals(expected.getFileSizeBytes(), addFile.getSize());
+ count++;
+ }
+ }
+
+ assertEquals(
+ expectedFiles.size(), count, "Number of files from Delta scan does not match expectation");
+ }
+
+ private InternalSnapshot buildSnapshot(
+ InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
+ return InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+ .sourceIdentifier(sourceIdentifier)
+ .build();
+ }
+
+ private InternalTable getInternalTable(
+ String tableName,
+ Path basePath,
+ InternalSchema schema,
+ List<InternalPartitionField> partitionFields,
+ Instant lastCommitTime) {
+ return InternalTable.builder()
+ .name(tableName)
+ .basePath(basePath.toUri().toString())
+ .layoutStrategy(DataLayoutStrategy.FLAT)
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(schema)
+ .partitioningFields(partitionFields)
+ .latestCommitTime(lastCommitTime)
+ .build();
+ }
+
+ private InternalDataFile getDataFile(
+ int index, List<PartitionValue> partitionValues, Path basePath) {
+ // Create actual physical file so Delta Kernel can reference it
+ try {
+ Path filePath = basePath.resolve("physical" + index + ".parquet");
+ Files.createFile(filePath);
+
+ String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString();
+
+ return InternalDataFile.builder()
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .fileSizeBytes(1000L + (index * 100L)) // Deterministic size based on index
+ .physicalPath(physicalPath)
+ .recordCount(100L + (index * 10L)) // Deterministic record count based on index
+ .partitionValues(partitionValues)
+ .columnStats(Collections.emptyList())
+ .lastModified(1000000000L + (index * 1000L)) // Deterministic timestamp based on index
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create test data file", e);
+ }
+ }
+
+ private InternalSchema getInternalSchema() {
+ Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
+ timestampMetadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
+ return InternalSchema.builder()
+ .dataType(InternalType.RECORD)
+ .name("top_level_schema")
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("long_field")
+ .schema(
+ InternalSchema.builder()
+ .name("long")
+ .dataType(InternalType.LONG)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("string_field")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("int_field")
+ .schema(
+ InternalSchema.builder()
+ .name("int")
+ .dataType(InternalType.INT)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("timestamp_field")
+ .schema(
+ InternalSchema.builder()
+ .name("time")
+ .dataType(InternalType.TIMESTAMP)
+ .isNullable(true)
+ .metadata(timestampMetadata)
+ .build())
+ .build()))
+ .isNullable(false)
+ .build();
+ }
+
+ @Test
+ public void testTimestampNtz() throws Exception {
+ InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
+ List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
+ fields2.add(
+ InternalField.builder()
+ .name("float_field")
+ .schema(
+ InternalSchema.builder()
+ .name("float")
+ .dataType(InternalType.FLOAT)
+ .isNullable(true)
+ .build())
+ .build());
+ InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build();
+ InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME);
+ InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME);
+
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath);
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath);
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath);
+
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)));
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+ validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)));
+ }
+
+ private InternalSchema getInternalSchemaWithTimestampNtz() {
+ Map<InternalSchema.MetadataKey, Object> timestampMetadata = new HashMap<>();
+ timestampMetadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+ List<InternalField> fields = new ArrayList<>(getInternalSchema().getFields());
+ fields.add(
+ InternalField.builder()
+ .name("timestamp_ntz_field")
+ .schema(
+ InternalSchema.builder()
+ .name("time_ntz")
+ .dataType(InternalType.TIMESTAMP_NTZ)
+ .isNullable(true)
+ .metadata(timestampMetadata)
+ .build())
+ .build());
+ return getInternalSchema().toBuilder().fields(fields).build();
+ }
+}