Skip to content

Commit 9e3214d

Browse files
committed
Merge branch 'dev/1.3' of https://github.com/apache/iotdb into cp-580fd6a-dev-1.3
2 parents 31c2857 + a653aee commit 9e3214d

43 files changed

Lines changed: 3347 additions & 384 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java

Lines changed: 75 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -217,115 +217,85 @@ public void testZstdCompressorLevel() throws Exception {
217217

218218
try (final SyncConfigNodeIServiceClient client =
219219
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
220-
TestUtils.executeNonQueries(
221-
senderEnv,
222-
Arrays.asList(
223-
"insert into root.db.d1(time,s1) values (1,1)",
224-
"insert into root.db.d1(time,s2) values (1,1)",
225-
"insert into root.db.d1(time,s3) values (1,1)",
226-
"insert into root.db.d1(time,s4) values (1,1)",
227-
"insert into root.db.d1(time,s5) values (1,1)",
228-
"flush"),
229-
null);
230-
231-
// Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
232-
233-
try (final Connection connection = senderEnv.getConnection();
234-
final Statement statement = connection.createStatement()) {
235-
statement.execute(
236-
String.format(
237-
"create pipe p1"
238-
+ " with extractor ('extractor.pattern'='root.db.d1.s1')"
239-
+ " with connector ("
240-
+ "'connector.ip'='%s',"
241-
+ "'connector.port'='%s',"
242-
+ "'connector.compressor'='zstd, zstd',"
243-
+ "'connector.compressor.zstd.level'='3')",
244-
receiverIp, receiverPort));
245-
} catch (SQLException e) {
246-
e.printStackTrace();
247-
fail(e.getMessage());
248-
}
249-
250-
try (final Connection connection = senderEnv.getConnection();
251-
final Statement statement = connection.createStatement()) {
252-
statement.execute(
253-
String.format(
254-
"create pipe p2"
255-
+ " with extractor ('extractor.pattern'='root.db.d1.s2')"
256-
+ " with connector ("
257-
+ "'connector.ip'='%s',"
258-
+ "'connector.port'='%s',"
259-
+ "'connector.compressor'='zstd, zstd',"
260-
+ "'connector.compressor.zstd.level'='22')",
261-
receiverIp, receiverPort));
262-
} catch (SQLException e) {
263-
e.printStackTrace();
264-
fail(e.getMessage());
265-
}
266-
267-
try (final Connection connection = senderEnv.getConnection();
268-
final Statement statement = connection.createStatement()) {
269-
statement.execute(
270-
String.format(
271-
"create pipe p3"
272-
+ " with extractor ('extractor.pattern'='root.db.d1.s3')"
273-
+ " with connector ("
274-
+ "'connector.ip'='%s',"
275-
+ "'connector.port'='%s',"
276-
+ "'connector.compressor'='zstd, zstd',"
277-
+ "'connector.compressor.zstd.level'='-131072')",
278-
receiverIp, receiverPort));
279-
} catch (SQLException e) {
280-
e.printStackTrace();
281-
fail(e.getMessage());
282-
}
283-
284-
try (final Connection connection = senderEnv.getConnection();
285-
final Statement statement = connection.createStatement()) {
286-
statement.execute(
287-
String.format(
288-
"create pipe p4"
289-
+ " with extractor ('extractor.pattern'='root.db.d1.s4')"
290-
+ " with connector ("
291-
+ "'connector.ip'='%s',"
292-
+ "'connector.port'='%s',"
293-
+ "'connector.compressor'='zstd, zstd',"
294-
+ "'connector.compressor.zstd.level'='-131073')",
295-
receiverIp, receiverPort));
296-
fail();
297-
} catch (SQLException e) {
298-
// Make sure the error message in IoTDBConnector.java is returned
299-
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
300-
}
301-
302-
try (final Connection connection = senderEnv.getConnection();
303-
final Statement statement = connection.createStatement()) {
304-
statement.execute(
305-
String.format(
306-
"create pipe p5"
307-
+ " with extractor ('extractor.pattern'='root.db.d1.s5')"
308-
+ " with connector ("
309-
+ "'connector.ip'='%s',"
310-
+ "'connector.port'='%s',"
311-
+ "'connector.compressor'='zstd, zstd',"
312-
+ "'connector.compressor.zstd.level'='23')",
313-
receiverIp, receiverPort));
314-
fail();
315-
} catch (SQLException e) {
316-
// Make sure the error message in IoTDBConnector.java is returned
317-
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
318-
}
220+
// Create legal zstd level pipes one by one, so the assertion identifies the exact level
221+
// that fails and avoids concurrent historical TsFile splitting for this level test.
222+
createZstdPipeAndAssertData(
223+
"p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure);
224+
createZstdPipeAndAssertData(
225+
"p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure);
226+
createZstdPipeAndAssertData(
227+
"p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure);
228+
229+
assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort);
230+
assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort);
319231

320232
final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
321233
Assert.assertEquals(3, showPipeResult.size());
234+
}
235+
}
322236

323-
TestUtils.assertDataEventuallyOnEnv(
324-
receiverEnv,
325-
"count timeseries",
326-
"count(timeseries),",
327-
Collections.singleton("3,"),
328-
handleFailure);
237+
private void createZstdPipeAndAssertData(
238+
final String pipeName,
239+
final String extractorPattern,
240+
final String zstdLevel,
241+
final String receiverIp,
242+
final int receiverPort,
243+
final String measurement,
244+
final Consumer<String> handleFailure) {
245+
TestUtils.executeNonQueries(
246+
senderEnv,
247+
Arrays.asList(
248+
String.format("insert into root.db.d1(time,%s) values (1,1)", measurement), "flush"),
249+
null);
250+
251+
try {
252+
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
253+
} catch (final SQLException e) {
254+
e.printStackTrace();
255+
fail(e.getMessage());
256+
}
257+
258+
TestUtils.assertDataEventuallyOnEnv(
259+
receiverEnv,
260+
String.format("select count(%s) from root.db.d1", measurement),
261+
String.format("count(root.db.d1.%s),", measurement),
262+
Collections.singleton("1,"),
263+
handleFailure);
264+
}
265+
266+
private void assertCreateZstdPipeFailed(
267+
final String pipeName,
268+
final String extractorPattern,
269+
final String zstdLevel,
270+
final String receiverIp,
271+
final int receiverPort) {
272+
try {
273+
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
274+
fail();
275+
} catch (final SQLException e) {
276+
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
277+
}
278+
}
279+
280+
private void createZstdPipe(
281+
final String pipeName,
282+
final String extractorPattern,
283+
final String zstdLevel,
284+
final String receiverIp,
285+
final int receiverPort)
286+
throws SQLException {
287+
try (final Connection connection = senderEnv.getConnection();
288+
final Statement statement = connection.createStatement()) {
289+
statement.execute(
290+
String.format(
291+
"create pipe %s"
292+
+ " with extractor ('extractor.pattern'='%s')"
293+
+ " with connector ("
294+
+ "'connector.ip'='%s',"
295+
+ "'connector.port'='%s',"
296+
+ "'connector.compressor'='zstd, zstd',"
297+
+ "'connector.compressor.zstd.level'='%s')",
298+
pipeName, extractorPattern, receiverIp, receiverPort, zstdLevel));
329299
}
330300
}
331301
}

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ private SubscriptionPullConsumer createConsumer(
150150
.encryptedPassword(encryptedPassword)
151151
.consumerId("consumer_" + consumerGroupId)
152152
.consumerGroupId(consumerGroupId)
153+
.autoCommit(false)
153154
.buildPullConsumer();
154155
}
155156

