Skip to content

Commit dbe43c8

Browse files
committed
[client] Implement adaptive fetch rate control for LogScanner
1 parent 81a1ec8 commit dbe43c8

4 files changed

Lines changed: 368 additions & 2 deletions

File tree

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.log;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.annotation.VisibleForTesting;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
/**
28+
* Controls the fetch rate for individual buckets based on the amount of data returned in recent
29+
* fetches.
30+
*
31+
* <p>For buckets that consistently return data, they are fetched every round. For buckets that
32+
* return no data for consecutive fetches, the fetch frequency is progressively reduced using
33+
* exponential backoff. This is particularly useful for partitioned tables where only the latest
34+
* partitions have active data, avoiding wasted CPU and network resources on empty partitions.
35+
*
36+
* <p>The backoff schedule is: after 1 empty fetch, skip 1 round; after 2 empty fetches, skip 2
37+
* rounds; after 3, skip 4 rounds; and so on (powers of 2), up to {@code maxSkipRounds}. Any fetch
38+
* that returns data immediately resets the backoff to zero.
39+
*
40+
* <p>This class is NOT thread-safe. Callers must ensure proper synchronization.
41+
*/
42+
@Internal
43+
class BucketFetchRateController {
44+
45+
/** Maximum exponent for the exponential backoff (2^5 = 32). */
46+
private static final int MAX_BACKOFF_SHIFT = 5;
47+
48+
private final int maxSkipRounds;
49+
private final Map<TableBucket, BucketFetchState> bucketStates;
50+
51+
BucketFetchRateController(int maxSkipRounds) {
52+
this.maxSkipRounds = maxSkipRounds;
53+
this.bucketStates = new HashMap<>();
54+
}
55+
56+
/**
57+
* Determines whether the given bucket should be included in the current fetch round.
58+
*
59+
* <p>If the bucket is in a cool down period (i.e., it has been returning empty results), this
60+
* method decrements the remaining skip count and returns {@code false}. Otherwise, it returns
61+
* {@code true} indicating the bucket should be fetched.
62+
*
63+
* @param tableBucket the bucket to check
64+
* @return {@code true} if the bucket should be fetched in this round
65+
*/
66+
boolean shouldFetch(TableBucket tableBucket) {
67+
BucketFetchState state = bucketStates.get(tableBucket);
68+
if (state == null) {
69+
return true;
70+
}
71+
if (state.remainingSkipRounds > 0) {
72+
state.remainingSkipRounds--;
73+
return false;
74+
}
75+
return true;
76+
}
77+
78+
/**
79+
* Records the result of a fetch for the given bucket, adjusting future fetch frequency.
80+
*
81+
* <p>If the fetch returned data, the backoff is immediately reset to zero. If the fetch
82+
* returned no data, the consecutive empty count is incremented and a new skip interval is
83+
* calculated using exponential backoff.
84+
*
85+
* @param tableBucket the bucket that was fetched
86+
* @param hasRecords {@code true} if the fetch returned any records
87+
*/
88+
void recordFetchResult(TableBucket tableBucket, boolean hasRecords) {
89+
BucketFetchState state =
90+
bucketStates.computeIfAbsent(tableBucket, k -> new BucketFetchState());
91+
if (hasRecords) {
92+
state.consecutiveEmptyFetches = 0;
93+
state.remainingSkipRounds = 0;
94+
} else {
95+
state.consecutiveEmptyFetches++;
96+
int shift = Math.min(state.consecutiveEmptyFetches - 1, MAX_BACKOFF_SHIFT);
97+
state.remainingSkipRounds = Math.min(1 << shift, maxSkipRounds);
98+
}
99+
}
100+
101+
/** Removes the tracking state for the given bucket. */
102+
void removeBucket(TableBucket tableBucket) {
103+
bucketStates.remove(tableBucket);
104+
}
105+
106+
/** Resets all tracking state. */
107+
void reset() {
108+
bucketStates.clear();
109+
}
110+
111+
/** Returns the current number of remaining skip rounds for the given bucket, for testing. */
112+
@VisibleForTesting
113+
int getRemainingSkipRounds(TableBucket tableBucket) {
114+
BucketFetchState state = bucketStates.get(tableBucket);
115+
return state == null ? 0 : state.remainingSkipRounds;
116+
}
117+
118+
/** Returns the number of consecutive empty fetches for the given bucket, for testing. */
119+
@VisibleForTesting
120+
int getConsecutiveEmptyFetches(TableBucket tableBucket) {
121+
BucketFetchState state = bucketStates.get(tableBucket);
122+
return state == null ? 0 : state.consecutiveEmptyFetches;
123+
}
124+
125+
/** Per-bucket fetch state tracking consecutive empty fetches and cool down. */
126+
private static class BucketFetchState {
127+
int consecutiveEmptyFetches = 0;
128+
int remainingSkipRounds = 0;
129+
}
130+
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public class LogFetcher implements Closeable {
102102
private final LogFetchBuffer logFetchBuffer;
103103
private final LogFetchCollector logFetchCollector;
104104
private final RemoteLogDownloader remoteLogDownloader;
105+
@Nullable private final BucketFetchRateController fetchRateController;
105106

106107
@GuardedBy("this")
107108
private final Set<Integer> nodesWithPendingFetchRequests;
@@ -150,6 +151,15 @@ public LogFetcher(
150151
this.remoteLogDownloader =
151152
new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
152153
remoteLogDownloader.start();
154+
if (conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_ENABLED)) {
155+
this.fetchRateController =
156+
new BucketFetchRateController(
157+
conf.getInt(
158+
ConfigOptions
159+
.CLIENT_SCANNER_LOG_ADAPTIVE_FETCH_MAX_SKIP_ROUNDS));
160+
} else {
161+
this.fetchRateController = null;
162+
}
153163
}
154164

155165
/**
@@ -383,15 +393,17 @@ private synchronized void handleFetchLogResponse(
383393
+ "unsubscribed.",
384394
tb);
385395
} else {
396+
boolean hasData;
386397
if (fetchResultForBucket.fetchFromRemote()) {
398+
hasData = true;
387399
pendRemoteFetches(
388400
fetchResultForBucket.remoteLogFetchInfo(),
389401
fetchOffset,
390402
fetchResultForBucket.getHighWatermark());
391403
} else {
392404
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
393-
boolean hasRecords = !MemoryLogRecords.EMPTY.equals(logRecords);
394-
if (hasRecords) {
405+
hasData = !MemoryLogRecords.EMPTY.equals(logRecords);
406+
if (hasData) {
395407
// Retain the parsed buffer so it stays alive while
396408
// this CompletedFetch's records are being consumed.
397409
if (parsedByteBuf != null) {
@@ -422,6 +434,11 @@ private synchronized void handleFetchLogResponse(
422434
null));
423435
}
424436
}
437+
// Track adaptive fetch rate for successful fetches
438+
if (fetchRateController != null
439+
&& fetchResultForBucket.getErrorCode() == Errors.NONE.code()) {
440+
fetchRateController.recordFetchResult(tb, hasData);
441+
}
425442
}
426443
}
427444
}
@@ -499,6 +516,10 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
499516
if (tableId == null) {
500517
tableId = tb.getTableId();
501518
}
519+
// Adaptive fetch: skip buckets in cool down period
520+
if (fetchRateController != null && !fetchRateController.shouldFetch(tb)) {
521+
continue;
522+
}
502523
Long offset = logScannerStatus.getBucketOffset(tb);
503524
if (offset == null) {
504525
LOG.debug(
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.table.scanner.log;
19+
20+
import org.apache.fluss.metadata.TableBucket;
21+
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.assertj.core.api.Assertions.assertThat;
26+
27+
/** Tests for {@link BucketFetchRateController}. */
class BucketFetchRateControllerTest {

