diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java index f7cb1f2cabc..bc1eb214c7f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java @@ -152,6 +152,34 @@ public List getNewFilesForRound(ReplicationRound replicationRound) throws return filesInRound; } + /** + * Retrieves new replication log files that belong to replication rounds from startRound to + * endRound (inclusive). Iterates through all rounds in the range and collects valid log files + * from each round's shard directory. + * @param startRound - The starting replication round (inclusive) + * @param endRound - The ending replication round (inclusive) + * @return List of valid log file paths from startRound to endRound, empty list if startRound > + * endRound + * @throws IOException if there's an error accessing the file system + */ + public List getNewFiles(ReplicationRound startRound, ReplicationRound endRound) + throws IOException { + List files = new ArrayList<>(); + // Early return if startRound is after endRound (invalid range) + if (startRound.getStartTime() > endRound.getStartTime()) { + return files; + } + // Iterate through all rounds from startRound to endRound (exclusive of endRound) + ReplicationRound firstRound = startRound; + while (!firstRound.equals(endRound)) { + files.addAll(getNewFilesForRound(firstRound)); + firstRound = replicationShardDirectoryManager.getNextRound(firstRound); + } + // Add the files for the endRound (inclusive) + files.addAll(getNewFilesForRound(endRound)); + return files; + } + /** * Retrieves all valid log files currently in the in-progress directory. * @return List of valid log file paths in the in-progress directory, empty list if directory diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java index 36daca8b97d..b6ce96e62c1 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplay.java @@ -31,6 +31,7 @@ import org.apache.phoenix.replication.ReplicationLogDiscovery; import org.apache.phoenix.replication.ReplicationLogTracker; import org.apache.phoenix.replication.ReplicationRound; +import org.apache.phoenix.replication.ReplicationShardDirectoryManager; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery; import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscoveryReplayImpl; import org.slf4j.Logger; @@ -455,15 +456,47 @@ protected HAGroupStoreRecord getHAGroupRecord() throws IOException { * Determines whether failover should be triggered based on completion criteria. Failover is safe * to trigger when all of the following conditions are met: 1. A failover has been requested * (failoverPending is true) 2. No files are currently in the in-progress directory 3. No new - * files exist for ongoing round These conditions ensure all replication logs have been processed - * before transitioning the cluster from STANDBY to ACTIVE state. + * files exist from the next round to process up to the current timestamp round. The third + * condition checks for new files in the range from nextRoundToProcess (derived from + * getLastRoundProcessed()) to currentTimestampRound (derived from current time). This ensures all + * replication logs up to the current time have been processed before transitioning the cluster + * from STANDBY to ACTIVE state. * @return true if all conditions are met and failover should be triggered, false otherwise * @throws IOException if there's an error checking file status */ protected boolean shouldTriggerFailover() throws IOException { - return failoverPending.get() && replicationLogTracker.getInProgressFiles().isEmpty() - && replicationLogTracker.getNewFilesForRound(replicationLogTracker - .getReplicationShardDirectoryManager().getNextRound(getLastRoundProcessed())).isEmpty(); + LOG.debug("Checking if failover should be triggered. failoverPending={}", failoverPending); + // Check if failover has been requested + if (!failoverPending.get()) { + LOG.debug("Failover not triggered. failoverPending is false."); + return false; + } + // Check if in-progress directory is empty + boolean isInProgressDirectoryEmpty = replicationLogTracker.getInProgressFiles().isEmpty(); + if (!isInProgressDirectoryEmpty) { + LOG.debug("Failover not triggered. In progress directory is not empty."); + return false; + } + // Check if there are any new files from next round to current timestamp round + ReplicationShardDirectoryManager replicationShardDirectoryManager = + replicationLogTracker.getReplicationShardDirectoryManager(); + ReplicationRound nextRoundToProcess = + replicationShardDirectoryManager.getNextRound(getLastRoundProcessed()); + ReplicationRound currentTimestampRound = replicationShardDirectoryManager + .getReplicationRoundFromStartTime(EnvironmentEdgeManager.currentTime()); + LOG.debug("Checking the new files from next round {} to current timestamp round {}.", + nextRoundToProcess, currentTimestampRound); + boolean isInDirectoryEmpty = + replicationLogTracker.getNewFiles(nextRoundToProcess, currentTimestampRound).isEmpty(); + + if (!isInDirectoryEmpty) { + LOG.debug( + "Failover not triggered. New files exist from next round to current " + "timestamp round."); + return false; + } + + LOG.info("Failover can be triggered."); + return true; } protected void triggerFailover() { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java index 68045bf5e04..a20d9221061 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogDiscoveryReplayTestIT.java @@ -1816,7 +1816,7 @@ public Path getInProgressDirPath() { /** * Tests the shouldTriggerFailover method with various combinations of failoverPending, - * in-progress files, and new files for next round. + * in-progress files, and new files from next round to current timestamp round. */ @Test public void testShouldTriggerFailover() throws IOException { @@ -1838,13 +1838,16 @@ public void testShouldTriggerFailover() throws IOException { try { // Create test rounds ReplicationRound testRound = new ReplicationRound(1704153600000L, 1704153660000L); - ReplicationRound nextRound = - tracker.getReplicationShardDirectoryManager().getNextRound(testRound); + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + ReplicationRound nextRoundToProcess = shardManager.getNextRound(testRound); + ReplicationRound currentTimestampRound = + shardManager.getReplicationRoundFromStartTime(currentTime); // Test Case 1: All conditions true - should return true { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) + .thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); @@ -1858,7 +1861,8 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 2: failoverPending is false - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) + .thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); @@ -1873,7 +1877,8 @@ public void testShouldTriggerFailover() throws IOException { { when(tracker.getInProgressFiles()) .thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) + .thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); @@ -1884,10 +1889,11 @@ public void testShouldTriggerFailover() throws IOException { discovery.shouldTriggerFailover()); } - // Test Case 4: new files exist for next round - should return false + // Test Case 4: new files exist from next round to current timestamp round - should return + // false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)) + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) .thenReturn(Collections.singletonList(new Path("test.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); @@ -1895,7 +1901,8 @@ public void testShouldTriggerFailover() throws IOException { discovery.setLastRoundProcessed(testRound); discovery.setFailoverPending(true); - assertFalse("Should not trigger failover when new files exist for next round", + assertFalse( + "Should not trigger failover when new files exist from next round to current timestamp round", discovery.shouldTriggerFailover()); } @@ -1903,7 +1910,8 @@ public void testShouldTriggerFailover() throws IOException { { when(tracker.getInProgressFiles()) .thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)).thenReturn(Collections.emptyList()); + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) + .thenReturn(Collections.emptyList()); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); discovery.setLastRoundInSync(testRound); @@ -1918,7 +1926,7 @@ public void testShouldTriggerFailover() throws IOException { // Test Case 6: failoverPending false AND new files exist - should return false { when(tracker.getInProgressFiles()).thenReturn(Collections.emptyList()); - when(tracker.getNewFilesForRound(nextRound)) + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) .thenReturn(Collections.singletonList(new Path("test.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); @@ -1934,7 +1942,7 @@ public void testShouldTriggerFailover() throws IOException { { when(tracker.getInProgressFiles()) .thenReturn(Collections.singletonList(new Path("test1.plog"))); - when(tracker.getNewFilesForRound(nextRound)) + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) .thenReturn(Collections.singletonList(new Path("test2.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); @@ -1950,7 +1958,7 @@ public void testShouldTriggerFailover() throws IOException { { when(tracker.getInProgressFiles()) .thenReturn(Collections.singletonList(new Path("test.plog"))); - when(tracker.getNewFilesForRound(nextRound)) + when(tracker.getNewFiles(nextRoundToProcess, currentTimestampRound)) .thenReturn(Collections.singletonList(new Path("test2.plog"))); TestableReplicationLogDiscoveryReplay discovery = new TestableReplicationLogDiscoveryReplay(tracker, haGroupStoreRecord); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java index 8c7c239c671..67155de6bf1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java @@ -1362,6 +1362,292 @@ public void testGetOlderInProgressFilesWithMixedFileTypes() throws IOException { resultFilenames.contains(invalidFile3.getName())); } + @Test + public void testGetNewFilesWithStartAndEndRound() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round (60 seconds duration) + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create middle round + long middleRoundStartTime = startRoundEndTime; + long middleRoundEndTime = middleRoundStartTime + roundDurationMs; + ReplicationRound middleRound = new ReplicationRound(middleRoundStartTime, middleRoundEndTime); + + // Create end round + long endRoundStartTime = middleRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories for each round + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path middleRoundShardDir = shardManager.getShardDirectory(middleRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(middleRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Create files in start round + Path startRoundFile1 = new Path(startRoundShardDir, startRoundStartTime + "_rs1.plog"); + Path startRoundFile2 = new Path(startRoundShardDir, startRoundStartTime + 30000 + "_rs2.plog"); + + // Create files in middle round + Path middleRoundFile1 = new Path(middleRoundShardDir, middleRoundStartTime + "_rs3.plog"); + Path middleRoundFile2 = + new Path(middleRoundShardDir, middleRoundStartTime + 30000 + "_rs4.plog"); + + // Create files in end round + Path endRoundFile1 = new Path(endRoundShardDir, endRoundStartTime + "_rs5.plog"); + Path endRoundFile2 = new Path(endRoundShardDir, endRoundStartTime + 30000 + "_rs6.plog"); + + // Create files outside the range (before start round) + ReplicationRound beforeStartRound = shardManager.getPreviousRound(startRound); + Path beforeStartRoundShardDir = shardManager.getShardDirectory(beforeStartRound); + localFs.mkdirs(beforeStartRoundShardDir); + Path beforeStartRoundFile = + new Path(beforeStartRoundShardDir, beforeStartRound.getStartTime() + "_rs0.plog"); + + // Create files outside the range (after end round) + ReplicationRound afterEndRound = shardManager.getNextRound(endRound); + Path afterEndRoundShardDir = shardManager.getShardDirectory(afterEndRound); + localFs.mkdirs(afterEndRoundShardDir); + Path afterEndRoundFile = + new Path(afterEndRoundShardDir, afterEndRound.getStartTime() + "_rs7.plog"); + + // Create all files + localFs.create(startRoundFile1, true).close(); + localFs.create(startRoundFile2, true).close(); + localFs.create(middleRoundFile1, true).close(); + localFs.create(middleRoundFile2, true).close(); + localFs.create(endRoundFile1, true).close(); + localFs.create(endRoundFile2, true).close(); + localFs.create(beforeStartRoundFile, true).close(); + localFs.create(afterEndRoundFile, true).close(); + + // Call getNewFiles with startRound and endRound + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call getNewFilesForRound for each round (start, middle, end) + // Each call to getNewFilesForRound calls exists() and listStatus() on shard directories + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(middleRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(3)).listStatus(Mockito.any(Path.class)); + + // Prepare expected set of file paths (should include files from start, middle, and end rounds) + Set expectedPaths = new HashSet<>(); + expectedPaths.add(startRoundFile1.toString()); + expectedPaths.add(startRoundFile2.toString()); + expectedPaths.add(middleRoundFile1.toString()); + expectedPaths.add(middleRoundFile2.toString()); + expectedPaths.add(endRoundFile1.toString()); + expectedPaths.add(endRoundFile2.toString()); + + // Create actual set of paths + Set actualPaths = + result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet()); + + // Verify all files from start to end rounds are returned + assertEquals("Should return exactly 6 files from start to end rounds", expectedPaths.size(), + actualPaths.size()); + assertEquals("File paths do not match", expectedPaths, actualPaths); + + // Verify files outside the range are not included + assertFalse("Should not contain file from before start round", + actualPaths.contains(beforeStartRoundFile.toString())); + assertFalse("Should not contain file from after end round", + actualPaths.contains(afterEndRoundFile.toString())); + } + + @Test + public void testGetNewFilesWithSameStartAndEndRound() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create a single round + long roundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long roundEndTime = roundStartTime + roundDurationMs; + ReplicationRound round = new ReplicationRound(roundStartTime, roundEndTime); + + // Get shard directory for this round + Path roundShardDir = shardManager.getShardDirectory(round); + localFs.mkdirs(roundShardDir); + + // Create files in the round + Path file1 = new Path(roundShardDir, roundStartTime + "_rs1.plog"); + Path file2 = new Path(roundShardDir, roundStartTime + 30000 + "_rs2.plog"); + Path file3 = new Path(roundShardDir, roundStartTime + 50000 + "_rs3.plog"); + + // Create files + localFs.create(file1, true).close(); + localFs.create(file2, true).close(); + localFs.create(file3, true).close(); + + // Call getNewFiles with same start and end round + List result = tracker.getNewFiles(round, round); + + // Verify file system operations + // Should call getNewFilesForRound once for the round + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(roundShardDir)); + Mockito.verify(mockFs, times(1)).listStatus(Mockito.any(Path.class)); + + // Prepare expected set of file paths + Set expectedPaths = new HashSet<>(); + expectedPaths.add(file1.toString()); + expectedPaths.add(file2.toString()); + expectedPaths.add(file3.toString()); + + // Create actual set of paths + Set actualPaths = + result.stream().map(path -> path.toUri().getPath()).collect(Collectors.toSet()); + + // Verify all files from the round are returned + assertEquals("Should return exactly 3 files from the round", expectedPaths.size(), + actualPaths.size()); + assertEquals("File paths do not match", expectedPaths, actualPaths); + } + + @Test + public void testGetNewFilesWithInvalidRange() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create end round (earlier time) + long endRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Create start round (later time) - invalid: start > end + long startRoundStartTime = endRoundEndTime; + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Create files in both rounds + Path startRoundFile = new Path(startRoundShardDir, startRoundStartTime + "_rs1.plog"); + Path endRoundFile = new Path(endRoundShardDir, endRoundStartTime + "_rs2.plog"); + + localFs.create(startRoundFile, true).close(); + localFs.create(endRoundFile, true).close(); + + // Call getNewFiles with invalid range (startRound.getStartTime() > endRound.getStartTime()) + List result = tracker.getNewFiles(startRound, endRound); + + // Verify empty list is returned when startRound.getStartTime() > endRound.getStartTime() + assertTrue("Should return empty list for invalid range", result.isEmpty()); + + // Verify no file system operations were performed on shard directories (early return) + // Note: init() already called exists() and mkdirs() on in-progress directory + Mockito.verify(mockFs, times(0)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(0)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + } + + @Test + public void testGetNewFilesWithEmptyRounds() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create end round + long endRoundStartTime = startRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Create shard directories but leave them empty + localFs.mkdirs(startRoundShardDir); + localFs.mkdirs(endRoundShardDir); + + // Call getNewFiles with empty rounds + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call getNewFilesForRound for each round (start and end) + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + Mockito.verify(mockFs, times(2)).listStatus(Mockito.any(Path.class)); + + // Verify empty list is returned + assertTrue("Should return empty list for empty rounds", result.isEmpty()); + } + + @Test + public void testGetNewFilesWithNonExistentRounds() throws IOException { + // Initialize tracker + tracker.init(); + + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + long roundDurationMs = shardManager.getReplicationRoundDurationSeconds() * 1000L; + + // Create start round + long startRoundStartTime = 1704153600000L; // 2024-01-02 00:00:00 + long startRoundEndTime = startRoundStartTime + roundDurationMs; + ReplicationRound startRound = new ReplicationRound(startRoundStartTime, startRoundEndTime); + + // Create end round + long endRoundStartTime = startRoundEndTime; + long endRoundEndTime = endRoundStartTime + roundDurationMs; + ReplicationRound endRound = new ReplicationRound(endRoundStartTime, endRoundEndTime); + + // Get shard directories + Path startRoundShardDir = shardManager.getShardDirectory(startRound); + Path endRoundShardDir = shardManager.getShardDirectory(endRound); + + // Assert that shard directories do not exist + assertFalse("Start round shard directory should not exist", localFs.exists(startRoundShardDir)); + assertFalse("End round shard directory should not exist", localFs.exists(endRoundShardDir)); + + // Call getNewFiles with non-existent rounds + List result = tracker.getNewFiles(startRound, endRound); + + // Verify file system operations + // Should call exists() for each round (start and end) + // Note: init() already called exists() on in-progress directory + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(startRoundShardDir)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(endRoundShardDir)); + // listStatus() should not be called when directories don't exist + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + + // Verify empty list is returned + assertTrue("Should return empty list for non-existent rounds", result.isEmpty()); + } + private int countDirectories(FileSystem fs, Path path) throws IOException { if (!fs.exists(path)) { return 0;