iotdb-client/subscription/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,10 @@
7777
<groupId>org.apache.thrift</groupId>
7878
<artifactId>libthrift</artifactId>
7979
</dependency>
80+
<dependency>
81+
<groupId>junit</groupId>
82+
<artifactId>junit</artifactId>
83+
<scope>test</scope>
84+
</dependency>
8085
</dependencies>
8186
</project>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.session.subscription.consumer;
21+
22+
import java.util.OptionalLong;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.function.LongSupplier;
25+
26+
final class EmptyPollLogThrottler {
27+
28+
private static final long DEFAULT_LOG_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
29+
30+
private final long logIntervalNanos;
31+
private final LongSupplier ticker;
32+
33+
private long consecutiveEmptyPollCount;
34+
private long lastLogTimeNanos;
35+
private boolean hasLoggedEmptyPoll;
36+
37+
EmptyPollLogThrottler() {
38+
this(DEFAULT_LOG_INTERVAL_NANOS, System::nanoTime);
39+
}
40+
41+
EmptyPollLogThrottler(final long logIntervalNanos, final LongSupplier ticker) {
42+
this.logIntervalNanos = Math.max(logIntervalNanos, 1);
43+
this.ticker = ticker;
44+
}
45+
46+
synchronized OptionalLong markEmptyPollAndMaybeGetCount() {
47+
consecutiveEmptyPollCount++;
48+
final long currentTimeNanos = ticker.getAsLong();
49+
if (!hasLoggedEmptyPoll || currentTimeNanos - lastLogTimeNanos >= logIntervalNanos) {
50+
hasLoggedEmptyPoll = true;
51+
lastLogTimeNanos = currentTimeNanos;
52+
return OptionalLong.of(consecutiveEmptyPollCount);
53+
}
54+
return OptionalLong.empty();
55+
}
56+
57+
synchronized void reset() {
58+
consecutiveEmptyPollCount = 0;
59+
lastLogTimeNanos = 0;
60+
hasLoggedEmptyPoll = false;
61+
}
62+
}

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.OptionalLong;
3738
import java.util.Properties;
3839
import java.util.Set;
3940
import java.util.SortedMap;
@@ -65,6 +66,8 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
6566

