From 6796450c6e23becd8f20cf75192062668672c079 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 29 Jun 2026 16:49:14 +0800 Subject: [PATCH 1/6] Support target database in write back sink --- .../manual/basic/IoTDBPipeDataSinkIT.java | 4 +- .../manual/IoTDBPipeWriteBackSinkIT.java | 96 ++++++++ .../protocol/writeback/WriteBackSink.java | 215 +++++++++++++++++- 3 files changed, 307 insertions(+), 8 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java index 22ed58cb608a7..cacf8fb1395f3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java @@ -266,10 +266,8 @@ public void testWriteBackSink() throws Exception { sourceAttributes.put("source.table-name", "test.*"); sourceAttributes.put("user", "root"); - processorAttributes.put("processor", "rename-database-processor"); - processorAttributes.put("processor.new-db-name", "Test1"); - sinkAttributes.put("sink", "write-back-sink"); + sinkAttributes.put("sink.database", "Test1"); sinkAttributes.put("user", "root"); final TSStatus status = diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java new file mode 100644 index 0000000000000..83ca3cda73dda --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.manual; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeManual.class}) +public class IoTDBPipeWriteBackSinkIT extends AbstractPipeDualTreeModelManualIT { + + @Test + public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.source", + "create timeseries root.source.d1.s1 with datatype=INT32,encoding=PLAIN", + "create database root.target", + "create timeseries root.target.d1.s1 with datatype=INT32,encoding=PLAIN"), + null); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map sourceAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); + + sourceAttributes.put("source.inclusion", "data.insert"); + sourceAttributes.put("source.forwarding-pipe-requests", "false"); + sourceAttributes.put("source.path", "root.source.**"); + sourceAttributes.put("user", "root"); + + sinkAttributes.put("sink", "write-back-sink"); + sinkAttributes.put("sink.database", "target"); + sinkAttributes.put("user", "root"); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(sourceAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.source.d1(time, s1) values (1, 1)", + "insert into root.source.d1(time, s1) values (2, 2)", + "flush"), + null); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "select * from root.target.**", + "Time,root.target.d1.s1,", + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1,", "2,2,")))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 9bec114ede1c0..5a19d79510bf9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -25,11 +25,14 @@ import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.common.SqlDialect; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; @@ -43,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; @@ -51,6 +55,9 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; @@ -75,7 +82,9 @@ import java.io.IOException; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Locale; import java.util.Objects; import java.util.Set; @@ -84,6 +93,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; @@ -103,12 +113,15 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.WRITE_BACK_CONNECTOR_SKIP_IF_DEFAULT_VALUE; import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET; +import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @TreeModel @TableModel public class WriteBackSink implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackSink.class); + private static final String CONNECTOR_IOTDB_DATABASE_KEY = "connector.database"; + private static final String SINK_IOTDB_DATABASE_KEY = "sink.database"; // Simulate the behavior of the client-to-server communication // for correctly handling data insertion in IoTDBReceiverAgent#receive method @@ -121,6 +134,8 @@ public class WriteBackSink implements PipeConnector { private boolean useEventUserName; private UserEntity userEntity; + private String targetTableModelDatabaseName; + private String targetTreeModelDatabaseName; private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser(); @@ -132,6 +147,37 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY), false); + validator.validateSynonymAttributes( + Collections.singletonList(CONNECTOR_IOTDB_DATABASE_KEY), + Collections.singletonList(SINK_IOTDB_DATABASE_KEY), + false); + + final String targetDatabase = + validator + .getParameters() + .getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + validateTargetDatabase(targetDatabase); + } + } + + private static void validateTargetDatabase(final String targetDatabase) { + try { + TableConfigTaskVisitor.validateDatabaseName(PathUtils.unQualifyDatabaseName(targetDatabase)); + } catch (final Exception e) { + throw new PipeException( + String.format( + "The target database %s is invalid. It should be a table-model database name or " + + "the corresponding one-level tree-model database path like root.%s. It should " + + "not contain '%s', should match the pattern %s, and the length should not " + + "exceed %d", + targetDatabase, + PathUtils.unQualifyDatabaseName(targetDatabase), + PATH_SEPARATOR, + IoTDBConfig.DATABASE_PATTERN, + MAX_DATABASE_NAME_LENGTH), + e); + } } @Override @@ -192,6 +238,14 @@ public void customize( Arrays.asList(CONNECTOR_USE_EVENT_USER_NAME_KEY, SINK_USE_EVENT_USER_NAME_KEY), CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE); + final String targetDatabase = + parameters.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + targetTableModelDatabaseName = + PathUtils.unQualifyDatabaseName(targetDatabase).toLowerCase(Locale.ENGLISH); + targetTreeModelDatabaseName = PathUtils.qualifyDatabaseName(targetTableModelDatabaseName); + } + if (SESSION_MANAGER.getCurrSession() == null) { SESSION_MANAGER.registerSession(session); } @@ -266,13 +320,18 @@ private void doTransfer( final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final String dataBaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + ? getTargetTableModelDatabaseNameOrDefault( + pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()) : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); final InsertBaseStatement insertBaseStatement; insertBaseStatement = PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName) .constructStatement(); + if (!insertBaseStatement.isWriteToTable()) { + rewriteTreeModelDatabaseNameIfNecessary( + insertBaseStatement, pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()); + } final TSStatus status = insertBaseStatement.isWriteToTable() @@ -310,7 +369,8 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion throws PipeException { final String dataBaseName = pipeRawTabletInsertionEvent.isTableModelEvent() - ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() + ? getTargetTableModelDatabaseNameOrDefault( + pipeRawTabletInsertionEvent.getTableModelDatabaseName()) : pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); final InsertTabletStatement insertTabletStatement = @@ -319,6 +379,10 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.isAligned(), dataBaseName) .constructStatement(); + if (!insertTabletStatement.isWriteToTable()) { + rewriteTreeModelDatabaseNameIfNecessary( + insertTabletStatement, pipeRawTabletInsertionEvent.getTreeModelDatabaseName()); + } final TSStatus status = insertTabletStatement.isWriteToTable() @@ -365,11 +429,15 @@ private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertion final TSStatus status = pipeStatementInsertionEvent.isTableModelEvent() ? executeStatementForTableModel( - pipeStatementInsertionEvent.getStatement(), - pipeStatementInsertionEvent.getTableModelDatabaseName(), + rewriteTableModelDatabaseNameIfNecessary( + pipeStatementInsertionEvent.getStatement()), + getTargetTableModelDatabaseNameOrDefault( + pipeStatementInsertionEvent.getTableModelDatabaseName()), pipeStatementInsertionEvent.getUserName()) : executeStatementForTreeModel( - pipeStatementInsertionEvent.getStatement(), + rewriteTreeModelDatabaseNameIfNecessary( + pipeStatementInsertionEvent.getStatement(), + pipeStatementInsertionEvent.getTreeModelDatabaseName()), pipeStatementInsertionEvent.getUserName()); if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() @@ -384,6 +452,143 @@ private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertion } } + private String getTargetTableModelDatabaseNameOrDefault(final String databaseName) { + return Objects.nonNull(targetTableModelDatabaseName) + ? targetTableModelDatabaseName + : databaseName; + } + + private Statement rewriteTableModelDatabaseNameIfNecessary(final Statement statement) { + if (Objects.isNull(targetTableModelDatabaseName) + || !(statement instanceof InsertBaseStatement)) { + return statement; + } + + rewriteTableModelDatabaseName((InsertBaseStatement) statement); + return statement; + } + + private void rewriteTableModelDatabaseName(final InsertBaseStatement statement) { + statement.setDatabaseName(targetTableModelDatabaseName); + + if (statement instanceof InsertRowsStatement) { + ((InsertRowsStatement) statement) + .getInsertRowStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } else if (statement instanceof InsertRowsOfOneDeviceStatement) { + ((InsertRowsOfOneDeviceStatement) statement) + .getInsertRowStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } else if (statement instanceof InsertMultiTabletsStatement) { + ((InsertMultiTabletsStatement) statement) + .getInsertTabletStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } + } + + private Statement rewriteTreeModelDatabaseNameIfNecessary( + final Statement statement, final String sourceTreeModelDatabaseName) { + if (!(statement instanceof InsertBaseStatement)) { + return statement; + } + + return rewriteTreeModelDatabaseNameIfNecessary( + (InsertBaseStatement) statement, sourceTreeModelDatabaseName); + } + + private InsertBaseStatement rewriteTreeModelDatabaseNameIfNecessary( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + if (Objects.isNull(targetTreeModelDatabaseName)) { + return statement; + } + + rewriteTreeModelDatabaseName( + statement, PathUtils.qualifyDatabaseName(sourceTreeModelDatabaseName)); + return statement; + } + + private void rewriteTreeModelDatabaseName( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + statement.setDatabaseName(targetTreeModelDatabaseName); + + if (statement instanceof InsertRowsStatement) { + ((InsertRowsStatement) statement) + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertRowsOfOneDeviceStatement) { + final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement = + (InsertRowsOfOneDeviceStatement) statement; + insertRowsOfOneDeviceStatement + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + insertRowsOfOneDeviceStatement.setDevicePath( + rewriteTreeModelDevicePath( + insertRowsOfOneDeviceStatement.getDevicePath(), sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertMultiTabletsStatement) { + ((InsertMultiTabletsStatement) statement) + .getInsertTabletStatementList() + .forEach( + tabletStatement -> + rewriteTreeModelDatabaseName(tabletStatement, sourceTreeModelDatabaseName)); + return; + } + + statement.setDevicePath( + rewriteTreeModelDevicePath(statement.getDevicePath(), sourceTreeModelDatabaseName)); + } + + private PartialPath rewriteTreeModelDevicePath( + final PartialPath devicePath, final String sourceTreeModelDatabaseName) { + if (Objects.isNull(devicePath) || Objects.isNull(sourceTreeModelDatabaseName)) { + return devicePath; + } + + try { + final String[] sourceDatabaseNodes = new PartialPath(sourceTreeModelDatabaseName).getNodes(); + final String[] targetDatabaseNodes = new PartialPath(targetTreeModelDatabaseName).getNodes(); + final String[] deviceNodes = devicePath.getNodes(); + if (!startsWith(deviceNodes, sourceDatabaseNodes)) { + return devicePath; + } + + final ArrayList rebasedNodes = + new ArrayList<>( + targetDatabaseNodes.length + deviceNodes.length - sourceDatabaseNodes.length); + rebasedNodes.addAll(Arrays.asList(targetDatabaseNodes)); + rebasedNodes.addAll( + Arrays.asList(deviceNodes).subList(sourceDatabaseNodes.length, deviceNodes.length)); + return new PartialPath(rebasedNodes.toArray(new String[0])); + } catch (final Exception e) { + throw new PipeException( + String.format( + "Failed to rewrite tree-model database from %s to %s for device %s.", + sourceTreeModelDatabaseName, targetTreeModelDatabaseName, devicePath), + e); + } + } + + private static boolean startsWith(final String[] nodes, final String[] prefixNodes) { + if (nodes.length < prefixNodes.length) { + return false; + } + for (int i = 0; i < prefixNodes.length; ++i) { + if (!Objects.equals(nodes[i], prefixNodes[i])) { + return false; + } + } + return true; + } + private static void throwWriteBackExceptionIfNecessary( final TSStatus status, final String exceptionMessage) { if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) { From 8feb79835ab45f0bd5491884347356fba2d17bbc Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 09:33:21 +0800 Subject: [PATCH 2/6] Validate write-back sink target database --- .../manual/IoTDBPipeWriteBackSinkIT.java | 10 +- .../protocol/writeback/WriteBackSink.java | 142 ++++++++++++++---- .../iotdb/db/pipe/sink/PipeSinkTest.java | 109 ++++++++++++++ 3 files changed, 226 insertions(+), 35 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java index 83ca3cda73dda..11edc8cf6becc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java @@ -49,8 +49,8 @@ public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { Arrays.asList( "create database root.source", "create timeseries root.source.d1.s1 with datatype=INT32,encoding=PLAIN", - "create database root.target", - "create timeseries root.target.d1.s1 with datatype=INT32,encoding=PLAIN"), + "create database root.target.db", + "create timeseries root.target.db.d1.s1 with datatype=INT32,encoding=PLAIN"), null); try (final SyncConfigNodeIServiceClient client = @@ -65,7 +65,7 @@ public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { sourceAttributes.put("user", "root"); sinkAttributes.put("sink", "write-back-sink"); - sinkAttributes.put("sink.database", "target"); + sinkAttributes.put("sink.database", "root.target.db"); sinkAttributes.put("user", "root"); final TSStatus status = @@ -89,8 +89,8 @@ public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { TestUtils.assertDataEventuallyOnEnv( senderEnv, - "select * from root.target.**", - "Time,root.target.d1.s1,", + "select * from root.target.db.**", + "Time,root.target.db.d1.s1,", Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1,", "2,2,")))); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 5a19d79510bf9..bdc0ba409b918 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; @@ -60,6 +61,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaFormatUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TableModel; @@ -135,6 +137,7 @@ public class WriteBackSink implements PipeConnector { private UserEntity userEntity; private String targetTableModelDatabaseName; + private String invalidTargetTableModelDatabaseName; private String targetTreeModelDatabaseName; private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser(); @@ -162,20 +165,52 @@ public void validate(final PipeParameterValidator validator) throws Exception { } private static void validateTargetDatabase(final String targetDatabase) { + if (PathUtils.isTableModelDatabase(targetDatabase)) { + validateTableModelDatabaseName(targetDatabase); + validateAndNormalizeTreeModelDatabaseName(PathUtils.qualifyDatabaseName(targetDatabase)); + return; + } + + validateAndNormalizeTreeModelDatabaseName(targetDatabase); + } + + private static void validateTableModelDatabaseName(final String databaseName) { try { - TableConfigTaskVisitor.validateDatabaseName(PathUtils.unQualifyDatabaseName(targetDatabase)); + TableConfigTaskVisitor.validateDatabaseName(databaseName); } catch (final Exception e) { throw new PipeException( String.format( - "The target database %s is invalid. It should be a table-model database name or " - + "the corresponding one-level tree-model database path like root.%s. It should " - + "not contain '%s', should match the pattern %s, and the length should not " - + "exceed %d", - targetDatabase, - PathUtils.unQualifyDatabaseName(targetDatabase), - PATH_SEPARATOR, - IoTDBConfig.DATABASE_PATTERN, - MAX_DATABASE_NAME_LENGTH), + "The table-model database %s is invalid. It should not contain '%s', should match " + + "the pattern %s, and the length should not exceed %d", + databaseName, PATH_SEPARATOR, IoTDBConfig.DATABASE_PATTERN, MAX_DATABASE_NAME_LENGTH), + e); + } + } + + private static String validateAndNormalizeTreeModelDatabaseName(final String databaseName) { + try { + final PartialPath databasePath = new PartialPath(databaseName); + final String[] nodes = databasePath.getNodes(); + if (nodes.length <= 1 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) { + throw new IllegalPathException( + databaseName, "the database name in tree model must start with 'root.'."); + } + + final String normalizedDatabaseName = databasePath.getFullPath(); + MetaFormatUtils.checkDatabase(normalizedDatabaseName); + + if (normalizedDatabaseName.length() > MAX_DATABASE_NAME_LENGTH) { + throw new IllegalPathException( + normalizedDatabaseName, + "the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH); + } + return normalizedDatabaseName; + } catch (final Exception e) { + throw new PipeException( + String.format( + "The tree-model database %s is invalid. It should be a legal tree-model database " + + "path, should match the pattern %s, and the length should not exceed %d", + databaseName, IoTDBConfig.DATABASE_PATTERN, MAX_DATABASE_NAME_LENGTH), e); } } @@ -241,9 +276,7 @@ public void customize( final String targetDatabase = parameters.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); if (Objects.nonNull(targetDatabase)) { - targetTableModelDatabaseName = - PathUtils.unQualifyDatabaseName(targetDatabase).toLowerCase(Locale.ENGLISH); - targetTreeModelDatabaseName = PathUtils.qualifyDatabaseName(targetTableModelDatabaseName); + customizeTargetDatabase(targetDatabase); } if (SESSION_MANAGER.getCurrSession() == null) { @@ -269,6 +302,30 @@ public void customize( } } + private void customizeTargetDatabase(final String targetDatabase) { + targetTableModelDatabaseName = null; + invalidTargetTableModelDatabaseName = null; + targetTreeModelDatabaseName = null; + + if (PathUtils.isTableModelDatabase(targetDatabase)) { + targetTableModelDatabaseName = targetDatabase.toLowerCase(Locale.ENGLISH); + targetTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName( + PathUtils.qualifyDatabaseName(targetTableModelDatabaseName)); + return; + } + + targetTreeModelDatabaseName = validateAndNormalizeTreeModelDatabaseName(targetDatabase); + final String tableModelDatabaseName = + PathUtils.unQualifyDatabaseName(targetTreeModelDatabaseName).toLowerCase(Locale.ENGLISH); + try { + TableConfigTaskVisitor.validateDatabaseName(tableModelDatabaseName); + targetTableModelDatabaseName = tableModelDatabaseName; + } catch (final Exception e) { + invalidTargetTableModelDatabaseName = tableModelDatabaseName; + } + } + @Override public void handshake() throws Exception { // Do nothing @@ -322,7 +379,8 @@ private void doTransfer( pipeInsertNodeTabletInsertionEvent.isTableModelEvent() ? getTargetTableModelDatabaseNameOrDefault( pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()) - : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + : getTargetTreeModelDatabaseNameOrDefault( + pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()); final InsertBaseStatement insertBaseStatement; insertBaseStatement = @@ -371,7 +429,8 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.isTableModelEvent() ? getTargetTableModelDatabaseNameOrDefault( pipeRawTabletInsertionEvent.getTableModelDatabaseName()) - : pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); + : getTargetTreeModelDatabaseNameOrDefault( + pipeRawTabletInsertionEvent.getTreeModelDatabaseName()); final InsertTabletStatement insertTabletStatement = PipeTransferTabletRawReqV2.toTPipeTransferRawReq( @@ -426,19 +485,24 @@ private void doTransferWrapper(final PipeStatementInsertionEvent pipeStatementIn private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertionEvent) throws PipeException { - final TSStatus status = - pipeStatementInsertionEvent.isTableModelEvent() - ? executeStatementForTableModel( - rewriteTableModelDatabaseNameIfNecessary( - pipeStatementInsertionEvent.getStatement()), - getTargetTableModelDatabaseNameOrDefault( - pipeStatementInsertionEvent.getTableModelDatabaseName()), - pipeStatementInsertionEvent.getUserName()) - : executeStatementForTreeModel( - rewriteTreeModelDatabaseNameIfNecessary( - pipeStatementInsertionEvent.getStatement(), - pipeStatementInsertionEvent.getTreeModelDatabaseName()), - pipeStatementInsertionEvent.getUserName()); + final TSStatus status; + if (pipeStatementInsertionEvent.isTableModelEvent()) { + final String dataBaseName = + getTargetTableModelDatabaseNameOrDefault( + pipeStatementInsertionEvent.getTableModelDatabaseName()); + status = + executeStatementForTableModel( + rewriteTableModelDatabaseNameIfNecessary(pipeStatementInsertionEvent.getStatement()), + dataBaseName, + pipeStatementInsertionEvent.getUserName()); + } else { + status = + executeStatementForTreeModel( + rewriteTreeModelDatabaseNameIfNecessary( + pipeStatementInsertionEvent.getStatement(), + pipeStatementInsertionEvent.getTreeModelDatabaseName()), + pipeStatementInsertionEvent.getUserName()); + } if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -453,11 +517,28 @@ private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertion } private String getTargetTableModelDatabaseNameOrDefault(final String databaseName) { + if (Objects.nonNull(invalidTargetTableModelDatabaseName)) { + throw new PipeException( + String.format( + "The target tree-model database %s cannot be used for table-model events because " + + "the corresponding table-model database %s is invalid.", + targetTreeModelDatabaseName, invalidTargetTableModelDatabaseName)); + } + + validateTableModelDatabaseName(databaseName); return Objects.nonNull(targetTableModelDatabaseName) ? targetTableModelDatabaseName : databaseName; } + private String getTargetTreeModelDatabaseNameOrDefault(final String databaseName) { + final String sourceTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName(databaseName); + return Objects.nonNull(targetTreeModelDatabaseName) + ? targetTreeModelDatabaseName + : sourceTreeModelDatabaseName; + } + private Statement rewriteTableModelDatabaseNameIfNecessary(final Statement statement) { if (Objects.isNull(targetTableModelDatabaseName) || !(statement instanceof InsertBaseStatement)) { @@ -498,12 +579,13 @@ private Statement rewriteTreeModelDatabaseNameIfNecessary( private InsertBaseStatement rewriteTreeModelDatabaseNameIfNecessary( final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + final String normalizedSourceTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName(sourceTreeModelDatabaseName); if (Objects.isNull(targetTreeModelDatabaseName)) { return statement; } - rewriteTreeModelDatabaseName( - statement, PathUtils.qualifyDatabaseName(sourceTreeModelDatabaseName)); + rewriteTreeModelDatabaseName(statement, normalizedSourceTreeModelDatabaseName); return statement; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index cf311639ee9a2..b74f956181367 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; +import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -45,6 +46,7 @@ import java.security.SecureRandom; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -279,6 +281,113 @@ public void testOpcUaSink() { } } + @Test + public void testWriteBackSinkTargetDatabaseValidation() throws Exception { + assertWriteBackSinkTargetDatabaseValid("target"); + assertWriteBackSinkTargetDatabaseValid("root.target"); + assertWriteBackSinkTargetDatabaseValid("root.target.db"); + + Assert.assertThrows(PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("a.b")); + Assert.assertThrows( + PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("root.a+b")); + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromEvent() { + try (final WriteBackSink sink = new WriteBackSink()) { + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("root.a.b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromEventWithTargetDatabase() + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", "target")); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("root.a.b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTreeModelDatabaseFromEventWithTargetDatabase() + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", "root.target")); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTreeModelRawTabletInsertionEvent("root.a+b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromTreeTarget() throws Exception { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put("sink.database", "root.target.db"); + } + }); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("valid_db"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + private void assertWriteBackSinkTargetDatabaseValid(final String targetDatabase) + throws Exception { + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate( + new PipeParameterValidator( + new PipeParameters(Collections.singletonMap("sink.database", targetDatabase)))); + } + } + + private PipeRawTabletInsertionEvent createTableModelRawTabletInsertionEvent( + final String databaseName) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("table", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + true, databaseName, null, null, tablet, false, "pipe", 0L, null, null, false); + } + + private PipeRawTabletInsertionEvent createTreeModelRawTabletInsertionEvent( + final String databaseName) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet(databaseName + ".d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, databaseName, null, databaseName, tablet, false, "pipe", 0L, null, null, false); + } + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( final String pipeName, final long creationTime, final int regionId) { final List schemaList = From 129d06a4b7418285d38f3eaf3b3fe147c13b8d71 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 10:06:31 +0800 Subject: [PATCH 3/6] Cover write-back sink database validation semantics --- .../iotdb/db/pipe/sink/PipeSinkTest.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index b74f956181367..fa18695ef1f1b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; +import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; @@ -32,6 +33,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -44,6 +46,7 @@ import org.junit.Test; import org.mockito.Mockito; +import java.lang.reflect.Field; import java.security.SecureRandom; import java.util.Arrays; import java.util.Collections; @@ -288,8 +291,40 @@ public void testWriteBackSinkTargetDatabaseValidation() throws Exception { assertWriteBackSinkTargetDatabaseValid("root.target.db"); Assert.assertThrows(PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("a.b")); + Assert.assertThrows( + PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("a".repeat(65))); Assert.assertThrows( PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("root.a+b")); + Assert.assertThrows( + PipeException.class, + () -> assertWriteBackSinkTargetDatabaseValid("root." + "a".repeat(60))); + } + + @Test + public void testWriteBackSinkTargetDatabaseCustomization() throws Exception { + try (final WriteBackSink sink = createCustomizedWriteBackSink("TestTarget")) { + Assert.assertEquals( + "testtarget", getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.testtarget", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target")) { + Assert.assertEquals( + "target", getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.target", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target.db")) { + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertEquals( + "target.db", getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.target.db", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } } @Test @@ -336,6 +371,27 @@ public void testWriteBackSinkRejectsInvalidTreeModelDatabaseFromEventWithTargetD } } + @Test + public void testWriteBackSinkRejectsInvalidStatementEventDatabases() throws Exception { + try (final WriteBackSink sink = createCustomizedWriteBackSink("target")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTableModelStatementInsertionEvent("root.a.b"))); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTreeModelStatementInsertionEvent("root.a+b"))); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target.db")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTableModelStatementInsertionEvent("valid_db"))); + } + } + @Test public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromTreeTarget() throws Exception { final PipeParameters parameters = @@ -366,6 +422,25 @@ private void assertWriteBackSinkTargetDatabaseValid(final String targetDatabase) } } + private WriteBackSink createCustomizedWriteBackSink(final String targetDatabase) + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", targetDatabase)); + final WriteBackSink sink = new WriteBackSink(); + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + return sink; + } + + private String getWriteBackSinkDatabaseName(final WriteBackSink sink, final String fieldName) + throws Exception { + final Field field = WriteBackSink.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (String) field.get(sink); + } + private PipeRawTabletInsertionEvent createTableModelRawTabletInsertionEvent( final String databaseName) { final List schemaList = @@ -388,6 +463,35 @@ private PipeRawTabletInsertionEvent createTreeModelRawTabletInsertionEvent( false, databaseName, null, databaseName, tablet, false, "pipe", 0L, null, null, false); } + private PipeStatementInsertionEvent createTableModelStatementInsertionEvent( + final String databaseName) { + return createStatementInsertionEvent(true, databaseName); + } + + private PipeStatementInsertionEvent createTreeModelStatementInsertionEvent( + final String databaseName) { + return createStatementInsertionEvent(false, databaseName); + } + + private PipeStatementInsertionEvent createStatementInsertionEvent( + final boolean isTableModelEvent, final String databaseName) { + final InsertTabletStatement statement = new InsertTabletStatement(); + statement.setRamBytesUsed(1L); + return new PipeStatementInsertionEvent( + "pipe", + 0L, + null, + null, + null, + null, + null, + null, + true, + isTableModelEvent, + databaseName, + statement); + } + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( final String pipeName, final long creationTime, final int regionId) { final List schemaList = From ba02f8fd65ee0ecd6c952d1285a75ebd8a00e9cd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 30 Jun 2026 10:46:12 +0800 Subject: [PATCH 4/6] Update WriteBackSink.java --- .../protocol/writeback/WriteBackSink.java | 57 +++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index bdc0ba409b918..2009492f9ac3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -127,8 +127,6 @@ public class WriteBackSink implements PipeConnector { // Simulate the behavior of the client-to-server communication // for correctly handling data insertion in IoTDBReceiverAgent#receive method - private static final Coordinator COORDINATOR = Coordinator.getInstance(); - private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); public static final AtomicLong id = new AtomicLong(); private InternalClientSession session; @@ -140,10 +138,34 @@ public class WriteBackSink implements PipeConnector { private String invalidTargetTableModelDatabaseName; private String targetTreeModelDatabaseName; - private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser(); - private static final Set ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet(); + private static SessionManager getSessionManager() { + return SessionManagerHolder.INSTANCE; + } + + private static SqlParser getRelationalSqlParser() { + return SqlParserHolder.INSTANCE; + } + + private static class SessionManagerHolder { + + private static final SessionManager INSTANCE = SessionManager.getInstance(); + + private SessionManagerHolder() { + // empty constructor + } + } + + private static class SqlParserHolder { + + private static final SqlParser INSTANCE = new SqlParser(); + + private SqlParserHolder() { + // empty constructor + } + } + @Override public void validate(final PipeParameterValidator validator) throws Exception { validator.validateSynonymAttributes( @@ -279,13 +301,14 @@ public void customize( customizeTargetDatabase(targetDatabase); } - if (SESSION_MANAGER.getCurrSession() == null) { - SESSION_MANAGER.registerSession(session); + final SessionManager sessionManager = getSessionManager(); + if (sessionManager.getCurrSession() == null) { + sessionManager.registerSession(session); } // Check the password and its expiration if (Objects.nonNull(passwordString) - && SESSION_MANAGER + && sessionManager .login( session, usernameString, @@ -683,7 +706,9 @@ private static void throwWriteBackExceptionIfNecessary( @Override public void close() throws Exception { if (session != null) { - SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution, false); + getSessionManager() + .closeSession( + session, queryId -> Coordinator.getInstance().cleanupQueryExecution(queryId), false); } } @@ -700,10 +725,10 @@ private TSStatus executeStatementForTableModel( return Coordinator.getInstance() .executeForTableModel( new PipeEnrichedStatement(statement), - RELATIONAL_SQL_PARSER, + getRelationalSqlParser(), session, - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfoOfPipeReceiver(session, dataBaseName), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfoOfPipeReceiver(session, dataBaseName), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) @@ -733,10 +758,10 @@ private TSStatus executeStatementForTableModel( return Coordinator.getInstance() .executeForTableModel( new PipeEnrichedStatement(statement), - RELATIONAL_SQL_PARSER, + getRelationalSqlParser(), session, - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfo(session), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfo(session), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) @@ -821,8 +846,8 @@ private TSStatus executeStatementForTreeModel(final Statement statement, final S return Coordinator.getInstance() .executeForTreeModel( new PipeEnrichedStatement(statement), - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfo(session), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfo(session), "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), From 798326326c842ec28105c026f2b9acfbe16897fa Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 11:24:26 +0800 Subject: [PATCH 5/6] Address write-back sink review comments --- .../iotdb/db/i18n/DataNodePipeMessages.java | 15 ++++++++ .../iotdb/db/i18n/DataNodePipeMessages.java | 12 +++++++ .../protocol/writeback/WriteBackSink.java | 34 +++++++++++++------ 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 050de467f2687..bce3cf922abf5 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1446,6 +1446,21 @@ public final class DataNodePipeMessages { // --------------------------------------------------------------------------- public static final String PLUGIN_NOT_REGISTERED_FMT = "plugin %s is not registered."; + // --------------------------------------------------------------------------- + // pipe - WriteBackSink + // --------------------------------------------------------------------------- + public static final String TABLE_MODEL_DATABASE_INVALID_FMT = + "The table-model database %s is invalid. It should not contain '%s', should match the " + + "pattern %s, and the length should not exceed %d"; + public static final String TREE_MODEL_DATABASE_INVALID_FMT = + "The tree-model database %s is invalid. It should be a legal tree-model database path, " + + "should match the pattern %s, and the length should not exceed %d"; + public static final String TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT = + "The target tree-model database %s cannot be used for table-model events because the " + + "corresponding table-model database %s is invalid."; + public static final String FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT = + "Failed to rewrite tree-model database from %s to %s for device %s."; + // --------------------------------------------------------------------------- // pipe – PipeTransferTrackableHandler // --------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 147d977d57269..c6c7ead998a89 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1394,6 +1394,18 @@ public final class DataNodePipeMessages { // --------------------------------------------------------------------------- public static final String PLUGIN_NOT_REGISTERED_FMT = "插件 %s 未注册。"; + // --------------------------------------------------------------------------- + // pipe - WriteBackSink + // --------------------------------------------------------------------------- + public static final String TABLE_MODEL_DATABASE_INVALID_FMT = + "表模型数据库 %s 非法:不应包含 '%s',应匹配 %s,且长度不应超过 %d"; + public static final String TREE_MODEL_DATABASE_INVALID_FMT = + "树模型数据库 %s 非法:应为合法的树模型数据库路径,应匹配 %s,且长度不应超过 %d"; + public static final String TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT = + "目标树模型数据库 %s 不能用于表模型事件,因为对应的表模型数据库 %s 非法。"; + public static final String FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT = + "将树模型数据库从 %s 重写为 %s 失败,设备为 %s。"; + // --------------------------------------------------------------------------- // pipe – PipeTransferTrackableHandler // --------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 2009492f9ac3b..f957a8f6dbcb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -202,9 +202,11 @@ private static void validateTableModelDatabaseName(final String databaseName) { } catch (final Exception e) { throw new PipeException( String.format( - "The table-model database %s is invalid. It should not contain '%s', should match " - + "the pattern %s, and the length should not exceed %d", - databaseName, PATH_SEPARATOR, IoTDBConfig.DATABASE_PATTERN, MAX_DATABASE_NAME_LENGTH), + DataNodePipeMessages.TABLE_MODEL_DATABASE_INVALID_FMT, + databaseName, + PATH_SEPARATOR, + IoTDBConfig.DATABASE_PATTERN, + MAX_DATABASE_NAME_LENGTH), e); } } @@ -230,9 +232,10 @@ private static String validateAndNormalizeTreeModelDatabaseName(final String dat } catch (final Exception e) { throw new PipeException( String.format( - "The tree-model database %s is invalid. It should be a legal tree-model database " - + "path, should match the pattern %s, and the length should not exceed %d", - databaseName, IoTDBConfig.DATABASE_PATTERN, MAX_DATABASE_NAME_LENGTH), + DataNodePipeMessages.TREE_MODEL_DATABASE_INVALID_FMT, + databaseName, + IoTDBConfig.DATABASE_PATTERN, + MAX_DATABASE_NAME_LENGTH), e); } } @@ -330,6 +333,8 @@ private void customizeTargetDatabase(final String targetDatabase) { invalidTargetTableModelDatabaseName = null; targetTreeModelDatabaseName = null; + // Write-back sink may receive both table-model and tree-model events from one pipe. + // Normalize one configured target database to both model names so each event can be rewritten. if (PathUtils.isTableModelDatabase(targetDatabase)) { targetTableModelDatabaseName = targetDatabase.toLowerCase(Locale.ENGLISH); targetTreeModelDatabaseName = @@ -345,6 +350,8 @@ private void customizeTargetDatabase(final String targetDatabase) { TableConfigTaskVisitor.validateDatabaseName(tableModelDatabaseName); targetTableModelDatabaseName = tableModelDatabaseName; } catch (final Exception e) { + // A valid multi-level tree database like root.target.db cannot be converted to a valid + // table-model database name, but tree-model events can still use the original target. invalidTargetTableModelDatabaseName = tableModelDatabaseName; } } @@ -543,9 +550,10 @@ private String getTargetTableModelDatabaseNameOrDefault(final String databaseNam if (Objects.nonNull(invalidTargetTableModelDatabaseName)) { throw new PipeException( String.format( - "The target tree-model database %s cannot be used for table-model events because " - + "the corresponding table-model database %s is invalid.", - targetTreeModelDatabaseName, invalidTargetTableModelDatabaseName)); + DataNodePipeMessages + .TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT, + targetTreeModelDatabaseName, + invalidTargetTableModelDatabaseName)); } validateTableModelDatabaseName(databaseName); @@ -662,6 +670,8 @@ private PartialPath rewriteTreeModelDevicePath( final String[] sourceDatabaseNodes = new PartialPath(sourceTreeModelDatabaseName).getNodes(); final String[] targetDatabaseNodes = new PartialPath(targetTreeModelDatabaseName).getNodes(); final String[] deviceNodes = devicePath.getNodes(); + // A processor may rewrite the device path before write-back sink receives the event. + // If it no longer belongs to the source database, keep it untouched to avoid corruption. if (!startsWith(deviceNodes, sourceDatabaseNodes)) { return devicePath; } @@ -676,8 +686,10 @@ private PartialPath rewriteTreeModelDevicePath( } catch (final Exception e) { throw new PipeException( String.format( - "Failed to rewrite tree-model database from %s to %s for device %s.", - sourceTreeModelDatabaseName, targetTreeModelDatabaseName, devicePath), + DataNodePipeMessages.FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT, + sourceTreeModelDatabaseName, + targetTreeModelDatabaseName, + devicePath), e); } } From 9558aaec7e085eddcf5c433a9da4761a2adc256f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 16:45:36 +0800 Subject: [PATCH 6/6] Clarify write-back sink target database normalization --- .../iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index f957a8f6dbcb6..fe46e47234d4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -333,8 +333,9 @@ private void customizeTargetDatabase(final String targetDatabase) { invalidTargetTableModelDatabaseName = null; targetTreeModelDatabaseName = null; - // Write-back sink may receive both table-model and tree-model events from one pipe. - // Normalize one configured target database to both model names so each event can be rewritten. + // The sink only sees its own parameters during customization, without the pipe's isolated + // runtime model. Normalize one configured target database to both model names, and later use + // the one matching the incoming event model. if (PathUtils.isTableModelDatabase(targetDatabase)) { targetTableModelDatabaseName = targetDatabase.toLowerCase(Locale.ENGLISH); targetTreeModelDatabaseName =