Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception {
}

protected long waitUntilAfter(long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
current = System.currentTimeMillis();
// Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(),
// which pegs a core for the whole wait and starves other test forks/threads on a busy CI box.
// A past timestamp returns immediately.
long delta = timestampMillis - System.currentTimeMillis();
if (delta >= 0) {
try {
Thread.sleep(delta + 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e);
}
}
return current;
return System.currentTimeMillis();
}

protected List<Object[]> sql(String query, Object... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,36 +603,33 @@ public void testReadingStreamFromTimestamp() throws Exception {

@TestTemplate
public void testReadingStreamFromFutureTimetsamp() throws Exception {
long futureTimestamp = System.currentTimeMillis() + 10000;

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));

List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

List<SimpleRecord> data =
Lists.newArrayList(
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));

// Perform several inserts that should not show up because the fromTimestamp has not elapsed
IntStream.range(0, 3)
.forEach(
x -> {
appendData(data);
assertThat(rowsAvailable(query)).isEmpty();
});
// Snapshots committed before the stream's start timestamp must not show up.
IntStream.range(0, 3).forEach(x -> appendData(data));
table.refresh();

// Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall
// clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older
// than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now +
// 10s, busy-spun on by waitUntilAfter).
long fromTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(fromTimestamp);

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp));

waitUntilAfter(futureTimestamp);
assertThat(rowsAvailable(query)).isEmpty();

// Data appended after the timestamp should appear
appendData(data);
// Allow async background thread to refresh, else test sometimes fails
Thread.sleep(50);
actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data);
}

@TestTemplate
Expand All @@ -641,17 +638,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E
Lists.newArrayList(
new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
appendData(dataBeforeTimestamp);
table.refresh();

long streamStartTimestamp = System.currentTimeMillis() + 2000;
// Anchor the start timestamp just after the existing snapshot instead of waiting out a
// synthetic future timestamp (was now + 2s); later snapshots are then strictly newer.
long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(streamStartTimestamp);

// Start the stream with a future timestamp after the current snapshot
// Start the stream with a timestamp after the current snapshot
StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

// Stream should contain data added after the timestamp elapses
waitUntilAfter(streamStartTimestamp);
// Stream should contain data added after the timestamp
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);
assertThat(rowsAvailable(query))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,19 @@ public static void stopMetastoreAndSpark() throws Exception {
}

protected long waitUntilAfter(long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
current = System.currentTimeMillis();
// Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(),
// which pegs a core for the whole wait and starves other test forks/threads on a busy CI box.
// A past timestamp returns immediately.
long delta = timestampMillis - System.currentTimeMillis();
if (delta >= 0) {
try {
Thread.sleep(delta + 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e);
}
}
return current;
return System.currentTimeMillis();
}

protected List<Object[]> sql(String query, Object... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,36 +610,33 @@ public void testReadingStreamFromTimestamp() throws Exception {

@TestTemplate
public void testReadingStreamFromFutureTimetsamp() throws Exception {
long futureTimestamp = System.currentTimeMillis() + 10000;

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));

List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

List<SimpleRecord> data =
Lists.newArrayList(
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));

// Perform several inserts that should not show up because the fromTimestamp has not elapsed
IntStream.range(0, 3)
.forEach(
x -> {
appendData(data);
assertThat(rowsAvailable(query)).isEmpty();
});
// Snapshots committed before the stream's start timestamp must not show up.
IntStream.range(0, 3).forEach(x -> appendData(data));
table.refresh();

// Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall
// clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older
// than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now +
// 10s, busy-spun on by waitUntilAfter).
long fromTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(fromTimestamp);

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp));

waitUntilAfter(futureTimestamp);
assertThat(rowsAvailable(query)).isEmpty();

// Data appended after the timestamp should appear
appendData(data);
// Allow async background thread to refresh, else test sometimes fails
Thread.sleep(50);
actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data);
}

@TestTemplate
Expand All @@ -648,17 +645,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E
Lists.newArrayList(
new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
appendData(dataBeforeTimestamp);
table.refresh();

long streamStartTimestamp = System.currentTimeMillis() + 2000;
// Anchor the start timestamp just after the existing snapshot instead of waiting out a
// synthetic future timestamp (was now + 2s); later snapshots are then strictly newer.
long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(streamStartTimestamp);

// Start the stream with a future timestamp after the current snapshot
// Start the stream with a timestamp after the current snapshot
StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

// Stream should contain data added after the timestamp elapses
waitUntilAfter(streamStartTimestamp);
// Stream should contain data added after the timestamp
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);
assertThat(rowsAvailable(query))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,19 @@ public static void stopMetastoreAndSpark() throws Exception {
}

protected long waitUntilAfter(long timestampMillis) {
long current = System.currentTimeMillis();
while (current <= timestampMillis) {
current = System.currentTimeMillis();
// Sleep once for the remaining time instead of busy-spinning on System.currentTimeMillis(),
// which pegs a core for the whole wait and starves other test forks/threads on a busy CI box.
// A past timestamp returns immediately.
long delta = timestampMillis - System.currentTimeMillis();
if (delta >= 0) {
try {
Thread.sleep(delta + 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting until after " + timestampMillis, e);
}
}
return current;
return System.currentTimeMillis();
}

protected List<Object[]> sql(String query, Object... args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,36 +611,33 @@ public void testReadingStreamFromTimestamp() throws Exception {

@TestTemplate
public void testReadingStreamFromFutureTimetsamp() throws Exception {
long futureTimestamp = System.currentTimeMillis() + 10000;

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));

List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

List<SimpleRecord> data =
Lists.newArrayList(
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));

// Perform several inserts that should not show up because the fromTimestamp has not elapsed
IntStream.range(0, 3)
.forEach(
x -> {
appendData(data);
assertThat(rowsAvailable(query)).isEmpty();
});
// Snapshots committed before the stream's start timestamp must not show up.
IntStream.range(0, 3).forEach(x -> appendData(data));
table.refresh();

// Anchor the start timestamp just after the last pre-timestamp snapshot, then let the wall
// clock move past it so later snapshots are strictly newer. Keeps the "skip snapshots older
// than fromTimestamp" semantics without waiting out a synthetic future timestamp (was now +
// 10s, busy-spun on by waitUntilAfter).
long fromTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(fromTimestamp);

StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(fromTimestamp));

waitUntilAfter(futureTimestamp);
assertThat(rowsAvailable(query)).isEmpty();

// Data appended after the timestamp should appear
appendData(data);
// Allow async background thread to refresh, else test sometimes fails
Thread.sleep(50);
actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(data);
}

@TestTemplate
Expand All @@ -649,17 +646,20 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E
Lists.newArrayList(
new SimpleRecord(1, "one"), new SimpleRecord(2, "two"), new SimpleRecord(3, "three"));
appendData(dataBeforeTimestamp);
table.refresh();

long streamStartTimestamp = System.currentTimeMillis() + 2000;
// Anchor the start timestamp just after the existing snapshot instead of waiting out a
// synthetic future timestamp (was now + 2s); later snapshots are then strictly newer.
long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
waitUntilAfter(streamStartTimestamp);

// Start the stream with a future timestamp after the current snapshot
// Start the stream with a timestamp after the current snapshot
StreamingQuery query =
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual).isEmpty();

// Stream should contain data added after the timestamp elapses
waitUntilAfter(streamStartTimestamp);
// Stream should contain data added after the timestamp
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);
assertThat(rowsAvailable(query))
Expand Down