diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 5e7e1a1f6193..85775781253e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 4efb883b5d6c..abf7e9b3a2d6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -603,36 +603,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -641,17 +638,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query)) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 5e7e1a1f6193..85775781253e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 06189b304299..8b131c02611a 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -610,36 +610,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -648,17 +645,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query)) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 507d7b313b42..ae403d38a49f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -120,11 +120,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 89947f73ea38..93ed50dca310 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -611,36 +611,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -649,17 +646,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query))