    private static final int MAX_SKIP_ROUNDS = 32;
    private BucketFetchRateController controller;

    @BeforeEach
    void setup() {
        controller = new BucketFetchRateController(MAX_SKIP_ROUNDS);
    }

    @Test
    void testNewBucketShouldAlwaysFetch() {
        // A bucket with no recorded history must never be throttled.
        TableBucket tb = new TableBucket(1L, 0L, 0);
        assertThat(controller.shouldFetch(tb)).isTrue();
    }

    @Test
    void testBucketWithDataResetsCoolDown() {
        TableBucket tb = new TableBucket(1L, 0L, 0);

        // Record multiple empty fetches to build up cool down
        controller.recordFetchResult(tb, false);
        controller.recordFetchResult(tb, false);
        controller.recordFetchResult(tb, false);
        assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(3);
        assertThat(controller.getRemainingSkipRounds(tb)).isGreaterThan(0);

        // Now record a fetch with data: both counters must reset to zero immediately.
        controller.recordFetchResult(tb, true);
        assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0);
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0);
        assertThat(controller.shouldFetch(tb)).isTrue();
    }

    @Test
    void testExponentialBackoff() {
        TableBucket tb = new TableBucket(1L, 0L, 0);

        // 1st empty fetch: skip 1 round (2^0 = 1)
        controller.recordFetchResult(tb, false);
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1);

        // Consume the skip round
        assertThat(controller.shouldFetch(tb)).isFalse();
        assertThat(controller.shouldFetch(tb)).isTrue();

        // 2nd empty fetch: skip 2 rounds (2^1 = 2)
        controller.recordFetchResult(tb, false);
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(2);

        // Consume the 2 skip rounds
        assertThat(controller.shouldFetch(tb)).isFalse();
        assertThat(controller.shouldFetch(tb)).isFalse();
        assertThat(controller.shouldFetch(tb)).isTrue();

        // 3rd empty fetch: skip 4 rounds (2^2 = 4)
        controller.recordFetchResult(tb, false);
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(4);

        for (int i = 0; i < 4; i++) {
            assertThat(controller.shouldFetch(tb)).isFalse();
        }
        assertThat(controller.shouldFetch(tb)).isTrue();
    }

    @Test
    void testMaxSkipRoundsCap() {
        TableBucket tb = new TableBucket(1L, 0L, 0);

        // Record many empty fetches to exceed the max backoff shift
        for (int i = 0; i < 20; i++) {
            controller.recordFetchResult(tb, false);
            // Consume all skip rounds
            while (!controller.shouldFetch(tb)) {
                // draining
            }
        }

        // After 20 empty fetches, skip rounds should be capped at MAX_SKIP_ROUNDS
        controller.recordFetchResult(tb, false);
        assertThat(controller.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(MAX_SKIP_ROUNDS);
    }

    @Test
    void testMultipleBucketsIndependent() {
        // State is tracked per bucket: throttling one must not affect another.
        TableBucket tb1 = new TableBucket(1L, 0L, 0);
        TableBucket tb2 = new TableBucket(1L, 1L, 0);

        // tb1 gets empty fetches, tb2 gets data
        controller.recordFetchResult(tb1, false);
        controller.recordFetchResult(tb1, false);
        controller.recordFetchResult(tb2, true);

        // tb1 should be in cool down
        assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(2);
        assertThat(controller.getRemainingSkipRounds(tb1)).isGreaterThan(0);

        // tb2 should be fetchable immediately
        assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0);
        assertThat(controller.shouldFetch(tb2)).isTrue();
    }

    @Test
    void testRemoveBucket() {
        TableBucket tb = new TableBucket(1L, 0L, 0);

        controller.recordFetchResult(tb, false);
        assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(1);

        // Removing the bucket discards its state, so it behaves like a brand-new bucket.
        controller.removeBucket(tb);
        assertThat(controller.getConsecutiveEmptyFetches(tb)).isEqualTo(0);
        assertThat(controller.shouldFetch(tb)).isTrue();
    }

    @Test
    void testReset() {
        TableBucket tb1 = new TableBucket(1L, 0L, 0);
        TableBucket tb2 = new TableBucket(1L, 1L, 0);

        controller.recordFetchResult(tb1, false);
        controller.recordFetchResult(tb2, false);

        // reset() clears state for ALL buckets at once.
        controller.reset();

        assertThat(controller.getConsecutiveEmptyFetches(tb1)).isEqualTo(0);
        assertThat(controller.getConsecutiveEmptyFetches(tb2)).isEqualTo(0);
        assertThat(controller.shouldFetch(tb1)).isTrue();
        assertThat(controller.shouldFetch(tb2)).isTrue();
    }

    @Test
    void testShouldFetchDecrementsCounter() {
        TableBucket tb = new TableBucket(1L, 0L, 0);

        // 1 empty fetch -> 1 skip round
        controller.recordFetchResult(tb, false);
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(1);

        // shouldFetch returns false and decrements to 0
        assertThat(controller.shouldFetch(tb)).isFalse();
        assertThat(controller.getRemainingSkipRounds(tb)).isEqualTo(0);

        // Now it should be fetchable
        assertThat(controller.shouldFetch(tb)).isTrue();
    }

    @Test
    void testSmallMaxSkipRounds() {
        // A controller-level cap below 2^MAX_BACKOFF_SHIFT must win over the exponential schedule.
        BucketFetchRateController smallController = new BucketFetchRateController(4);
        TableBucket tb = new TableBucket(1L, 0L, 0);

        // Record many empty fetches
        for (int i = 0; i < 10; i++) {
            smallController.recordFetchResult(tb, false);
            // Drain skip rounds
            while (!smallController.shouldFetch(tb)) {
                // draining
            }
        }

        // Even after many empty fetches, skip rounds should be capped at 4
        smallController.recordFetchResult(tb, false);
        assertThat(smallController.getRemainingSkipRounds(tb)).isLessThanOrEqualTo(4);
    }
}

0 commit comments

Comments
 (0)