From 8ea882fd5e45da24fa79dbc4ab71f8607d49d8f6 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Thu, 25 Jun 2026 13:22:48 -0700 Subject: [PATCH] Spark: Avoid real-time waits in streaming read timestamp tests TestStructuredStreamingRead3.testReadingStreamFromFutureTimetsamp and testReadingStreamFromTimestampFutureWithExistingSnapshots picked a start timestamp of "now + 10s" / "now + 2s" and then waited for the wall clock to reach it via waitUntilAfter(). That wait is pure idle time on every run (the future test alone averaged ~16s/run in CI). Anchor the stream's start timestamp to the committed snapshot's timestampMillis() + 1 instead. Snapshots committed before that point are still excluded and those committed after are still included, so the semantics are unchanged, but no synthetic future instant has to elapse. waitUntilAfter() also busy-spun on System.currentTimeMillis(), pegging a core for the entire wait and starving other parallel test forks on a busy CI box. Sleep once for the remaining time instead; a past timestamp returns immediately. Applied identically to spark v3.5, v4.0 and v4.1. --- .../org/apache/iceberg/spark/TestBase.java | 16 +++++-- .../source/TestStructuredStreamingRead3.java | 44 +++++++++---------- .../org/apache/iceberg/spark/TestBase.java | 16 +++++-- .../source/TestStructuredStreamingRead3.java | 44 +++++++++---------- .../org/apache/iceberg/spark/TestBase.java | 16 +++++-- .../source/TestStructuredStreamingRead3.java | 44 +++++++++---------- 6 files changed, 102 insertions(+), 78 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 5e7e1a1f6193..85775781253e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 4efb883b5d6c..abf7e9b3a2d6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -603,36 +603,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -641,17 +638,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query)) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 5e7e1a1f6193..85775781253e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 06189b304299..8b131c02611a 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -610,36 +610,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -648,17 +645,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query)) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index 507d7b313b42..ae403d38a49f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -120,11 +120,19 @@ public static void stopMetastoreAndSpark() throws Exception { } protected long waitUntilAfter(long timestampMillis) { - long current = System.currentTimeMillis(); - while (current <= timestampMillis) { - current = System.currentTimeMillis(); + // Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(), + // which pegs a core for the whole wait and starves other test forks/threads on a busy CI box. + // A past timestamp returns immediately. + long delta = timestampMillis - System.currentTimeMillis(); + if (delta >= 0) { + try { + Thread.sleep(delta + 1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e); + } } - return current; + return System.currentTimeMillis(); } protected List sql(String query, Object... args) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index 89947f73ea38..93ed50dca310 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -611,36 +611,33 @@ public void testReadingStreamFromTimestamp() throws Exception { @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { - long futureTimestamp = System.currentTimeMillis() + 10000; - - StreamingQuery query = - startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp)); - - List actual = rowsAvailable(query); - assertThat(actual).isEmpty(); - List data = Lists.newArrayList( new SimpleRecord(-2, "minustwo"), new SimpleRecord(-1, "minusone"), new SimpleRecord(0, "zero")); - // Perform several inserts that should not show up because the fromTimestamp has not elapsed - IntStream.range(0, 3) - .forEach( - x -> { - appendData(data); - assertThat(rowsAvailable(query)).isEmpty(); - }); + // Snapshots committed before the stream's start timestamp must not show up. + IntStream.range(0, 3).forEach(x -> appendData(data)); + table.refresh(); + + // Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall + // clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older + // than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now + + // 10s, busy-spun on by waitUntilAfter). + long fromTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(fromTimestamp); + + StreamingQuery query = + startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp)); - waitUntilAfter(futureTimestamp); + assertThat(rowsAvailable(query)).isEmpty(); // Data appended after the timestamp should appear appendData(data); // Allow async background thread to refresh, else test sometimes fails Thread.sleep(50); - actual = rowsAvailable(query); - assertThat(actual).containsExactlyInAnyOrderElementsOf(data); + assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data); } @TestTemplate @@ -649,17 +646,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E Lists.newArrayList( new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three")); appendData(dataBeforeTimestamp); + table.refresh(); - long streamStartTimestamp = System.currentTimeMillis() + 2000; + // Anchor the start timestamp just after the existing snapshot instead of waiting out a + // synthetic future timestamp (was now + 2s); later snapshots are then strictly newer. + long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1; + waitUntilAfter(streamStartTimestamp); - // Start the stream with a future timestamp after the current snapshot + // Start the stream with a timestamp after the current snapshot StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List actual = rowsAvailable(query); assertThat(actual).isEmpty(); - // Stream should contain data added after the timestamp elapses - waitUntilAfter(streamStartTimestamp); + // Stream should contain data added after the timestamp List> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); assertThat(rowsAvailable(query))