Skip to content

Commit a7f85e5

Browse files
authored
Refactor BatchMutationCall to optimize task batching and execution flow (#230)
1 parent 6111255 commit a7f85e5

3 files changed

Lines changed: 163 additions & 34 deletions

File tree

base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
public abstract class BatchMutationCall<ReqT, RespT> implements IBatchCall<ReqT, RespT, MutationCallBatcherKey> {
4343
protected final MutationCallBatcherKey batcherKey;
4444
private final IMutationPipeline storePipeline;
45-
private Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks = new ArrayDeque<>();
45+
private Deque<ICallTask<ReqT, RespT, MutationCallBatcherKey>> pendingCallTasks = new ArrayDeque<>();
4646

4747
protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcherKey batcherKey) {
4848
this.batcherKey = batcherKey;
@@ -51,20 +51,7 @@ protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcher
5151

5252
@Override
5353
public final void add(ICallTask<ReqT, RespT, MutationCallBatcherKey> callTask) {
54-
MutationCallTaskBatch<ReqT, RespT> lastBatchCallTask;
55-
MutationCallBatcherKey batcherKey = callTask.batcherKey();
56-
assert callTask.batcherKey().id.equals(batcherKey.id);
57-
if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
58-
if (!lastBatchCallTask.isBatchable(callTask)) {
59-
lastBatchCallTask = newBatch(batcherKey.ver);
60-
batchCallTasks.add(lastBatchCallTask);
61-
}
62-
lastBatchCallTask.add(callTask);
63-
} else {
64-
lastBatchCallTask = newBatch(batcherKey.ver);
65-
lastBatchCallTask.add(callTask);
66-
batchCallTasks.add(lastBatchCallTask);
67-
}
54+
pendingCallTasks.add(callTask);
6855
}
6956

7057
protected MutationCallTaskBatch<ReqT, RespT> newBatch(long ver) {
@@ -81,25 +68,56 @@ protected abstract void handleOutput(Queue<ICallTask<ReqT, RespT, MutationCallBa
8168
@Override
8269
public void reset(boolean abort) {
8370
if (abort) {
84-
batchCallTasks = new ArrayDeque<>();
71+
pendingCallTasks = new ArrayDeque<>();
8572
}
8673
}
8774

8875
@Override
8976
public CompletableFuture<Void> execute() {
90-
return execute(batchCallTasks);
77+
return executeBatches();
9178
}
9279

93-
private CompletableFuture<Void> execute(Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks) {
80+
private CompletableFuture<Void> executeBatches() {
9481
CompletableFuture<Void> chained = CompletableFuture.completedFuture(null);
9582
MutationCallTaskBatch<ReqT, RespT> batchCallTask;
96-
while ((batchCallTask = batchCallTasks.poll()) != null) {
83+
while ((batchCallTask = buildNextBatch()) != null) {
9784
MutationCallTaskBatch<ReqT, RespT> current = batchCallTask;
9885
chained = chained.thenCompose(v -> fireSingleBatch(current));
9986
}
10087
return chained;
10188
}
10289

90+
private MutationCallTaskBatch<ReqT, RespT> buildNextBatch() {
91+
if (pendingCallTasks.isEmpty()) {
92+
return null;
93+
}
94+
MutationCallTaskBatch<ReqT, RespT> batchCallTask = null;
95+
long batchVer = -1;
96+
int size = pendingCallTasks.size();
97+
for (int i = 0; i < size; i++) {
98+
ICallTask<ReqT, RespT, MutationCallBatcherKey> task = pendingCallTasks.pollFirst();
99+
if (task == null) {
100+
break;
101+
}
102+
if (batchCallTask == null) {
103+
batchVer = task.batcherKey().ver;
104+
batchCallTask = newBatch(batchVer);
105+
batchCallTask.add(task);
106+
continue;
107+
}
108+
if (task.batcherKey().ver != batchVer) {
109+
pendingCallTasks.addLast(task);
110+
continue;
111+
}
112+
if (batchCallTask.isBatchable(task)) {
113+
batchCallTask.add(task);
114+
} else {
115+
pendingCallTasks.addLast(task);
116+
}
117+
}
118+
return batchCallTask;
119+
}
120+
103121
private CompletableFuture<Void> fireSingleBatch(MutationCallTaskBatch<ReqT, RespT> batchCallTask) {
104122
RWCoProcInput input = makeBatch(batchCallTask.batchedTasks);
105123
long reqId = System.nanoTime();

base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java

Lines changed: 98 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.protobuf.ByteString;
3131
import java.time.Duration;
3232
import java.util.ArrayList;
33+
import java.util.Collections;
3334
import java.util.List;
3435
import java.util.TreeMap;
3536
import java.util.concurrent.CompletableFuture;
@@ -77,9 +78,11 @@ public void teardown() {
7778

7879
@Test
7980
public void addToSameBatch() {
80-
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
81-
put(FULL_BOUNDARY, setting(id, "V1", 0));
82-
}});
81+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
82+
{
83+
put(FULL_BOUNDARY, setting(id, "V1", 0));
84+
}
85+
});
8386

8487
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
8588
when(mutationPipeline1.execute(any()))
@@ -103,7 +106,8 @@ public void addToSameBatch() {
103106
String[] keys = request.getRwCoProc().getRaw().toStringUtf8().split("_");
104107
assertEquals(keys.length, Sets.newSet(keys).size());
105108
}
106-
// the resp order preserved
109+
Collections.sort(reqList);
110+
Collections.sort(respList);
107111
assertEquals(reqList, respList);
108112
}
109113

@@ -124,19 +128,24 @@ public void addToDifferentBatch() {
124128
int req = ThreadLocalRandom.current().nextInt(1, 1001);
125129
reqList.add(req);
126130
if (req < 500) {
127-
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
128-
put(FULL_BOUNDARY, setting(id, "V1", 0));
129-
}});
131+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
132+
{
133+
put(FULL_BOUNDARY, setting(id, "V1", 0));
134+
}
135+
});
130136
} else {
131-
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
132-
put(FULL_BOUNDARY, setting(id, "V2", 0));
133-
}});
137+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
138+
{
139+
put(FULL_BOUNDARY, setting(id, "V2", 0));
140+
}
141+
});
134142
}
135143
futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req)))
136144
.thenAccept((v) -> respList.add(Integer.parseInt(v.toStringUtf8()))));
137145
}
138146
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
139-
// the resp order preserved
147+
Collections.sort(reqList);
148+
Collections.sort(respList);
140149
assertEquals(reqList, respList);
141150
}
142151

