diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java index 84b50af2c7..0fb3e2d5ad 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/RemoteLogScannerITCase.java @@ -149,50 +149,52 @@ void testScanFromRemoteAndProject(String format) throws Exception { FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(new TableBucket(tableId, 0)); - // test fetch. + // Build expected projected rows for [a, c]. + List expectedRowsAC = new ArrayList<>(); + for (int i = 0; i < expectedSize; i++) { + String value = i % 2 == 0 ? "hello, friend" + i : null; + expectedRowsAC.add(row(i, value)); + } + + // test fetch with projection [a, c]. + // Records may arrive out of order when spanning multiple remote log segments, + // so use containsExactlyInAnyOrderElementsOf instead of sequential assertions. LogScanner logScanner = createLogScanner(table, new int[] {0, 2}); logScanner.subscribeFromBeginning(0); - int count = 0; - while (count < expectedSize) { + List rowList = new ArrayList<>(); + while (rowList.size() < expectedSize) { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); for (ScanRecord scanRecord : scanRecords) { assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); - assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count); - if (count % 2 == 0) { - assertThat(scanRecord.getRow().getString(1).toString()) - .isEqualTo("hello, friend" + count); - } else { - // check null values - assertThat(scanRecord.getRow().isNullAt(1)).isTrue(); - } - count++; + InternalRow row = scanRecord.getRow(); + rowList.add(row(row.getInt(0), row.isNullAt(1) ? null : row.getString(1))); } } - assertThat(count).isEqualTo(expectedSize); + assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRowsAC); logScanner.close(); - // fetch data with projection reorder. + // Build expected projected rows for [c, a] (reordered). + List expectedRowsCA = new ArrayList<>(); + for (int i = 0; i < expectedSize; i++) { + String value = i % 2 == 0 ? "hello, friend" + i : null; + expectedRowsCA.add(row(value, i)); + } + + // fetch data with projection reorder [c, a]. logScanner = createLogScanner(table, new int[] {2, 0}); logScanner.subscribeFromBeginning(0); - count = 0; - while (count < expectedSize) { + rowList = new ArrayList<>(); + while (rowList.size() < expectedSize) { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); for (ScanRecord scanRecord : scanRecords) { assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(2); - assertThat(scanRecord.getRow().getInt(1)).isEqualTo(count); - if (count % 2 == 0) { - assertThat(scanRecord.getRow().getString(0).toString()) - .isEqualTo("hello, friend" + count); - } else { - // check null values - assertThat(scanRecord.getRow().isNullAt(0)).isTrue(); - } - count++; + InternalRow row = scanRecord.getRow(); + rowList.add(row(row.isNullAt(0) ? null : row.getString(0), row.getInt(1))); } } - assertThat(count).isEqualTo(expectedSize); + assertThat(rowList).containsExactlyInAnyOrderElementsOf(expectedRowsCA); logScanner.close(); }