From 23e376f3dfcaa699edf52255b9de2d54c31ca8bf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 15:34:46 +0800 Subject: [PATCH] Throttle empty subscription poll logs (#18063) * Throttle empty subscription poll logs * Use i18n messages for empty poll logs # Conflicts: # iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java # iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java # iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java # iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java --- iotdb-client/subscription/pom.xml | 5 ++ .../consumer/EmptyPollLogThrottler.java | 62 ++++++++++++++++++ .../consumer/SubscriptionPullConsumer.java | 22 +++++-- .../consumer/SubscriptionPushConsumer.java | 21 +++++-- .../consumer/EmptyPollLogThrottlerTest.java | 63 +++++++++++++++++++ 5 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java create mode 100644 iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java diff --git a/iotdb-client/subscription/pom.xml b/iotdb-client/subscription/pom.xml index 5c0c30e55bbe8..675e2678fa70d 100644 --- a/iotdb-client/subscription/pom.xml +++ b/iotdb-client/subscription/pom.xml @@ -77,5 +77,10 @@ org.apache.thrift libthrift + + junit + junit + test + diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java new file mode 100644 index 0000000000000..296eedcd0104e --- /dev/null +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.subscription.consumer; + +import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +final class EmptyPollLogThrottler { + + private static final long DEFAULT_LOG_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1); + + private final long logIntervalNanos; + private final LongSupplier ticker; + + private long consecutiveEmptyPollCount; + private long lastLogTimeNanos; + private boolean hasLoggedEmptyPoll; + + EmptyPollLogThrottler() { + this(DEFAULT_LOG_INTERVAL_NANOS, System::nanoTime); + } + + EmptyPollLogThrottler(final long logIntervalNanos, final LongSupplier ticker) { + this.logIntervalNanos = Math.max(logIntervalNanos, 1); + this.ticker = ticker; + } + + synchronized OptionalLong markEmptyPollAndMaybeGetCount() { + consecutiveEmptyPollCount++; + final long currentTimeNanos = ticker.getAsLong(); + if (!hasLoggedEmptyPoll || currentTimeNanos - lastLogTimeNanos >= logIntervalNanos) { + hasLoggedEmptyPoll = true; + lastLogTimeNanos = currentTimeNanos; + return OptionalLong.of(consecutiveEmptyPollCount); + } + return OptionalLong.empty(); + } + + synchronized void reset() { + consecutiveEmptyPollCount = 0; + lastLogTimeNanos = 0; + hasLoggedEmptyPoll = false; + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index 227843811b15b..80eaf34b868a0 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Properties; import java.util.Set; import java.util.SortedMap; @@ -65,6 +66,8 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer { private SortedMap> uncommittedCommitContexts; + private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler(); + private final AtomicBoolean isClosed = new AtomicBoolean(true); @Override @@ -115,6 +118,7 @@ public synchronized void open() throws SubscriptionException { // set isClosed to false before submitting workers isClosed.set(false); + emptyPollLogThrottler.reset(); // submit auto poll worker if enabling auto commit if (autoCommit) { @@ -181,14 +185,22 @@ public List poll(final Set topicNames, final long t final List messages = multiplePoll(parsedTopicNames, timeoutMs); if (messages.isEmpty()) { - LOGGER.info( - "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)", - this, - CollectionUtils.getLimitedString(parsedTopicNames, 32), - timeoutMs); + final OptionalLong consecutiveEmptyPollCount = + emptyPollLogThrottler.markEmptyPollAndMaybeGetCount(); + if (consecutiveEmptyPollCount.isPresent()) { + LOGGER.info( + "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s), " + + "consecutive empty polls: {}", + this, + CollectionUtils.getLimitedString(parsedTopicNames, 32), + timeoutMs, + consecutiveEmptyPollCount.getAsLong()); + } return messages; } + emptyPollLogThrottler.reset(); + // add to uncommitted messages if (autoCommit) { final long currentTimestamp = System.currentTimeMillis(); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index b70633bef35a8..fd28e81660960 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Properties; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,8 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer { private final long autoPollIntervalMs; private final long autoPollTimeoutMs; + private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler(); + private final AtomicBoolean isClosed = new AtomicBoolean(true); protected SubscriptionPushConsumer(final Builder builder) { @@ -121,6 +124,7 @@ public synchronized void open() throws SubscriptionException { // set isClosed to false before submitting workers isClosed.set(false); + emptyPollLogThrottler.reset(); // submit auto poll worker submitAutoPollWorker(); @@ -176,14 +180,21 @@ public void run() { final List messages = multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs); if (messages.isEmpty()) { - LOGGER.info( - "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)", - this, - CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32), - autoPollTimeoutMs); + final OptionalLong consecutiveEmptyPollCount = + emptyPollLogThrottler.markEmptyPollAndMaybeGetCount(); + if (consecutiveEmptyPollCount.isPresent()) { + LOGGER.info( + "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s), " + + "consecutive empty polls: {}", + SubscriptionPushConsumer.this, + CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32), + autoPollTimeoutMs, + consecutiveEmptyPollCount.getAsLong()); + } return; } + emptyPollLogThrottler.reset(); if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) { ack(messages); } diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java new file mode 100644 index 0000000000000..a86aa1a8a7981 --- /dev/null +++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/EmptyPollLogThrottlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.subscription.consumer; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicLong; + +public class EmptyPollLogThrottlerTest { + + @Test + public void testThrottleConsecutiveEmptyPollLogs() { + final AtomicLong ticker = new AtomicLong(); + final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get); + + OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(1, logCount.getAsLong()); + + ticker.addAndGet(99); + logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertFalse(logCount.isPresent()); + + ticker.incrementAndGet(); + logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(3, logCount.getAsLong()); + } + + @Test + public void testResetMakesNextEmptyPollLoggable() { + final AtomicLong ticker = new AtomicLong(); + final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get); + + Assert.assertTrue(throttler.markEmptyPollAndMaybeGetCount().isPresent()); + Assert.assertFalse(throttler.markEmptyPollAndMaybeGetCount().isPresent()); + + throttler.reset(); + + final OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(1, logCount.getAsLong()); + } +}