6667
private SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts;
6768

69+
private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler();
70+
6871
private final AtomicBoolean isClosed = new AtomicBoolean(true);
6972

7073
@Override
@@ -115,6 +118,7 @@ public synchronized void open() throws SubscriptionException {
115118

116119
// set isClosed to false before submitting workers
117120
isClosed.set(false);
121+
emptyPollLogThrottler.reset();
118122

119123
// submit auto poll worker if enabling auto commit
120124
if (autoCommit) {
@@ -181,14 +185,22 @@ public List<SubscriptionMessage> poll(final Set<String> topicNames, final long t
181185

182186
final List<SubscriptionMessage> messages = multiplePoll(parsedTopicNames, timeoutMs);
183187
if (messages.isEmpty()) {
184-
LOGGER.info(
185-
"SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)",
186-
this,
187-
CollectionUtils.getLimitedString(parsedTopicNames, 32),
188-
timeoutMs);
188+
final OptionalLong consecutiveEmptyPollCount =
189+
emptyPollLogThrottler.markEmptyPollAndMaybeGetCount();
190+
if (consecutiveEmptyPollCount.isPresent()) {
191+
LOGGER.info(
192+
"SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s), "
193+
+ "consecutive empty polls: {}",
194+
this,
195+
CollectionUtils.getLimitedString(parsedTopicNames, 32),
196+
timeoutMs,
197+
consecutiveEmptyPollCount.getAsLong());
198+
}
189199
return messages;
190200
}
191201

202+
emptyPollLogThrottler.reset();
203+
192204
// add to uncommitted messages
193205
if (autoCommit) {
194206
final long currentTimestamp = System.currentTimeMillis();

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Objects;
34+
import java.util.OptionalLong;
3435
import java.util.Properties;
3536
import java.util.concurrent.ScheduledFuture;
3637
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +57,8 @@ public class SubscriptionPushConsumer extends SubscriptionConsumer {
5657
private final long autoPollIntervalMs;
5758
private final long autoPollTimeoutMs;
5859

60+
private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler();
61+
5962
private final AtomicBoolean isClosed = new AtomicBoolean(true);
6063

6164
protected SubscriptionPushConsumer(final Builder builder) {
@@ -121,6 +124,7 @@ public synchronized void open() throws SubscriptionException {
121124

122125
// set isClosed to false before submitting workers
123126
isClosed.set(false);
127+
emptyPollLogThrottler.reset();
124128

125129
// submit auto poll worker
126130
submitAutoPollWorker();
@@ -176,14 +180,21 @@ public void run() {
176180
final List<SubscriptionMessage> messages =
177181
multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs);
178182
if (messages.isEmpty()) {
179-
LOGGER.info(
180-
"SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)",
181-
this,
182-
CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
183-
autoPollTimeoutMs);
183+
final OptionalLong consecutiveEmptyPollCount =
184+
emptyPollLogThrottler.markEmptyPollAndMaybeGetCount();
185+
if (consecutiveEmptyPollCount.isPresent()) {
186+
LOGGER.info(
187+
"SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s), "
188+
+ "consecutive empty polls: {}",
189+
SubscriptionPushConsumer.this,
190+
CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32),
191+
autoPollTimeoutMs,
192+
consecutiveEmptyPollCount.getAsLong());
193+
}
184194
return;
185195
}
186196

197+
emptyPollLogThrottler.reset();
187198
if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) {
188199
ack(messages);
189200
}

0 commit comments

Comments
 (0)