Skip to content

Commit d00d476

Browse files
authored
Stabilize ZSTD compressor level pipe IT (#17917) (#18066)
* Strictly isolate pipe tree and table visibility * Add pipe visibility unit test coverage * Add pipe static meta visibility edge tests * Add pipe table visibility filter test * Fix table pipe RPC visibility in ITs * Fix zstd compressor level IT assertion * it-fix * Update IoTDBPipeSinkCompressionIT.java * Stabilize zstd compressor level pipe IT * Keep only zstd compression pipe IT changes (cherry picked from commit d737f4b)
1 parent 59243c5 commit d00d476

1 file changed

Lines changed: 75 additions & 105 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSinkCompressionIT.java

Lines changed: 75 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -217,115 +217,85 @@ public void testZstdCompressorLevel() throws Exception {
217217

218218
try (final SyncConfigNodeIServiceClient client =
219219
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
220-
TestUtils.executeNonQueries(
221-
senderEnv,
222-
Arrays.asList(
223-
"insert into root.db.d1(time,s1) values (1,1)",
224-
"insert into root.db.d1(time,s2) values (1,1)",
225-
"insert into root.db.d1(time,s3) values (1,1)",
226-
"insert into root.db.d1(time,s4) values (1,1)",
227-
"insert into root.db.d1(time,s5) values (1,1)",
228-
"flush"),
229-
null);
230-
231-
// Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
232-
233-
try (final Connection connection = senderEnv.getConnection();
234-
final Statement statement = connection.createStatement()) {
235-
statement.execute(
236-
String.format(
237-
"create pipe p1"
238-
+ " with extractor ('extractor.pattern'='root.db.d1.s1')"
239-
+ " with connector ("
240-
+ "'connector.ip'='%s',"
241-
+ "'connector.port'='%s',"
242-
+ "'connector.compressor'='zstd, zstd',"
243-
+ "'connector.compressor.zstd.level'='3')",
244-
receiverIp, receiverPort));
245-
} catch (SQLException e) {
246-
e.printStackTrace();
247-
fail(e.getMessage());
248-
}
249-
250-
try (final Connection connection = senderEnv.getConnection();
251-
final Statement statement = connection.createStatement()) {
252-
statement.execute(
253-
String.format(
254-
"create pipe p2"
255-
+ " with extractor ('extractor.pattern'='root.db.d1.s2')"
256-
+ " with connector ("
257-
+ "'connector.ip'='%s',"
258-
+ "'connector.port'='%s',"
259-
+ "'connector.compressor'='zstd, zstd',"
260-
+ "'connector.compressor.zstd.level'='22')",
261-
receiverIp, receiverPort));
262-
} catch (SQLException e) {
263-
e.printStackTrace();
264-
fail(e.getMessage());
265-
}
266-
267-
try (final Connection connection = senderEnv.getConnection();
268-
final Statement statement = connection.createStatement()) {
269-
statement.execute(
270-
String.format(
271-
"create pipe p3"
272-
+ " with extractor ('extractor.pattern'='root.db.d1.s3')"
273-
+ " with connector ("
274-
+ "'connector.ip'='%s',"
275-
+ "'connector.port'='%s',"
276-
+ "'connector.compressor'='zstd, zstd',"
277-
+ "'connector.compressor.zstd.level'='-131072')",
278-
receiverIp, receiverPort));
279-
} catch (SQLException e) {
280-
e.printStackTrace();
281-
fail(e.getMessage());
282-
}
283-
284-
try (final Connection connection = senderEnv.getConnection();
285-
final Statement statement = connection.createStatement()) {
286-
statement.execute(
287-
String.format(
288-
"create pipe p4"
289-
+ " with extractor ('extractor.pattern'='root.db.d1.s4')"
290-
+ " with connector ("
291-
+ "'connector.ip'='%s',"
292-
+ "'connector.port'='%s',"
293-
+ "'connector.compressor'='zstd, zstd',"
294-
+ "'connector.compressor.zstd.level'='-131073')",
295-
receiverIp, receiverPort));
296-
fail();
297-
} catch (SQLException e) {
298-
// Make sure the error message in IoTDBConnector.java is returned
299-
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
300-
}
301-
302-
try (final Connection connection = senderEnv.getConnection();
303-
final Statement statement = connection.createStatement()) {
304-
statement.execute(
305-
String.format(
306-
"create pipe p5"
307-
+ " with extractor ('extractor.pattern'='root.db.d1.s5')"
308-
+ " with connector ("
309-
+ "'connector.ip'='%s',"
310-
+ "'connector.port'='%s',"
311-
+ "'connector.compressor'='zstd, zstd',"
312-
+ "'connector.compressor.zstd.level'='23')",
313-
receiverIp, receiverPort));
314-
fail();
315-
} catch (SQLException e) {
316-
// Make sure the error message in IoTDBConnector.java is returned
317-
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
318-
}
220+
// Create legal zstd level pipes one by one, so the assertion identifies the exact level
221+
// that fails and avoids concurrent historical TsFile splitting for this level test.
222+
createZstdPipeAndAssertData(
223+
"p1", "root.db.d1.s1", "3", receiverIp, receiverPort, "s1", handleFailure);
224+
createZstdPipeAndAssertData(
225+
"p2", "root.db.d1.s2", "22", receiverIp, receiverPort, "s2", handleFailure);
226+
createZstdPipeAndAssertData(
227+
"p3", "root.db.d1.s3", "-131072", receiverIp, receiverPort, "s3", handleFailure);
228+
229+
assertCreateZstdPipeFailed("p4", "root.db.d1.s4", "-131073", receiverIp, receiverPort);
230+
assertCreateZstdPipeFailed("p5", "root.db.d1.s5", "23", receiverIp, receiverPort);
319231

320232
final List<TShowPipeInfo> showPipeResult = client.showPipe(new TShowPipeReq()).pipeInfoList;
321233
Assert.assertEquals(3, showPipeResult.size());
234+
}
235+
}
322236

323-
TestUtils.assertDataEventuallyOnEnv(
324-
receiverEnv,
325-
"count timeseries",
326-
"count(timeseries),",
327-
Collections.singleton("3,"),
328-
handleFailure);
237+
private void createZstdPipeAndAssertData(
238+
final String pipeName,
239+
final String extractorPattern,
240+
final String zstdLevel,
241+
final String receiverIp,
242+
final int receiverPort,
243+
final String measurement,
244+
final Consumer<String> handleFailure) {
245+
TestUtils.executeNonQueries(
246+
senderEnv,
247+
Arrays.asList(
248+
String.format("insert into root.db.d1(time,%s) values (1,1)", measurement), "flush"),
249+
null);
250+
251+
try {
252+
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
253+
} catch (final SQLException e) {
254+
e.printStackTrace();
255+
fail(e.getMessage());
256+
}
257+
258+
TestUtils.assertDataEventuallyOnEnv(
259+
receiverEnv,
260+
String.format("select count(%s) from root.db.d1", measurement),
261+
String.format("count(root.db.d1.%s),", measurement),
262+
Collections.singleton("1,"),
263+
handleFailure);
264+
}
265+
266+
private void assertCreateZstdPipeFailed(
267+
final String pipeName,
268+
final String extractorPattern,
269+
final String zstdLevel,
270+
final String receiverIp,
271+
final int receiverPort) {
272+
try {
273+
createZstdPipe(pipeName, extractorPattern, zstdLevel, receiverIp, receiverPort);
274+
fail();
275+
} catch (final SQLException e) {
276+
Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
277+
}
278+
}
279+
280+
private void createZstdPipe(
281+
final String pipeName,
282+
final String extractorPattern,
283+
final String zstdLevel,
284+
final String receiverIp,
285+
final int receiverPort)
286+
throws SQLException {
287+
try (final Connection connection = senderEnv.getConnection();
288+
final Statement statement = connection.createStatement()) {
289+
statement.execute(
290+
String.format(
291+
"create pipe %s"
292+
+ " with extractor ('extractor.pattern'='%s')"
293+
+ " with connector ("
294+
+ "'connector.ip'='%s',"
295+
+ "'connector.port'='%s',"
296+
+ "'connector.compressor'='zstd, zstd',"
297+
+ "'connector.compressor.zstd.level'='%s')",
298+
pipeName, extractorPattern, receiverIp, receiverPort, zstdLevel));
329299
}
330300
}
331301
}

0 commit comments

Comments
 (0)