From 23e376f3dfcaa699edf52255b9de2d54c31ca8bf Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Tue, 30 Jun 2026 15:34:46 +0800
Subject: [PATCH] Throttle empty subscription poll logs (#18063)
* Throttle empty subscription poll logs
* Use i18n messages for empty poll logs
# Conflicts:
# iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java
# iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java
# iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java
# iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java
---
iotdb-client/subscription/pom.xml | 5 ++
.../consumer/EmptyPollLogThrottler.java | 62 ++++++++++++++++++
.../consumer/SubscriptionPullConsumer.java | 22 +++++--
.../consumer/SubscriptionPushConsumer.java | 21 +++++--
.../consumer/EmptyPollLogThrottlerTest.java | 63 +++++++++++++++++++
5 files changed, 163 insertions(+), 10 deletions(-)
create mode 100644 iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java
create mode 100644 iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java
diff --git a/iotdb-client/subscription/pom.xml b/iotdb-client/subscription/pom.xml
index 5c0c30e55bbe8..675e2678fa70d 100644
--- a/iotdb-client/subscription/pom.xml
+++ b/iotdb-client/subscription/pom.xml
@@ -77,5 +77,10 @@
org.apache.thrift
libthrift
+
+ junit
+ junit
+ test
+
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java
new file mode 100644
index 0000000000000..296eedcd0104e
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer;
+
+import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+final class EmptyPollLogThrottler {
+
+ private static final long DEFAULT_LOG_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
+
+ private final long logIntervalNanos;
+ private final LongSupplier ticker;
+
+ private long consecutiveEmptyPollCount;
+ private long lastLogTimeNanos;
+ private boolean hasLoggedEmptyPoll;
+
+ EmptyPollLogThrottler() {
+ this(DEFAULT_LOG_INTERVAL_NANOS, System::nanoTime);
+ }
+
+ EmptyPollLogThrottler(final long logIntervalNanos, final LongSupplier ticker) {
+ this.logIntervalNanos = Math.max(logIntervalNanos, 1);
+ this.ticker = ticker;
+ }
+
+ synchronized OptionalLong markEmptyPollAndMaybeGetCount() {
+ consecutiveEmptyPollCount++;
+ final long currentTimeNanos = ticker.getAsLong();
+ if (!hasLoggedEmptyPoll || currentTimeNanos - lastLogTimeNanos >= logIntervalNanos) {
+ hasLoggedEmptyPoll = true;
+ lastLogTimeNanos = currentTimeNanos;
+ return OptionalLong.of(consecutiveEmptyPollCount);
+ }
+ return OptionalLong.empty();
+ }
+
+ synchronized void reset() {
+ consecutiveEmptyPollCount = 0;
+ lastLogTimeNanos = 0;
+ hasLoggedEmptyPoll = false;
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 227843811b15b..80eaf34b868a0 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
@@ -65,6 +66,8 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
private SortedMap> uncommittedCommitContexts;
+ private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler();
+
private final AtomicBoolean isClosed = new AtomicBoolean(true);
@Override
@@ -115,6 +118,7 @@ public synchronized void open() throws SubscriptionException {
// set isClosed to false before submitting workers
isClosed.set(false);
+ emptyPollLogThrottler.reset();
// submit auto poll worker if enabling auto commit
if (autoCommit) {
@@ -181,14 +185,22 @@ public List poll(final Set topicNames, final long t
final List messages = multiplePoll(parsedTopicNames, timeoutMs);
if (messages.isEmpty()) {
- LOGGER.info(
- "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)",
- this,
- CollectionUtils.getLimitedString(parsedTopicNames, 32),
- timeoutMs);
+ final OptionalLong consecutiveEmptyPollCount =
+ emptyPollLogThrottler.markEmptyPollAndMaybeGetCount();
+ if (consecutiveEmptyPollCount.isPresent()) {
+ LOGGER.info(
+ "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s), "
+ + "consecutive empty polls: {}",
+ this,
+ CollectionUtils.getLimitedString(parsedTopicNames, 32),
+ timeoutMs,
+ consecutiveEmptyPollCount.getAsLong());
+ }
return messages;
}
+ emptyPollLogThrottler.reset();
+
// add to uncommitted messages
if (autoCommit) {
final long currentTimestamp = System.currentTimeMillis();
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index b70633bef35a8..fd28e81660960 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,8 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
private final long autoPollIntervalMs;
private final long autoPollTimeoutMs;
+ private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler();
+
private final AtomicBoolean isClosed = new AtomicBoolean(true);
protected SubscriptionPushConsumer(final Builder builder) {
@@ -121,6 +124,7 @@ public synchronized void open() throws SubscriptionException {
// set isClosed to false before submitting workers
isClosed.set(false);
+ emptyPollLogThrottler.reset();
// submit auto poll worker
submitAutoPollWorker();
@@ -176,14 +180,21 @@ public void run() {
final List messages =
multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs);
if (messages.isEmpty()) {
- LOGGER.info(
- "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)",
- this,
- CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
- autoPollTimeoutMs);
+ final OptionalLong consecutiveEmptyPollCount =
+ emptyPollLogThrottler.markEmptyPollAndMaybeGetCount();
+ if (consecutiveEmptyPollCount.isPresent()) {
+ LOGGER.info(
+ "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s), "
+ + "consecutive empty polls: {}",
+ SubscriptionPushConsumer.this,
+ CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
+ autoPollTimeoutMs,
+ consecutiveEmptyPollCount.getAsLong());
+ }
return;
}
+ emptyPollLogThrottler.reset();
if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
ack(messages);
}
diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java
new file mode 100644
index 0000000000000..a86aa1a8a7981
--- /dev/null
+++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class EmptyPollLogThrottlerTest {
+
+ @Test
+ public void testThrottleConsecutiveEmptyPollLogs() {
+ final AtomicLong ticker = new AtomicLong();
+ final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get);
+
+ OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount();
+ Assert.assertTrue(logCount.isPresent());
+ Assert.assertEquals(1, logCount.getAsLong());
+
+ ticker.addAndGet(99);
+ logCount = throttler.markEmptyPollAndMaybeGetCount();
+ Assert.assertFalse(logCount.isPresent());
+
+ ticker.incrementAndGet();
+ logCount = throttler.markEmptyPollAndMaybeGetCount();
+ Assert.assertTrue(logCount.isPresent());
+ Assert.assertEquals(3, logCount.getAsLong());
+ }
+
+ @Test
+ public void testResetMakesNextEmptyPollLoggable() {
+ final AtomicLong ticker = new AtomicLong();
+ final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get);
+
+ Assert.assertTrue(throttler.markEmptyPollAndMaybeGetCount().isPresent());
+ Assert.assertFalse(throttler.markEmptyPollAndMaybeGetCount().isPresent());
+
+ throttler.reset();
+
+ final OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount();
+ Assert.assertTrue(logCount.isPresent());
+ Assert.assertEquals(1, logCount.getAsLong());
+ }
+}