@@ -166,4 +175,82 @@ public void executeManySmallBatchesNoRecursion() {
166175
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
167176
assertEquals(execCount.get(), n);
168177
}
178+
179+
@Test
180+
public void reScanWhenHitNonBatchable() {
181+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
182+
{
183+
put(FULL_BOUNDARY, setting(id, "V1", 0));
184+
}
185+
});
186+
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
187+
when(mutationPipeline1.execute(any()))
188+
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));
189+
190+
MutationCallScheduler<ByteString, ByteString, TestBatchMutationCall> scheduler =
191+
new MutationCallScheduler<>(NonBatchableBatchCall::new, Duration.ofMillis(1000).toNanos(), storeClient) {
192+
@Override
193+
protected ByteString rangeKey(ByteString call) {
194+
return call;
195+
}
196+
};
197+
List<CompletableFuture<Void>> futures = new ArrayList<>();
198+
List<ByteString> reqs = List.of(
199+
ByteString.copyFromUtf8("k1"),
200+
ByteString.copyFromUtf8("k_dup"), // will mark non-batchable in first batch
201+
ByteString.copyFromUtf8("k2"));
202+
List<ByteString> resps = new CopyOnWriteArrayList<>();
203+
reqs.forEach(req -> futures.add(scheduler.schedule(req).thenAccept(resps::add)));
204+
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
205+
List<String> reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
206+
List<String> respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList();
207+
assertEquals(reqSorted, respSorted);
208+
}
209+
210+
@Test
211+
public void mixDifferentVersions() {
212+
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
213+
when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2);
214+
when(mutationPipeline1.execute(any()))
215+
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));
216+
when(mutationPipeline2.execute(any()))
217+
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));
218+
TestMutationCallScheduler scheduler = new TestMutationCallScheduler(storeClient, Duration.ofMillis(1000));
219+
List<CompletableFuture<Void>> futures = new ArrayList<>();
220+
List<ByteString> reqs = new ArrayList<>();
221+
List<ByteString> resps = new CopyOnWriteArrayList<>();
222+
for (int i = 0; i < 20; i++) {
223+
ByteString req = ByteString.copyFromUtf8("k" + i);
224+
reqs.add(req);
225+
if (i % 2 == 0) {
226+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
227+
{
228+
put(FULL_BOUNDARY, setting(id, "V1", 0));
229+
}
230+
});
231+
} else {
232+
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
233+
{
234+
put(FULL_BOUNDARY, setting(id, "V2", 1));
235+
}
236+
});
237+
}
238+
futures.add(scheduler.schedule(req).thenAccept(resps::add));
239+
}
240+
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
241+
List<String> reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
242+
List<String> respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList();
243+
assertEquals(reqSorted, respSorted);
244+
}
245+
246+
private static class NonBatchableBatchCall extends TestBatchMutationCall {
247+
protected NonBatchableBatchCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) {
248+
super(pipeline, batcherKey);
249+
}
250+
251+
@Override
252+
protected NonBatchableFirstBatch newBatch(long ver) {
253+
return new NonBatchableFirstBatch(ver);
254+
}
255+
}
169256
}

base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919

2020
package org.apache.bifromq.basekv.client.scheduler;
2121

22-
import org.apache.bifromq.basekv.client.IMutationPipeline;
23-
import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
24-
import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
25-
import org.apache.bifromq.basescheduler.ICallTask;
2622
import com.google.common.collect.Iterables;
2723
import com.google.protobuf.ByteString;
2824
import java.util.HashSet;
2925
import java.util.Iterator;
3026
import java.util.Queue;
3127
import java.util.Set;
28+
import org.apache.bifromq.basekv.client.IMutationPipeline;
29+
import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
30+
import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
31+
import org.apache.bifromq.basescheduler.ICallTask;
3232

3333
public class TestBatchMutationCall extends BatchMutationCall<ByteString, ByteString> {
3434
protected TestBatchMutationCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) {
@@ -86,4 +86,28 @@ protected boolean isBatchable(ICallTask<ByteString, ByteString, MutationCallBatc
8686
return !keys.contains(callTask.call());
8787
}
8888
}
89+
90+
static class NonBatchableFirstBatch extends MutationCallTaskBatch<ByteString, ByteString> {
91+
private final Set<ByteString> keys = new HashSet<>();
92+
private boolean sawNonBatchable;
93+
94+
protected NonBatchableFirstBatch(long ver) {
95+
super(ver);
96+
}
97+
98+
@Override
99+
protected void add(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
100+
super.add(callTask);
101+
keys.add(callTask.call());
102+
}
103+
104+
@Override
105+
protected boolean isBatchable(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
106+
if (!sawNonBatchable && callTask.call().toStringUtf8().contains("dup")) {
107+
sawNonBatchable = true;
108+
return false;
109+
}
110+
return !keys.contains(callTask.call());
111+
}
112+
}
89113
}

0 commit comments

Comments
 (0)