Skip to content

Commit 74dbb9e

Browse files
committed
Limit subscription in-flight event accumulation
1 parent fcd52d7 commit 74dbb9e

3 files changed

Lines changed: 106 additions & 19 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ public SubscriptionEvent poll(final String consumerId) {
223223
private SubscriptionEvent pollInternal(final String consumerId) {
224224
states.markPollRequest();
225225

226+
if (shouldThrottlePollByInFlightEvents()) {
227+
return null;
228+
}
229+
226230
if (prefetchingQueue.isEmpty()) {
227231
states.markMissingPrefetch();
228232
try {
@@ -254,6 +258,22 @@ private SubscriptionEvent pollInternal(final String consumerId) {
254258
return null;
255259
}
256260

261+
private boolean shouldThrottlePollByInFlightEvents() {
262+
if (!states.shouldThrottlePoll()) {
263+
return false;
264+
}
265+
266+
remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
267+
if (!states.shouldThrottlePoll()) {
268+
return false;
269+
}
270+
271+
LOGGER.debug(
272+
"Subscription: SubscriptionPrefetchingQueue {} throttles poll because too many events are in flight.",
273+
this);
274+
return true;
275+
}
276+
257277
public SubscriptionEvent pollV2(final String consumerId, final PollTimer timer) {
258278
acquireReadLock();
259279
try {
@@ -268,6 +288,10 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime
268288

269289
// do-while ensures at least one poll
270290
do {
291+
if (shouldThrottlePollByInFlightEvents()) {
292+
return null;
293+
}
294+
271295
SubscriptionEvent event;
272296
try {
273297
if (prefetchingQueue.isEmpty()) {
@@ -976,6 +1000,10 @@ public long getSubscriptionUncommittedEventCount() {
9761000
return inFlightEvents.size();
9771001
}
9781002

1003+
public long getSubscriptionRetainedEventCount() {
1004+
return prefetchingQueue.size() + inFlightEvents.size();
1005+
}
1006+
9791007
public long getCurrentCommitId() {
9801008
return commitIdGenerator.get();
9811009
}
@@ -1023,6 +1051,7 @@ public Map<String, String> coreReportMessage() {
10231051
result.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size()));
10241052
result.put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size()));
10251053
result.put("size of inFlightEvents", String.valueOf(inFlightEvents.size()));
1054+
result.put("size of retainedEvents", String.valueOf(getSubscriptionRetainedEventCount()));
10261055
result.put("commitIdGenerator", commitIdGenerator.toString());
10271056
result.put("states", states.toString());
10281057
result.put("isCompleted", String.valueOf(isCompleted));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,12 @@ public boolean shouldPrefetch() {
106106
}
107107

108108
// 1.3. local event count
109-
if (hasTooManyPrefetchedLocalEvent()) {
109+
if (hasTooManyRetainedLocalEvent()) {
110110
return false;
111111
}
112112

113113
// 1.4. global event count
114-
if (hasTooManyPrefetchedGlobalEvent()) {
114+
if (hasTooManyRetainedGlobalEvent()) {
115115
return false;
116116
}
117117

@@ -132,24 +132,40 @@ public boolean shouldPrefetch() {
132132
return (System.currentTimeMillis() - lastPollRequestTimestamp) * pollRate() > 1000;
133133
}
134134

135+
public boolean shouldThrottlePoll() {
136+
return hasTooManyInFlightLocalEvent() || hasTooManyInFlightGlobalEvent();
137+
}
138+
135139
private boolean isMemoryEnough() {
136140
return PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
137141
* PREFETCH_MEMORY_THRESHOLD
138142
> PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
139143
}
140144

141-
private boolean hasTooManyPrefetchedLocalEvent() {
142-
return prefetchingQueue.getPrefetchedEventCount() > PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
145+
private boolean hasTooManyRetainedLocalEvent() {
146+
return prefetchingQueue.getSubscriptionRetainedEventCount()
147+
> PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
143148
}
144149

145-
private boolean hasTooManyPrefetchedGlobalEvent() {
146-
// The number of prefetched events in the current prefetching queue > floor(t / number of
150+
private boolean hasTooManyRetainedGlobalEvent() {
151+
// The number of retained events in the current prefetching queue > floor(t / number of
147152
// prefetching queues), where t is an adjustable parameter.
148-
return prefetchingQueue.getPrefetchedEventCount()
153+
return prefetchingQueue.getSubscriptionRetainedEventCount()
149154
* SubscriptionAgent.broker().getPrefetchingQueueCount()
150155
> PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
151156
}
152157

158+
private boolean hasTooManyInFlightLocalEvent() {
159+
return prefetchingQueue.getSubscriptionUncommittedEventCount()
160+
>= PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
161+
}
162+
163+
private boolean hasTooManyInFlightGlobalEvent() {
164+
return prefetchingQueue.getSubscriptionUncommittedEventCount()
165+
* SubscriptionAgent.broker().getPrefetchingQueueCount()
166+
>= PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
167+
}
168+
153169
private boolean isMissingRateTooHigh() {
154170
return missingRate() > PREFETCH_MISSING_RATE_THRESHOLD;
155171
}
@@ -169,6 +185,8 @@ public String toString() {
169185
.add("pollRate", pollRate())
170186
.add("missingRate", missingRate())
171187
.add("disorderCause", disorderCauseCounter.getCount())
188+
.add("retainedEventCount", prefetchingQueue.getSubscriptionRetainedEventCount())
189+
.add("inFlightEventCount", prefetchingQueue.getSubscriptionUncommittedEventCount())
172190
.toString();
173191
}
174192
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -586,8 +586,11 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi
586586
if (pendingSeekRequest != null) {
587587
return null;
588588
}
589+
if (shouldThrottlePollByInFlightEvents()) {
590+
return null;
591+
}
589592
final SubscriptionEvent event = pollInternal(consumerId);
590-
if (Objects.nonNull(event) && prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) {
593+
if (Objects.nonNull(event) && hasAvailableEventSlot()) {
591594
requestPrefetch();
592595
} else if (Objects.isNull(event) && shouldRecoverPrefetchBindingAfterEmptyPoll()) {
593596
requestPrefetch();
@@ -598,6 +601,25 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi
598601
}
599602
}
600603

604+
private boolean shouldThrottlePollByInFlightEvents() {
605+
if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) {
606+
return false;
607+
}
608+
609+
recycleInFlightEvents();
610+
if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) {
611+
return false;
612+
}
613+
614+
LOGGER.debug(
615+
"ConsensusPrefetchingQueue {}: throttles poll because too many events are in flight", this);
616+
return true;
617+
}
618+
619+
private boolean hasAvailableEventSlot() {
620+
return getSubscriptionRetainedEventCount() < MAX_PREFETCHING_QUEUE_SIZE;
621+
}
622+
601623
private synchronized void initPrefetch(final RegionProgress regionProgress) {
602624
if (prefetchInitialized) {
603625
return; // double-check under synchronization
@@ -1197,7 +1219,7 @@ public PrefetchRoundResult drivePrefetchOnce() {
11971219
applyPendingSubscriptionWalReset(observedSeekGeneration);
11981220
recycleInFlightEvents();
11991221

1200-
if (!isActive || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
1222+
if (!isActive || !hasAvailableEventSlot()) {
12011223
return computeIdleRoundResult();
12021224
}
12031225

@@ -1244,7 +1266,7 @@ public PrefetchRoundResult drivePrefetchOnce() {
12441266
return PrefetchRoundResult.rescheduleNow();
12451267
}
12461268

1247-
if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L) {
1269+
if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L && hasAvailableEventSlot()) {
12481270
final long lingerElapsedMs = System.currentTimeMillis() - lingerBatch.firstTabletTimeMs;
12491271
if (lingerElapsedMs >= batchMaxDelayMs) {
12501272
if (seekGeneration.get() != observedSeekGeneration) {
@@ -1331,8 +1353,11 @@ private PrefetchRoundResult computeIdleRoundResult() {
13311353
if (isClosed || !prefetchInitialized || !isActive) {
13321354
return PrefetchRoundResult.dormant();
13331355
}
1334-
if (prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
1335-
return PrefetchRoundResult.dormant();
1356+
if (!hasAvailableEventSlot()) {
1357+
return inFlightEvents.isEmpty()
1358+
? PrefetchRoundResult.dormant()
1359+
: PrefetchRoundResult.rescheduleAfter(
1360+
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs());
13361361
}
13371362
if (hasImmediatePrefetchableWork()) {
13381363
return PrefetchRoundResult.rescheduleNow();
@@ -1598,7 +1623,7 @@ private boolean pumpFromSubscriptionWAL(
15981623
int entriesRead = 0;
15991624
while (entriesRead < maxWalEntries
16001625
&& subscriptionWALIterator.hasNext()
1601-
&& prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) {
1626+
&& hasAvailableEventSlot()) {
16021627
try {
16031628
final IndexedConsensusRequest walEntry = subscriptionWALIterator.next();
16041629
entriesRead++;
@@ -1879,7 +1904,7 @@ private boolean drainBufferedRealtimeWriters(
18791904
drainRealtimeWriters(batchState, maxTablets, maxBatchBytes);
18801905

18811906
final int bufferedAfter = getRealtimeBufferedEntryCount();
1882-
if (bufferedAfter == 0 || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
1907+
if (bufferedAfter == 0 || !hasAvailableEventSlot()) {
18831908
return true;
18841909
}
18851910

@@ -2066,13 +2091,19 @@ private boolean canAcceptCommitContext(
20662091
}
20672092

20682093
public boolean ack(final String consumerId, final SubscriptionCommitContext commitContext) {
2094+
final boolean acked;
20692095
acquireReadLock();
20702096
try {
2071-
return canAcceptCommitContext(commitContext, "ack", false)
2072-
&& ackInternal(consumerId, commitContext);
2097+
acked =
2098+
canAcceptCommitContext(commitContext, "ack", false)
2099+
&& ackInternal(consumerId, commitContext);
20732100
} finally {
20742101
releaseReadLock();
20752102
}
2103+
if (acked && hasAvailableEventSlot()) {
2104+
requestPrefetch();
2105+
}
2106+
return acked;
20762107
}
20772108

20782109
private boolean ackInternal(
@@ -2148,6 +2179,7 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com
21482179
* in multi-region iteration where only one queue owns the event.
21492180
*/
21502181
public boolean ackSilent(final String consumerId, final SubscriptionCommitContext commitContext) {
2182+
final boolean ackedResult;
21512183
acquireReadLock();
21522184
try {
21532185
if (!canAcceptCommitContext(commitContext, "ack", true)) {
@@ -2190,10 +2222,14 @@ public boolean ackSilent(final String consumerId, final SubscriptionCommitContex
21902222
ev.cleanUp(false);
21912223
return null;
21922224
});
2193-
return acked.get();
2225+
ackedResult = acked.get();
21942226
} finally {
21952227
releaseReadLock();
21962228
}
2229+
if (ackedResult && hasAvailableEventSlot()) {
2230+
requestPrefetch();
2231+
}
2232+
return ackedResult;
21972233
}
21982234

21992235
private WriterId extractCommitWriterId(final SubscriptionCommitContext commitContext) {
@@ -2754,8 +2790,8 @@ private void maybeInjectWatermark() {
27542790
}
27552791
final long intervalMs =
27562792
SubscriptionConfig.getInstance().getSubscriptionConsensusWatermarkIntervalMs();
2757-
if (intervalMs <= 0) {
2758-
return; // Watermark disabled
2793+
if (intervalMs <= 0 || !hasAvailableEventSlot()) {
2794+
return; // Watermark disabled or the queue is full
27592795
}
27602796
final long now = System.currentTimeMillis();
27612797
if (now - lastWatermarkEmitTimeMs >= intervalMs) {
@@ -3111,6 +3147,10 @@ public long getSubscriptionUncommittedEventCount() {
31113147
return inFlightEvents.size();
31123148
}
31133149

3150+
public long getSubscriptionRetainedEventCount() {
3151+
return prefetchingQueue.size() + inFlightEvents.size();
3152+
}
3153+
31143154
/** Exposes the current seek generation for runtime tests and metrics. */
31153155
public long getCurrentSeekGeneration() {
31163156
return seekGeneration.get();

0 commit comments

Comments
 (0)