Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,50 +149,52 @@ void testScanFromRemoteAndProject(String format) throws Exception {

FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0));

// test fetch.
// Build expected projected rows for [a, c].
List<GenericRow> expectedRowsAC = new ArrayList<>();
for (int i = 0; i < expectedSize; i++) {
String value = i % 2 == 0 ? "hello, friend" + i : null;
expectedRowsAC.add(row(i, value));
}

// test fetch with projection [a, c].
// Records may arrive out of order when spanning multiple remote log segments,
// so use containsExactlyInAnyOrderElementsOf instead of sequential assertions.
LogScanner logScanner = createLogScanner(table, new int[] {0, 2});
logScanner.subscribeFromBeginning(0);
int count = 0;
while (count < expectedSize) {
List<GenericRow> rowList = new ArrayList<>();
while (rowList.size() < expectedSize) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
for (ScanRecord scanRecord : scanRecords) {
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2);
assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count);
if (count % 2 == 0) {
assertThat(scanRecord.getRow().getString(1).toString())
.isEqualTo("hello, friend" + count);
} else {
// check null values
assertThat(scanRecord.getRow().isNullAt(1)).isTrue();
}
count++;
InternalRow row = scanRecord.getRow();
rowList.add(row(row.getInt(0), row.isNullAt(1) ? null : row.getString(1)));
}
}
assertThat(count).isEqualTo(expectedSize);
assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRowsAC);
logScanner.close();

// fetch data with projection reorder.
// Build expected projected rows for [c, a] (reordered).
List<GenericRow> expectedRowsCA = new ArrayList<>();
for (int i = 0; i < expectedSize; i++) {
String value = i % 2 == 0 ? "hello, friend" + i : null;
expectedRowsCA.add(row(value, i));
}

// fetch data with projection reorder [c, a].
logScanner = createLogScanner(table, new int[] {2, 0});
logScanner.subscribeFromBeginning(0);
count = 0;
while (count < expectedSize) {
rowList = new ArrayList<>();
while (rowList.size() < expectedSize) {
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
for (ScanRecord scanRecord : scanRecords) {
assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2);
assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count);
if (count % 2 == 0) {
assertThat(scanRecord.getRow().getString(0).toString())
.isEqualTo("hello, friend" + count);
} else {
// check null values
assertThat(scanRecord.getRow().isNullAt(0)).isTrue();
}
count++;
InternalRow row = scanRecord.getRow();
rowList.add(row(row.isNullAt(0) ? null : row.getString(0), row.getInt(1)));
}
}
assertThat(count).isEqualTo(expectedSize);
assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRowsCA);
logScanner.close();
}

Expand Down
Loading