Skip to content

Commit 62f36fd

Browse files
committed
Support target database in write-back sink (#18058)
* Support target database in write back sink * Validate write-back sink target database * Cover write-back sink database validation semantics * Update WriteBackSink.java * Address write-back sink review comments * Clarify write-back sink target database normalization
1 parent eb42ffb commit 62f36fd

2 files changed

Lines changed: 291 additions & 3 deletions

File tree

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.it.manual;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
24+
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
25+
import org.apache.iotdb.db.it.utils.TestUtils;
26+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
27+
import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
28+
import org.apache.iotdb.rpc.TSStatusCode;
29+
30+
import org.junit.Assert;
31+
import org.junit.Test;
32+
import org.junit.experimental.categories.Category;
33+
import org.junit.runner.RunWith;
34+
35+
import java.util.Arrays;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.HashSet;
39+
import java.util.Map;
40+
41+
@RunWith(IoTDBTestRunner.class)
42+
@Category({MultiClusterIT2ManualCreateSchema.class})
43+
public class IoTDBPipeWriteBackSinkIT extends AbstractPipeDualManualIT {
44+
45+
@Test
46+
public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception {
47+
TestUtils.executeNonQueries(
48+
senderEnv,
49+
Arrays.asList(
50+
"create database root.source",
51+
"create timeseries root.source.d1.s1 with datatype=INT32,encoding=PLAIN",
52+
"create database root.target.db",
53+
"create timeseries root.target.db.d1.s1 with datatype=INT32,encoding=PLAIN"),
54+
null);
55+
56+
try (final SyncConfigNodeIServiceClient client =
57+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
58+
final Map<String, String> sourceAttributes = new HashMap<>();
59+
final Map<String, String> processorAttributes = new HashMap<>();
60+
final Map<String, String> sinkAttributes = new HashMap<>();
61+
62+
sourceAttributes.put("extractor.inclusion", "data.insert");
63+
sourceAttributes.put("extractor.forwarding-pipe-requests", "false");
64+
sourceAttributes.put("extractor.path", "root.source.**");
65+
sourceAttributes.put("user", "root");
66+
67+
sinkAttributes.put("sink", "write-back-sink");
68+
sinkAttributes.put("sink.database", "root.target.db");
69+
sinkAttributes.put("user", "root");
70+
71+
final TSStatus status =
72+
client.createPipe(
73+
new TCreatePipeReq("testPipe", sinkAttributes)
74+
.setExtractorAttributes(sourceAttributes)
75+
.setProcessorAttributes(processorAttributes));
76+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
77+
78+
Assert.assertEquals(
79+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
80+
}
81+
82+
TestUtils.executeNonQueries(
83+
senderEnv,
84+
Arrays.asList(
85+
"insert into root.source.d1(time, s1) values (1, 1)",
86+
"insert into root.source.d1(time, s1) values (2, 2)",
87+
"flush"),
88+
null);
89+
90+
TestUtils.assertDataEventuallyOnEnv(
91+
senderEnv,
92+
"select * from root.target.db.**",
93+
"Time,root.target.db.d1.s1,",
94+
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1,", "2,2,"))));
95+
}
96+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java

Lines changed: 195 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@
2020
package org.apache.iotdb.db.pipe.sink.protocol.writeback;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.conf.IoTDBConstant;
24+
import org.apache.iotdb.commons.exception.IllegalPathException;
2325
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
26+
import org.apache.iotdb.commons.path.PartialPath;
27+
import org.apache.iotdb.commons.utils.PathUtils;
2428
import org.apache.iotdb.db.auth.AuthorityChecker;
29+
import org.apache.iotdb.db.conf.IoTDBConfig;
2530
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2631
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
2732
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -36,7 +41,11 @@
3641
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
3742
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
3843
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
44+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
45+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
46+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
3947
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
48+
import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaFormatUtils;
4049
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
4150
import org.apache.iotdb.pipe.api.PipeConnector;
4251
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -52,21 +61,93 @@
5261
import org.slf4j.LoggerFactory;
5362

5463
import java.time.ZoneId;
64+
import java.util.ArrayList;
65+
import java.util.Arrays;
66+
import java.util.Collections;
67+
import java.util.Locale;
68+
import java.util.Objects;
69+
70+
import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH;
5571

5672
public class WriteBackSink implements PipeConnector {
5773

5874
private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackSink.class);
75+
private static final String CONNECTOR_IOTDB_DATABASE_KEY = "connector.database";
76+
private static final String SINK_IOTDB_DATABASE_KEY = "sink.database";
77+
78+
private String targetTreeModelDatabaseName;
5979

6080
@Override
6181
public void validate(final PipeParameterValidator validator) throws Exception {
62-
// Do nothing
82+
validator.validateSynonymAttributes(
83+
Collections.singletonList(CONNECTOR_IOTDB_DATABASE_KEY),
84+
Collections.singletonList(SINK_IOTDB_DATABASE_KEY),
85+
false);
86+
87+
final String targetDatabase =
88+
validator
89+
.getParameters()
90+
.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY);
91+
if (Objects.nonNull(targetDatabase)) {
92+
validateTargetDatabase(targetDatabase);
93+
}
6394
}
6495

6596
@Override
6697
public void customize(
6798
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
6899
throws Exception {
69-
// Do nothing
100+
final String targetDatabase =
101+
parameters.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY);
102+
if (Objects.nonNull(targetDatabase)) {
103+
targetTreeModelDatabaseName = validateTargetDatabase(targetDatabase);
104+
}
105+
}
106+
107+
private static String validateTargetDatabase(final String targetDatabase) {
108+
final String trimmedTargetDatabase = targetDatabase.trim();
109+
if (trimmedTargetDatabase.startsWith(IoTDBConstant.PATH_ROOT + ".")) {
110+
return validateAndNormalizeTreeModelDatabaseName(trimmedTargetDatabase);
111+
}
112+
113+
try {
114+
PathUtils.checkAndReturnSingleMeasurement(trimmedTargetDatabase);
115+
return validateAndNormalizeTreeModelDatabaseName(
116+
IoTDBConstant.PATH_ROOT
117+
+ IoTDBConstant.PATH_SEPARATOR
118+
+ trimmedTargetDatabase.toLowerCase(Locale.ENGLISH));
119+
} catch (final Exception e) {
120+
throw new PipeException(
121+
String.format("The target database %s is invalid.", targetDatabase), e);
122+
}
123+
}
124+
125+
private static String validateAndNormalizeTreeModelDatabaseName(final String databaseName) {
126+
try {
127+
final PartialPath databasePath = new PartialPath(databaseName);
128+
final String[] nodes = databasePath.getNodes();
129+
if (nodes.length <= 1 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) {
130+
throw new IllegalPathException(
131+
databaseName, "the database name in tree model must start with 'root.'.");
132+
}
133+
134+
final String normalizedDatabaseName = databasePath.getFullPath();
135+
MetaFormatUtils.checkDatabase(normalizedDatabaseName);
136+
137+
if (normalizedDatabaseName.length() > MAX_DATABASE_NAME_LENGTH) {
138+
throw new IllegalPathException(
139+
normalizedDatabaseName,
140+
"the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH);
141+
}
142+
return normalizedDatabaseName;
143+
} catch (final Exception e) {
144+
throw new PipeException(
145+
String.format(
146+
"The tree model database %s is invalid. The database name should match %s "
147+
+ "and the length should not exceed %s.",
148+
databaseName, IoTDBConfig.STORAGE_GROUP_PATTERN, MAX_DATABASE_NAME_LENGTH),
149+
e);
150+
}
70151
}
71152

72153
@Override
@@ -129,6 +210,7 @@ private void doTransfer(
129210
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
130211
final InsertBaseStatement statement =
131212
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
213+
rewriteTreeModelDatabaseNameIfNecessary(statement, null);
132214
status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);
133215

134216
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -160,6 +242,8 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion
160242
pipeRawTabletInsertionEvent.convertToTablet(),
161243
pipeRawTabletInsertionEvent.isAligned())
162244
.constructStatement();
245+
rewriteTreeModelDatabaseNameIfNecessary(
246+
statement, pipeRawTabletInsertionEvent.getTreeModelDatabaseName());
163247
final TSStatus status =
164248
statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);
165249

@@ -181,6 +265,114 @@ private static void throwWriteBackExceptionIfNecessary(
181265
throw new PipeException(exceptionMessage);
182266
}
183267

268+
private InsertBaseStatement rewriteTreeModelDatabaseNameIfNecessary(
269+
final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) {
270+
if (Objects.isNull(targetTreeModelDatabaseName)) {
271+
return statement;
272+
}
273+
274+
rewriteTreeModelDatabaseName(statement, sourceTreeModelDatabaseName);
275+
return statement;
276+
}
277+
278+
private void rewriteTreeModelDatabaseName(
279+
final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) {
280+
if (statement instanceof InsertRowsStatement) {
281+
((InsertRowsStatement) statement)
282+
.getInsertRowStatementList()
283+
.forEach(
284+
rowStatement ->
285+
rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName));
286+
return;
287+
}
288+
289+
if (statement instanceof InsertRowsOfOneDeviceStatement) {
290+
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement =
291+
(InsertRowsOfOneDeviceStatement) statement;
292+
insertRowsOfOneDeviceStatement
293+
.getInsertRowStatementList()
294+
.forEach(
295+
rowStatement ->
296+
rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName));
297+
insertRowsOfOneDeviceStatement.setDevicePath(
298+
rewriteTreeModelDevicePath(
299+
insertRowsOfOneDeviceStatement.getDevicePath(), sourceTreeModelDatabaseName));
300+
return;
301+
}
302+
303+
if (statement instanceof InsertMultiTabletsStatement) {
304+
((InsertMultiTabletsStatement) statement)
305+
.getInsertTabletStatementList()
306+
.forEach(
307+
tabletStatement ->
308+
rewriteTreeModelDatabaseName(tabletStatement, sourceTreeModelDatabaseName));
309+
return;
310+
}
311+
312+
statement.setDevicePath(
313+
rewriteTreeModelDevicePath(statement.getDevicePath(), sourceTreeModelDatabaseName));
314+
}
315+
316+
private PartialPath rewriteTreeModelDevicePath(
317+
final PartialPath devicePath, final String sourceTreeModelDatabaseName) {
318+
if (Objects.isNull(devicePath)) {
319+
return devicePath;
320+
}
321+
322+
final String normalizedSourceTreeModelDatabaseName =
323+
Objects.nonNull(sourceTreeModelDatabaseName)
324+
? validateAndNormalizeTreeModelDatabaseName(sourceTreeModelDatabaseName)
325+
: inferTreeModelDatabaseName(devicePath);
326+
if (Objects.isNull(normalizedSourceTreeModelDatabaseName)) {
327+
return devicePath;
328+
}
329+
330+
try {
331+
final String[] sourceDatabaseNodes =
332+
new PartialPath(normalizedSourceTreeModelDatabaseName).getNodes();
333+
final String[] targetDatabaseNodes = new PartialPath(targetTreeModelDatabaseName).getNodes();
334+
final String[] deviceNodes = devicePath.getNodes();
335+
if (!startsWith(deviceNodes, sourceDatabaseNodes)) {
336+
return devicePath;
337+
}
338+
339+
final ArrayList<String> rebasedNodes =
340+
new ArrayList<>(
341+
targetDatabaseNodes.length + deviceNodes.length - sourceDatabaseNodes.length);
342+
rebasedNodes.addAll(Arrays.asList(targetDatabaseNodes));
343+
rebasedNodes.addAll(
344+
Arrays.asList(deviceNodes).subList(sourceDatabaseNodes.length, deviceNodes.length));
345+
return new PartialPath(rebasedNodes.toArray(new String[0]));
346+
} catch (final Exception e) {
347+
throw new PipeException(
348+
String.format(
349+
"Failed to rewrite tree model database from %s to %s for device path %s.",
350+
normalizedSourceTreeModelDatabaseName, targetTreeModelDatabaseName, devicePath),
351+
e);
352+
}
353+
}
354+
355+
private String inferTreeModelDatabaseName(final PartialPath devicePath) {
356+
final String[] deviceNodes = devicePath.getNodes();
357+
if (deviceNodes.length < 2 || !IoTDBConstant.PATH_ROOT.equals(deviceNodes[0])) {
358+
return null;
359+
}
360+
361+
return IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + deviceNodes[1];
362+
}
363+
364+
private static boolean startsWith(final String[] nodes, final String[] prefixNodes) {
365+
if (nodes.length < prefixNodes.length) {
366+
return false;
367+
}
368+
for (int i = 0; i < prefixNodes.length; ++i) {
369+
if (!Objects.equals(nodes[i], prefixNodes[i])) {
370+
return false;
371+
}
372+
}
373+
return true;
374+
}
375+
184376
private TSStatus executeStatement(final InsertBaseStatement statement) {
185377
return Coordinator.getInstance()
186378
.executeForTreeModel(
@@ -199,4 +391,4 @@ private TSStatus executeStatement(final InsertBaseStatement statement) {
199391
public void close() throws Exception {
200392
// Do nothing
201393
}
202-
}
394+
}

0 commit comments

Comments
 (0)