diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java index 61a2ba8f7..71f989325 100644 --- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java @@ -27,12 +27,14 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.Builder; import lombok.NonNull; +import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -63,6 +65,7 @@ import org.apache.xtable.spi.extractor.ConversionSource; @Builder +@Log4j2 public class ParquetConversionSource implements ConversionSource { private static final ParquetSchemaExtractor schemaExtractor = @@ -79,6 +82,7 @@ public class ParquetConversionSource implements ConversionSource { private final String tableName; private final String basePath; @NonNull private final Configuration hadoopConf; + private final ParquetDataManager parquetDataManager; private InternalTable createInternalTableFromFile(LocatedFileStatus latestFile) { ParquetMetadata parquetMetadata = @@ -136,7 +140,8 @@ private InternalDataFile createInternalDataFileFromParquetFile( .physicalPath(parquetFile.getPath().toString()) .partitionValues( partitionValueExtractor.extractPartitionValues( - partitionSpecExtractor.spec(schema), basePath)) + partitionSpecExtractor.spec(schema), + HudiPathUtils.getPartitionPath(new Path(basePath), parquetFile.getPath()))) .lastModified(parquetFile.getModificationTime()) .fileSizeBytes(parquetFile.getLen()) .columnStats( @@ -162,7 +167,7 @@ public TableChange getTableChangeForCommit(Long modificationTime) { parquetFiles .filter(fileStatus -> fileStatus.getModificationTime() > 
modificationTime)
             .collect(Collectors.toList());
-    InternalTable internalTable = getMostRecentTable(parquetFiles);
+    InternalTable internalTable = getMostRecentTable(getParquetFiles(hadoopConf, basePath));
     for (FileStatus tableStatus : tableChangesAfter) {
       InternalDataFile currentDataFile =
           createInternalDataFileFromParquetFile(tableStatus, internalTable.getReadSchema());
@@ -170,6 +175,9 @@ public TableChange getTableChangeForCommit(Long modificationTime) {
     }
 
     return TableChange.builder()
+        .sourceIdentifier(
+            getCommitIdentifier(
+                parquetDataManager.getMostRecentParquetFile().getModificationTime()))
         .tableAsOfChange(internalTable)
         .filesDiff(InternalFilesDiff.builder().filesAdded(addedInternalDataFiles).build())
         .build();
@@ -235,8 +243,38 @@ private Stream getParquetFiles(Configuration hadoopConf, Stri
   }
 
+  /**
+   * Reports whether an incremental sync can start from the given instant.
+   *
+   * <p>The check passes only when at least one parquet file exists and the oldest file known to
+   * the data manager was modified at or before that instant.
+   *
+   * @param timeInMillis instant from which the caller wants to sync incrementally
+   * @return {@code true} when the oldest file is not newer than {@code timeInMillis}
+   */
   @Override
-  public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    return false;
+  public boolean isIncrementalSyncSafeFrom(Instant timeInMillis) {
+    Stream<ParquetFileInfo> parquetFilesMetadata = parquetDataManager.getCurrentFileInfo();
+    OptionalLong earliestModTimeOpt =
+        parquetFilesMetadata.mapToLong(ParquetFileInfo::getModificationTime).min();
+
+    if (!earliestModTimeOpt.isPresent()) {
+      log.warn("No parquet files found in table {}. Incremental sync is not possible.", tableName);
+      return false;
+    }
+
+    long earliestModTime = earliestModTimeOpt.getAsLong();
+    if (earliestModTime > timeInMillis.toEpochMilli()) {
+      // Log both sides as Instants so the two timestamps can be compared at a glance.
+      log.warn(
+          "Incremental sync is not safe. Earliest available metadata (time={}) is newer "
+              + "than requested instant {}.",
+          Instant.ofEpochMilli(earliestModTime),
+          timeInMillis);
+      return false;
+    }
+
+    log.debug("Incremental sync is safe from instant {} for table {}", timeInMillis, tableName);
+    return true;
   }
 
   @Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
index 0d508823a..81035ceaa 100644
--- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -32,12 +32,15 @@ public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTab
     ParquetPartitionValueExtractor partitionValueExtractor =
         new ParquetPartitionValueExtractor(
             sourcePartitionSpecExtractor.getPathToPartitionFieldFormat());
+    ParquetDataManager dataManager =
+        new ParquetDataManager(this.hadoopConf, sourceTable.getBasePath());
     return ParquetConversionSource.builder()
         .tableName(sourceTable.getName())
         .basePath(sourceTable.getBasePath())
         .hadoopConf(this.hadoopConf)
         .partitionSpecExtractor(sourcePartitionSpecExtractor)
         .partitionValueExtractor(partitionValueExtractor)
+        .parquetDataManager(dataManager)
         .build();
   }
 }
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
new file mode 100644
index 000000000..dabe70610
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetDataManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.parquet; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.util.functional.RemoteIterators; + +import org.apache.xtable.exception.ReadException; + +/** + * Manages Parquet File's Metadata + * + *

This class provides functions to handle Parquet metadata, creating metadata objects from
+ * parquet files and filtering the files based on the modification times.
+ *
+ * <p>The recursive listing under the base path is performed once, on first use, and the result
+ * is reused by every later call on the same instance.
+ */
+@Log4j2
+@RequiredArgsConstructor
+public class ParquetDataManager {
+  private final Configuration hadoopConf;
+  private final String basePath;
+  private final FileSystem fileSystem;
+
+  /**
+   * Creates a manager for the table rooted at {@code basePath}.
+   *
+   * @param hadoopConf Hadoop configuration, also passed on to every {@link ParquetFileInfo}
+   * @param basePath root directory of the table; may carry a scheme such as {@code s3a://}
+   * @throws ReadException if the file system for {@code basePath} cannot be initialized
+   */
+  public ParquetDataManager(Configuration hadoopConf, String basePath) {
+    this.hadoopConf = hadoopConf;
+    this.basePath = basePath;
+    try {
+      // Resolve through the Hadoop Path so that the scheme of basePath selects the file system;
+      // java.nio.file.Paths would turn every value, including remote URIs, into a local path.
+      URI uri = new Path(basePath).toUri();
+      this.fileSystem = FileSystem.get(uri, hadoopConf);
+    } catch (IOException e) {
+      throw new ReadException("Unable to initialize file system for base path: " + basePath, e);
+    }
+  }
+
+  // Filled by loadParquetFiles() on the first getParquetFiles() call (Lombok lazy getter).
+  @Getter(value = AccessLevel.PRIVATE, lazy = true)
+  private final List<LocatedFileStatus> parquetFiles = loadParquetFiles();
+
+  /**
+   * Returns the file with the greatest modification time.
+   *
+   * @throws IllegalStateException if the listing contains no parquet file
+   */
+  ParquetFileInfo getMostRecentParquetFile() {
+    LocatedFileStatus file =
+        getParquetFiles().stream()
+            .max(Comparator.comparingLong(LocatedFileStatus::getModificationTime))
+            .orElseThrow(() -> new IllegalStateException("No files found"));
+    return new ParquetFileInfo(hadoopConf, file);
+  }
+
+  /**
+   * Returns the oldest file whose modification time is at or after {@code targetTime}.
+   *
+   * @throws IllegalStateException if every listed file is older than {@code targetTime}
+   */
+  ParquetFileInfo getParquetDataFileAt(long targetTime) {
+    return getParquetFiles().stream()
+        .filter(file -> file.getModificationTime() >= targetTime)
+        .min(Comparator.comparingLong(LocatedFileStatus::getModificationTime))
+        .map(file -> new ParquetFileInfo(hadoopConf, file))
+        .orElseThrow(() -> new IllegalStateException("No file found at or after " + targetTime));
+  }
+
+  /** Lists the base path recursively and keeps the files whose name ends with "parquet". */
+  private List<LocatedFileStatus> loadParquetFiles() {
+    try {
+      RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(basePath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> file.getPath().getName().endsWith("parquet"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new ReadException("Unable to read files from file system", e);
+    }
+  }
+
+  /** Returns one {@link ParquetFileInfo} per listed parquet file, in listing order. */
+  Stream<ParquetFileInfo> getCurrentFileInfo() {
+    return getParquetFiles().stream()
+        .map(fileStatus -> new ParquetFileInfo(hadoopConf, fileStatus));
+  }
+
+  /** Returns the files whose modification time is at or after {@code syncTime} (inclusive). */
+  List<ParquetFileInfo> getParquetFilesMetadataAfterTime(long syncTime) {
+    return getParquetFiles().stream()
+        .filter(file -> file.getModificationTime() >= syncTime)
+        .map(file -> new ParquetFileInfo(hadoopConf, file))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java
new file mode 100644
index 000000000..539349edb
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetFileInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import lombok.Getter;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Immutable description of one parquet file: its path, size and modification time, plus the
+ * footer metadata, which is loaded lazily.
+ *
+ * <p>The footer is read the first time {@code getMetadata()} is called, so creating an instance
+ * only to inspect the file status does not open the file.
+ */
+@Log4j2
+@Value
+class ParquetFileInfo {
+  // Kept out of equals/hashCode/toString: Lombok routes those through the lazy getter, which
+  // would read the footer from storage as a side effect of comparing or logging an instance.
+  @lombok.EqualsAndHashCode.Exclude
+  @lombok.ToString.Exclude
+  @Getter(lazy = true)
+  ParquetMetadata metadata = getMetadataInternal();
+
+  long modificationTime;
+  long size;
+  Path path;
+  Configuration conf;
+
+  /**
+   * Captures the status of {@code file}; the footer itself is not read here.
+   *
+   * @param conf Hadoop configuration used when the footer is eventually read
+   * @param file status of the parquet file to describe
+   */
+  public ParquetFileInfo(Configuration conf, FileStatus file) {
+    this.modificationTime = file.getModificationTime();
+    this.path = file.getPath();
+    this.size = file.getLen();
+    this.conf = conf;
+  }
+
+  /** Reads the footer of {@link #path}; used as the initializer of the lazy getter. */
+  private ParquetMetadata getMetadataInternal() {
+    return ParquetMetadataExtractor.readParquetMetadata(conf, path);
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
index f022b7531..f29a186d9 100644
--- a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -20,9 +20,6 @@
 
 import java.io.IOException;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
@@ -30,11 +27,11 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
 import org.apache.parquet.schema.MessageType;
 
 import org.apache.xtable.exception.ReadException;
 
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class ParquetMetadataExtractor {
 
   private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor();
@@ -43,16 +40,23 @@ public static 
ParquetMetadataExtractor getInstance() {
     return INSTANCE;
   }
 
-  public MessageType getSchema(ParquetMetadata footer) {
+  /** Returns the schema recorded in the given Parquet footer. */
+  public static MessageType getSchema(ParquetMetadata footer) {
     return footer.getFileMetaData().getSchema();
   }
 
-  public ParquetMetadata readParquetMetadata(Configuration conf, Path filePath) {
+  /**
+   * Reads the footer of the Parquet file at {@code filePath}; any failure to open or read the
+   * file is reported as a {@link ReadException}.
+   */
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path filePath) {
     ParquetReadOptions options = HadoopReadOptions.builder(conf, filePath).build();
-    try (ParquetFileReader fileReader =
-        ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf), options)) {
-      return fileReader.getFooter();
-    } catch (IOException e) {
+    try {
+      InputFile file = HadoopInputFile.fromPath(filePath, conf);
+      try (ParquetFileReader fileReader = ParquetFileReader.open(file, options)) {
+        return fileReader.getFooter();
+      }
+    } catch (IOException | RuntimeException e) {
       throw new ReadException("Failed to read the parquet file", e);
     }
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
index 2ffa579a1..872b149d6 100644
--- a/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/ITParquetConversionSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.xtable.parquet;
 
+import static org.apache.spark.sql.functions.expr;
 import static org.apache.xtable.GenericTable.getTableName;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
@@ -25,11 +26,15 @@
 import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static 
org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,6 +50,10 @@
 import lombok.Builder;
 import lombok.Value;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -59,6 +68,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -74,6 +84,10 @@
 import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 
 public class ITParquetConversionSource {
@@ -167,7 +181,7 @@ private static Stream provideArgsForSyncTesting() {
     return partitionConfigs.stream()
         .flatMap(
             partitionConfig ->
-                Stream.of(SyncMode.FULL) // Incremental sync is not yet supported
+                Stream.of(SyncMode.FULL, SyncMode.INCREMENTAL)
                     .map(
                         syncMode ->
                             Arguments.of(
@@ -418,6 +432,128 @@ private void checkDatasetEquivalence(
     }
   }
 
+  /**
+   * Writes a partitioned table, back-dates the initial files, appends a second batch and
+   * checks that the conversion source reports the appended batch as the latest commit and
+   * treats an incremental sync as safe from a point in time between the two batches.
+   */
+  @Test
+  void testIncrementalSyncWithMultiplePartitions() throws IOException {
+    Configuration conf = sparkSession.sparkContext().hadoopConfiguration();
+
+    StructType schema =
+        DataTypes.createStructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.IntegerType, false),
+              DataTypes.createStructField("value", DataTypes.StringType, false),
+              DataTypes.createStructField("year", DataTypes.IntegerType, false),
+              DataTypes.createStructField("month", DataTypes.IntegerType, false)
+            });
+    List<Row> data =
+        Arrays.asList(
+            RowFactory.create(100, "A", 2026, 12),
+            RowFactory.create(101, "AA", 2026, 12),
+            RowFactory.create(102, "CB", 2027, 11),
+            RowFactory.create(103, "BA", 2027, 11));
+
+    // Every location used below is derived from this single path.
+    Path fixedPath = Paths.get("target", "fixed-parquet-data", "parquet_table_test_2");
+    String outputPath = fixedPath.toAbsolutePath().toString();
+    Dataset<Row> df =
+        sparkSession
+            .createDataFrame(data, schema)
+            .withColumn("full_date", expr("make_date(year, month, 1)"));
+    df.coalesce(1).write().partitionBy("year", "month").mode("overwrite").parquet(outputPath);
+
+    FileSystem fs = FileSystem.get(fixedPath.toUri(), conf);
+    List<String> newPartitions = Arrays.asList("year=2026/month=12", "year=2027/month=11");
+    long now = System.currentTimeMillis();
+    long targetModificationTime = now - 360000; // initial batch
+    long newModificationTime = now - 50000; // appended batch
+    long testTime = now - 90000; // strictly between the two batches
+
+    // Back-date everything written so far so that it represents the initial batch.
+    for (String partition : newPartitions) {
+      org.apache.hadoop.fs.Path partitionPath =
+          new org.apache.hadoop.fs.Path(outputPath, partition);
+      if (fs.exists(partitionPath)) {
+        updateModificationTimeRecursive(fs, partitionPath, targetModificationTime);
+      }
+    }
+
+    // Append a second batch with Spark into the same partitions.
+    List<Row> futureDataToSync =
+        Arrays.asList(
+            RowFactory.create(101, "A", 2026, 12),
+            RowFactory.create(301, "D", 2027, 11),
+            RowFactory.create(302, "DA", 2027, 11));
+    Dataset<Row> dfToSync =
+        sparkSession
+            .createDataFrame(futureDataToSync, schema)
+            .withColumn("full_date", expr("make_date(year, month, 1)"));
+    dfToSync.coalesce(1).write().partitionBy("year", "month").mode("append").parquet(outputPath);
+
+    Properties sourceProperties = new Properties();
+    sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "full_date:MONTH:year=yyyy/month=MM");
+
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name("parquet_table_test_2")
+            .basePath(outputPath)
+            .additionalProperties(sourceProperties)
+            .formatName(TableFormat.PARQUET)
+            .build();
+
+    ParquetConversionSourceProvider conversionSourceProvider =
+        new ParquetConversionSourceProvider();
+    conversionSourceProvider.init(conf);
+    ParquetConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+
+    // Pin the appended files to newModificationTime and re-pin the initial ones, so that the
+    // assertions below compare against known timestamps.
+    for (String partition : newPartitions) {
+      org.apache.hadoop.fs.Path partitionPath =
+          new org.apache.hadoop.fs.Path(outputPath, partition);
+      RemoteIterator<LocatedFileStatus> it = fs.listFiles(partitionPath, false);
+      while (it.hasNext()) {
+        LocatedFileStatus fileStatus = it.next();
+        boolean appended = fileStatus.getModificationTime() > newModificationTime;
+        fs.setTimes(
+            fileStatus.getPath(), appended ? newModificationTime : targetModificationTime, -1);
+      }
+      fs.setTimes(partitionPath, newModificationTime, -1);
+    }
+
+    InternalTable result = conversionSource.getTable(newModificationTime);
+    assertEquals(
+        Instant.ofEpochMilli(newModificationTime).toString(),
+        result.getLatestCommitTime().toString());
+    assertEquals("parquet_table_test_2", result.getName());
+    assertEquals(TableFormat.PARQUET, result.getTableFormat());
+    assertNotNull(result.getReadSchema());
+
+    InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+    assertNotNull(snapshot);
+    TableChange changes = conversionSource.getTableChangeForCommit(newModificationTime);
+    assertNotNull(changes);
+    // Expected value first, so a failure message labels the two numbers correctly.
+    assertEquals(newModificationTime, snapshot.getTable().getLatestCommitTime().toEpochMilli());
+    assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(testTime)));
+  }
+
+  /** Sets the modification time of every {@code .parquet} file found recursively under path. */
+  private void updateModificationTimeRecursive(
+      FileSystem fs, org.apache.hadoop.fs.Path path, long time) throws IOException {
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, true);
+    while (it.hasNext()) {
+      LocatedFileStatus status = it.next();
+      if (status.getPath().getName().endsWith(".parquet")) {
+        fs.setTimes(status.getPath(), time, -1);
+      }
+    }
+  }
+
   @Builder
   @Value
   private static class TableFormatPartitionDataHolder {
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java
new file mode 100644
index 000000000..7a0b0ce30
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetDataManager.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.xtable.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestParquetDataManager { + + private static final Configuration CONF = new Configuration(); + + // Helper method to create a mock LocatedFileStatus + private static LocatedFileStatus createMockFileStatus(String name, long modTime) { + LocatedFileStatus mockStatus = mock(LocatedFileStatus.class); + org.apache.hadoop.fs.Path mockPath = mock(org.apache.hadoop.fs.Path.class); + when(mockPath.getName()).thenReturn(name); + when(mockStatus.getPath()).thenReturn(mockPath); + when(mockStatus.getModificationTime()).thenReturn(modTime); + when(mockStatus.getLen()).thenReturn(1024L); + return mockStatus; + } + + private static class StubbedRemoteIterator implements RemoteIterator { + private final Iterator files; + + public StubbedRemoteIterator(List files) { + this.files = 
files.iterator(); + } + + @Override + public boolean hasNext() { + return files.hasNext(); + } + + @Override + public LocatedFileStatus next() { + return files.next(); + } + } + + // Helper method to create a mock FileSystem + private static FileSystem createMockFileSystem(RemoteIterator iterator) + throws IOException { + FileSystem mockFs = mock(FileSystem.class); + when(mockFs.listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean())).thenReturn(iterator); + return mockFs; + } + + @Test + void testGetMostRecentParquetFile_withMultipleFiles() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 3000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getMostRecentParquetFile(); + + assertNotNull(result); + assertEquals(3000L, result.getModificationTime()); + } + + @Test + void testGetMostRecentParquetFile_noFiles() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(new ArrayList<>()); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, manager::getMostRecentParquetFile); + assertEquals("No files found", exception.getMessage()); + } + + @Test + void testGetParquetDataFileAt_exactMatch() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new 
StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2000L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_firstAfterTime() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2500L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2500L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_multipleAfterTime() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2500L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + ParquetFileInfo result = manager.getParquetDataFileAt(2000L); + + assertNotNull(result); + assertEquals(2500L, result.getModificationTime()); + } + + @Test + void testGetParquetDataFileAt_noMatch() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(Collections.emptyList()); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, 
"test-path", mockFs); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> manager.getParquetDataFileAt(5000L)); + assertTrue(exception.getMessage().contains("No file found at or after 5000")); + } + + @Test + void testGetParquetDataFileAt_allBefore() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> manager.getParquetDataFileAt(3000L)); + assertTrue(exception.getMessage().contains("No file found at or after 3000")); + } + + @Test + void testGetCurrentFileInfo_multipleFiles() throws IOException { + LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L); + LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L); + LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L); + + RemoteIterator iterator = + new StubbedRemoteIterator(Arrays.asList(file1, file2, file3)); + FileSystem mockFs = createMockFileSystem(iterator); + + ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs); + + Stream result = manager.getCurrentFileInfo(); + + assertNotNull(result); + List fileList = result.collect(Collectors.toList()); + assertEquals(3, fileList.size()); + assertEquals(1000L, fileList.get(0).getModificationTime()); + assertEquals(2000L, fileList.get(1).getModificationTime()); + assertEquals(3000L, fileList.get(2).getModificationTime()); + } + + @Test + void testGetCurrentFileInfo_emptyList() throws IOException { + RemoteIterator iterator = new StubbedRemoteIterator(Collections.emptyList()); + FileSystem mockFs = createMockFileSystem(iterator); + + 
ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    Stream result = manager.getCurrentFileInfo();
+
+    assertNotNull(result);
+    assertEquals(0, result.count());
+  }
+
+  /**
+   * Lists two stubbed parquet files (modification times 1000L and 2000L) and expects every
+   * element of the stream returned by {@code getCurrentFileInfo()} to report a positive
+   * modification time.
+   */
+  @Test
+  void testGetCurrentFileInfo_streamCharacteristics() throws IOException {
+    LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L);
+    LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(file1, file2));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    Stream<ParquetFileInfo> result = manager.getCurrentFileInfo();
+
+    assertNotNull(result);
+    assertTrue(result.allMatch(info -> info.getModificationTime() > 0));
+  }
+
+  /**
+   * With files at 1000L, 2000L and 3000L and a cutoff of 2000L, expects exactly the 2000L and
+   * 3000L entries, in that order.
+   */
+  @Test
+  void testGetParquetFilesMetadataAfterTime_someMatch() throws IOException {
+    LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L);
+    LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L);
+    LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(file1, file2, file3));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    List<ParquetFileInfo> result = manager.getParquetFilesMetadataAfterTime(2000L);
+
+    assertNotNull(result);
+    assertEquals(2, result.size());
+    assertEquals(2000L, result.get(0).getModificationTime());
+    assertEquals(3000L, result.get(1).getModificationTime());
+  }
+
+  /** With files at 2000L and 3000L and a cutoff of 1000L, expects both files to be returned. */
+  @Test
+  void testGetParquetFilesMetadataAfterTime_allMatch() throws IOException {
+    LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 2000L);
+    LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 3000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(file1, file2));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    List<ParquetFileInfo> result = manager.getParquetFilesMetadataAfterTime(1000L);
+
+    assertNotNull(result);
+    assertEquals(2, result.size());
+  }
+
+  /** With files at 1000L and 2000L and a cutoff of 5000L, expects an empty, non-null list. */
+  @Test
+  void testGetParquetFilesMetadataAfterTime_noneMatch() throws IOException {
+    LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L);
+    LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(file1, file2));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    List<ParquetFileInfo> result = manager.getParquetFilesMetadataAfterTime(5000L);
+
+    assertNotNull(result);
+    assertEquals(0, result.size());
+  }
+
+  /**
+   * Pins the boundary case: a file whose modification time equals the 2000L cutoff is expected
+   * in the result alongside the 3000L file.
+   */
+  @Test
+  void testGetParquetFilesMetadataAfterTime_exactTimeMatch() throws IOException {
+    LocatedFileStatus file1 = createMockFileStatus("file1.parquet", 1000L);
+    LocatedFileStatus file2 = createMockFileStatus("file2.parquet", 2000L);
+    LocatedFileStatus file3 = createMockFileStatus("file3.parquet", 3000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(file1, file2, file3));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    List<ParquetFileInfo> result = manager.getParquetFilesMetadataAfterTime(2000L);
+
+    assertNotNull(result);
+    assertEquals(2, result.size());
+    assertEquals(2000L, result.get(0).getModificationTime());
+    assertEquals(3000L, result.get(1).getModificationTime());
+  }
+
+  /**
+   * Expects {@code listFiles} not to be invoked by the constructor, and to be invoked exactly
+   * once in total after three different accessor calls on the same manager.
+   */
+  @Test
+  void testGetParquetFiles_caching() throws IOException {
+    LocatedFileStatus file = createMockFileStatus("file.parquet", 1000L);
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Collections.singletonList(file));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    // No filesystem access should happen yet
+    verify(mockFs, never()).listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean());
+
+    // Access multiple times
+    manager.getCurrentFileInfo();
+    manager.getMostRecentParquetFile();
+    manager.getParquetFilesMetadataAfterTime(0L);
+
+    // Verify filesystem was accessed only once
+    verify(mockFs, times(1)).listFiles(any(org.apache.hadoop.fs.Path.class), anyBoolean());
+  }
+
+  /**
+   * Lists one {@code .parquet}, one {@code .txt} and one {@code .json} file and expects only the
+   * parquet entry (modification time 1000L) to be returned.
+   */
+  @Test
+  void testOnlyParquetFilesIncluded() throws IOException {
+    LocatedFileStatus parquetFile = createMockFileStatus("data.parquet", 1000L);
+    LocatedFileStatus txtFile = createMockFileStatus("readme.txt", 2000L);
+    LocatedFileStatus jsonFile = createMockFileStatus("config.json", 3000L);
+
+    RemoteIterator<LocatedFileStatus> iterator =
+        new StubbedRemoteIterator(Arrays.asList(parquetFile, txtFile, jsonFile));
+    FileSystem mockFs = createMockFileSystem(iterator);
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, "test-path", mockFs);
+
+    List<ParquetFileInfo> result = manager.getParquetFilesMetadataAfterTime(0L);
+
+    assertEquals(1, result.size());
+    assertEquals(1000L, result.get(0).getModificationTime());
+  }
+
+  /**
+   * Uses the two-argument constructor against a JUnit temp directory holding three parquet files
+   * (modification times 1000L, 3000L, 2000L). Expects the most recent file to report 3000L and a
+   * 2000L cutoff to yield two entries.
+   */
+  @Test
+  void testWithRealFileSystem_multipleFiles(@TempDir Path tempDir) throws IOException {
+    // Create multiple parquet files with different modification times
+    Path file1 = tempDir.resolve("data1.parquet");
+    Path file2 = tempDir.resolve("data2.parquet");
+    Path file3 = tempDir.resolve("data3.parquet");
+
+    Files.createFile(file1);
+    Files.createFile(file2);
+    Files.createFile(file3);
+
+    // Set different modification times
+    Files.setLastModifiedTime(file1, FileTime.fromMillis(1000L));
+    Files.setLastModifiedTime(file2, FileTime.fromMillis(3000L));
+    Files.setLastModifiedTime(file3, FileTime.fromMillis(2000L));
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString());
+
+    ParquetFileInfo mostRecent = manager.getMostRecentParquetFile();
+    assertNotNull(mostRecent);
+    assertEquals(3000L, mostRecent.getModificationTime());
+
+    List<ParquetFileInfo> afterTime = manager.getParquetFilesMetadataAfterTime(2000L);
+    assertEquals(2, afterTime.size());
+  }
+
+  /**
+   * Places one parquet file in the temp directory root and one in each of two subdirectories.
+   * Expects all three to be found with a 0L cutoff and the most recent to report 3000L.
+   */
+  @Test
+  void testWithRealFileSystem_nestedDirectories(@TempDir Path tempDir) throws IOException {
+    // Create nested directory structure
+    Path subDir1 = tempDir.resolve("subdir1");
+    Path subDir2 = tempDir.resolve("subdir2");
+    Files.createDirectories(subDir1);
+    Files.createDirectories(subDir2);
+
+    // Create parquet files in different directories
+    Path file1 = tempDir.resolve("root.parquet");
+    Path file2 = subDir1.resolve("nested1.parquet");
+    Path file3 = subDir2.resolve("nested2.parquet");
+
+    Files.createFile(file1);
+    Files.createFile(file2);
+    Files.createFile(file3);
+
+    Files.setLastModifiedTime(file1, FileTime.fromMillis(1000L));
+    Files.setLastModifiedTime(file2, FileTime.fromMillis(2000L));
+    Files.setLastModifiedTime(file3, FileTime.fromMillis(3000L));
+    ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString());
+
+    List<ParquetFileInfo> allFiles = manager.getParquetFilesMetadataAfterTime(0L);
+    assertEquals(3, allFiles.size());
+
+    ParquetFileInfo mostRecent = manager.getMostRecentParquetFile();
+    assertEquals(3000L, mostRecent.getModificationTime());
+  }
+
+  /**
+   * Creates two parquet files (1000L, 2000L) next to a {@code .txt} (3000L) and a {@code .json}
+   * (4000L) file. Expects only the two parquet files to be returned, and the most recent file to
+   * report 2000L even though the non-parquet files are newer.
+   */
+  @Test
+  void testWithRealFileSystem_mixedFileTypes(@TempDir Path tempDir) throws IOException {
+    // Create mix of parquet and non-parquet files
+    Path parquetFile1 = tempDir.resolve("data1.parquet");
+    Path parquetFile2 = tempDir.resolve("data2.parquet");
+    Path txtFile = tempDir.resolve("readme.txt");
+    Path jsonFile = tempDir.resolve("config.json");
+
+    Files.createFile(parquetFile1);
+    Files.createFile(parquetFile2);
+    Files.createFile(txtFile);
+    Files.createFile(jsonFile);
+
+    Files.setLastModifiedTime(parquetFile1, FileTime.fromMillis(1000L));
+    Files.setLastModifiedTime(parquetFile2, FileTime.fromMillis(2000L));
+    Files.setLastModifiedTime(txtFile, FileTime.fromMillis(3000L));
+    Files.setLastModifiedTime(jsonFile, FileTime.fromMillis(4000L));
+
+    ParquetDataManager manager = new ParquetDataManager(CONF, tempDir.toString());
+
+    List<ParquetFileInfo> allFiles = manager.getParquetFilesMetadataAfterTime(0L);
+    // Only parquet files should be included
+    assertEquals(2, allFiles.size());
+
+    ParquetFileInfo mostRecent = manager.getMostRecentParquetFile();
+    // Most recent parquet file, not the txt or json
+    assertEquals(2000L, mostRecent.getModificationTime());
+  }
+}