diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java index 2092bb9239e61..2a5c9352e2542 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java @@ -277,10 +277,8 @@ public void testWriteBackSink() throws Exception { sourceAttributes.put("source.table-name", "test.*"); sourceAttributes.put("user", "root"); - processorAttributes.put("processor", "rename-database-processor"); - processorAttributes.put("processor.new-db-name", "Test1"); - sinkAttributes.put("sink", "write-back-sink"); + sinkAttributes.put("sink.database", "Test1"); sinkAttributes.put("user", "root"); final TSStatus status = diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java new file mode 100644 index 0000000000000..11edc8cf6becc --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeWriteBackSinkIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.manual; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeManual; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeManual.class}) +public class IoTDBPipeWriteBackSinkIT extends AbstractPipeDualTreeModelManualIT { + + @Test + public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.source", + "create timeseries root.source.d1.s1 with datatype=INT32,encoding=PLAIN", + "create database root.target.db", + "create timeseries root.target.db.d1.s1 with datatype=INT32,encoding=PLAIN"), + null); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map sourceAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); + + sourceAttributes.put("source.inclusion", "data.insert"); + sourceAttributes.put("source.forwarding-pipe-requests", "false"); + sourceAttributes.put("source.path", "root.source.**"); + sourceAttributes.put("user", "root"); + + sinkAttributes.put("sink", "write-back-sink"); + sinkAttributes.put("sink.database", "root.target.db"); + sinkAttributes.put("user", "root"); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(sourceAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.source.d1(time, s1) values (1, 1)", + "insert into root.source.d1(time, s1) values (2, 2)", + "flush"), + null); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "select * from root.target.db.**", + "Time,root.target.db.d1.s1,", + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1,", "2,2,")))); + } +} diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 13369fc5ee3f3..bfac58fbab178 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1449,6 +1449,21 @@ public final class DataNodePipeMessages { // --------------------------------------------------------------------------- public static final String PLUGIN_NOT_REGISTERED_FMT = "plugin %s is not registered."; + // --------------------------------------------------------------------------- + // pipe - WriteBackSink + // --------------------------------------------------------------------------- + public static final String TABLE_MODEL_DATABASE_INVALID_FMT = + "The table-model database %s is invalid. It should not contain '%s', should match the " + + "pattern %s, and the length should not exceed %d"; + public static final String TREE_MODEL_DATABASE_INVALID_FMT = + "The tree-model database %s is invalid. It should be a legal tree-model database path, " + + "should match the pattern %s, and the length should not exceed %d"; + public static final String TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT = + "The target tree-model database %s cannot be used for table-model events because the " + + "corresponding table-model database %s is invalid."; + public static final String FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT = + "Failed to rewrite tree-model database from %s to %s for device %s."; + // --------------------------------------------------------------------------- // pipe – PipeTransferTrackableHandler // --------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 49ee222c3e9fa..6fbc50ac13b5e 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1396,6 +1396,18 @@ public final class DataNodePipeMessages { // --------------------------------------------------------------------------- public static final String PLUGIN_NOT_REGISTERED_FMT = "插件 %s 未注册。"; + // --------------------------------------------------------------------------- + // pipe - WriteBackSink + // --------------------------------------------------------------------------- + public static final String TABLE_MODEL_DATABASE_INVALID_FMT = + "表模型数据库 %s 非法:不应包含 '%s',应匹配 %s,且长度不应超过 %d"; + public static final String TREE_MODEL_DATABASE_INVALID_FMT = + "树模型数据库 %s 非法:应为合法的树模型数据库路径,应匹配 %s,且长度不应超过 %d"; + public static final String TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT = + "目标树模型数据库 %s 不能用于表模型事件,因为对应的表模型数据库 %s 非法。"; + public static final String FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT = + "将树模型数据库从 %s 重写为 %s 失败,设备为 %s。"; + // --------------------------------------------------------------------------- // pipe – PipeTransferTrackableHandler // --------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 9bec114ede1c0..fe46e47234d4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -22,14 +22,18 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.common.SqlDialect; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; @@ -43,6 +47,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; @@ -51,8 +56,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaFormatUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TableModel; @@ -75,7 +84,9 @@ import java.io.IOException; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Locale; import java.util.Objects; import java.util.Set; @@ -84,6 +95,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_CLI_HOSTNAME; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; @@ -103,17 +115,18 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.WRITE_BACK_CONNECTOR_SKIP_IF_DEFAULT_VALUE; import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; import static org.apache.iotdb.db.exception.metadata.DatabaseNotSetException.DATABASE_NOT_SET; +import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @TreeModel @TableModel public class WriteBackSink implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackSink.class); + private static final String CONNECTOR_IOTDB_DATABASE_KEY = "connector.database"; + private static final String SINK_IOTDB_DATABASE_KEY = "sink.database"; // Simulate the behavior of the client-to-server communication // for correctly handling data insertion in IoTDBReceiverAgent#receive method - private static final Coordinator COORDINATOR = Coordinator.getInstance(); - private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); public static final AtomicLong id = new AtomicLong(); private InternalClientSession session; @@ -121,17 +134,110 @@ public class WriteBackSink implements PipeConnector { private boolean useEventUserName; private UserEntity userEntity; - - private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser(); + private String targetTableModelDatabaseName; + private String invalidTargetTableModelDatabaseName; + private String targetTreeModelDatabaseName; private static final Set ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet(); + private static SessionManager getSessionManager() { + return SessionManagerHolder.INSTANCE; + } + + private static SqlParser getRelationalSqlParser() { + return SqlParserHolder.INSTANCE; + } + + private static class SessionManagerHolder { + + private static final SessionManager INSTANCE = SessionManager.getInstance(); + + private SessionManagerHolder() { + // empty constructor + } + } + + private static class SqlParserHolder { + + private static final SqlParser INSTANCE = new SqlParser(); + + private SqlParserHolder() { + // empty constructor + } + } + @Override public void validate(final PipeParameterValidator validator) throws Exception { validator.validateSynonymAttributes( Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY), false); + validator.validateSynonymAttributes( + Collections.singletonList(CONNECTOR_IOTDB_DATABASE_KEY), + Collections.singletonList(SINK_IOTDB_DATABASE_KEY), + false); + + final String targetDatabase = + validator + .getParameters() + .getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + validateTargetDatabase(targetDatabase); + } + } + + private static void validateTargetDatabase(final String targetDatabase) { + if (PathUtils.isTableModelDatabase(targetDatabase)) { + validateTableModelDatabaseName(targetDatabase); + validateAndNormalizeTreeModelDatabaseName(PathUtils.qualifyDatabaseName(targetDatabase)); + return; + } + + validateAndNormalizeTreeModelDatabaseName(targetDatabase); + } + + private static void validateTableModelDatabaseName(final String databaseName) { + try { + TableConfigTaskVisitor.validateDatabaseName(databaseName); + } catch (final Exception e) { + throw new PipeException( + String.format( + DataNodePipeMessages.TABLE_MODEL_DATABASE_INVALID_FMT, + databaseName, + PATH_SEPARATOR, + IoTDBConfig.DATABASE_PATTERN, + MAX_DATABASE_NAME_LENGTH), + e); + } + } + + private static String validateAndNormalizeTreeModelDatabaseName(final String databaseName) { + try { + final PartialPath databasePath = new PartialPath(databaseName); + final String[] nodes = databasePath.getNodes(); + if (nodes.length <= 1 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) { + throw new IllegalPathException( + databaseName, "the database name in tree model must start with 'root.'."); + } + + final String normalizedDatabaseName = databasePath.getFullPath(); + MetaFormatUtils.checkDatabase(normalizedDatabaseName); + + if (normalizedDatabaseName.length() > MAX_DATABASE_NAME_LENGTH) { + throw new IllegalPathException( + normalizedDatabaseName, + "the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH); + } + return normalizedDatabaseName; + } catch (final Exception e) { + throw new PipeException( + String.format( + DataNodePipeMessages.TREE_MODEL_DATABASE_INVALID_FMT, + databaseName, + IoTDBConfig.DATABASE_PATTERN, + MAX_DATABASE_NAME_LENGTH), + e); + } } @Override @@ -192,13 +298,20 @@ public void customize( Arrays.asList(CONNECTOR_USE_EVENT_USER_NAME_KEY, SINK_USE_EVENT_USER_NAME_KEY), CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE); - if (SESSION_MANAGER.getCurrSession() == null) { - SESSION_MANAGER.registerSession(session); + final String targetDatabase = + parameters.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + customizeTargetDatabase(targetDatabase); + } + + final SessionManager sessionManager = getSessionManager(); + if (sessionManager.getCurrSession() == null) { + sessionManager.registerSession(session); } // Check the password and its expiration if (Objects.nonNull(passwordString) - && SESSION_MANAGER + && sessionManager .login( session, usernameString, @@ -215,6 +328,35 @@ public void customize( } } + private void customizeTargetDatabase(final String targetDatabase) { + targetTableModelDatabaseName = null; + invalidTargetTableModelDatabaseName = null; + targetTreeModelDatabaseName = null; + + // The sink only sees its own parameters during customization, without the pipe's isolated + // runtime model. Normalize one configured target database to both model names, and later use + // the one matching the incoming event model. + if (PathUtils.isTableModelDatabase(targetDatabase)) { + targetTableModelDatabaseName = targetDatabase.toLowerCase(Locale.ENGLISH); + targetTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName( + PathUtils.qualifyDatabaseName(targetTableModelDatabaseName)); + return; + } + + targetTreeModelDatabaseName = validateAndNormalizeTreeModelDatabaseName(targetDatabase); + final String tableModelDatabaseName = + PathUtils.unQualifyDatabaseName(targetTreeModelDatabaseName).toLowerCase(Locale.ENGLISH); + try { + TableConfigTaskVisitor.validateDatabaseName(tableModelDatabaseName); + targetTableModelDatabaseName = tableModelDatabaseName; + } catch (final Exception e) { + // A valid multi-level tree database like root.target.db cannot be converted to a valid + // table-model database name, but tree-model events can still use the original target. + invalidTargetTableModelDatabaseName = tableModelDatabaseName; + } + } + @Override public void handshake() throws Exception { // Do nothing @@ -266,13 +408,19 @@ private void doTransfer( final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final String dataBaseName = pipeInsertNodeTabletInsertionEvent.isTableModelEvent() - ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() - : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + ? getTargetTableModelDatabaseNameOrDefault( + pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()) + : getTargetTreeModelDatabaseNameOrDefault( + pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()); final InsertBaseStatement insertBaseStatement; insertBaseStatement = PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(insertNode, dataBaseName) .constructStatement(); + if (!insertBaseStatement.isWriteToTable()) { + rewriteTreeModelDatabaseNameIfNecessary( + insertBaseStatement, pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName()); + } final TSStatus status = insertBaseStatement.isWriteToTable() @@ -310,8 +458,10 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion throws PipeException { final String dataBaseName = pipeRawTabletInsertionEvent.isTableModelEvent() - ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() - : pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); + ? getTargetTableModelDatabaseNameOrDefault( + pipeRawTabletInsertionEvent.getTableModelDatabaseName()) + : getTargetTreeModelDatabaseNameOrDefault( + pipeRawTabletInsertionEvent.getTreeModelDatabaseName()); final InsertTabletStatement insertTabletStatement = PipeTransferTabletRawReqV2.toTPipeTransferRawReq( @@ -319,6 +469,10 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.isAligned(), dataBaseName) .constructStatement(); + if (!insertTabletStatement.isWriteToTable()) { + rewriteTreeModelDatabaseNameIfNecessary( + insertTabletStatement, pipeRawTabletInsertionEvent.getTreeModelDatabaseName()); + } final TSStatus status = insertTabletStatement.isWriteToTable() @@ -362,15 +516,24 @@ private void doTransferWrapper(final PipeStatementInsertionEvent pipeStatementIn private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertionEvent) throws PipeException { - final TSStatus status = - pipeStatementInsertionEvent.isTableModelEvent() - ? executeStatementForTableModel( - pipeStatementInsertionEvent.getStatement(), - pipeStatementInsertionEvent.getTableModelDatabaseName(), - pipeStatementInsertionEvent.getUserName()) - : executeStatementForTreeModel( - pipeStatementInsertionEvent.getStatement(), - pipeStatementInsertionEvent.getUserName()); + final TSStatus status; + if (pipeStatementInsertionEvent.isTableModelEvent()) { + final String dataBaseName = + getTargetTableModelDatabaseNameOrDefault( + pipeStatementInsertionEvent.getTableModelDatabaseName()); + status = + executeStatementForTableModel( + rewriteTableModelDatabaseNameIfNecessary(pipeStatementInsertionEvent.getStatement()), + dataBaseName, + pipeStatementInsertionEvent.getUserName()); + } else { + status = + executeStatementForTreeModel( + rewriteTreeModelDatabaseNameIfNecessary( + pipeStatementInsertionEvent.getStatement(), + pipeStatementInsertionEvent.getTreeModelDatabaseName()), + pipeStatementInsertionEvent.getUserName()); + } if (status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode() && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -384,6 +547,166 @@ private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertion } } + private String getTargetTableModelDatabaseNameOrDefault(final String databaseName) { + if (Objects.nonNull(invalidTargetTableModelDatabaseName)) { + throw new PipeException( + String.format( + DataNodePipeMessages + .TARGET_TREE_MODEL_DATABASE_CANNOT_BE_USED_FOR_TABLE_MODEL_EVENTS_FMT, + targetTreeModelDatabaseName, + invalidTargetTableModelDatabaseName)); + } + + validateTableModelDatabaseName(databaseName); + return Objects.nonNull(targetTableModelDatabaseName) + ? targetTableModelDatabaseName + : databaseName; + } + + private String getTargetTreeModelDatabaseNameOrDefault(final String databaseName) { + final String sourceTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName(databaseName); + return Objects.nonNull(targetTreeModelDatabaseName) + ? targetTreeModelDatabaseName + : sourceTreeModelDatabaseName; + } + + private Statement rewriteTableModelDatabaseNameIfNecessary(final Statement statement) { + if (Objects.isNull(targetTableModelDatabaseName) + || !(statement instanceof InsertBaseStatement)) { + return statement; + } + + rewriteTableModelDatabaseName((InsertBaseStatement) statement); + return statement; + } + + private void rewriteTableModelDatabaseName(final InsertBaseStatement statement) { + statement.setDatabaseName(targetTableModelDatabaseName); + + if (statement instanceof InsertRowsStatement) { + ((InsertRowsStatement) statement) + .getInsertRowStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } else if (statement instanceof InsertRowsOfOneDeviceStatement) { + ((InsertRowsOfOneDeviceStatement) statement) + .getInsertRowStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } else if (statement instanceof InsertMultiTabletsStatement) { + ((InsertMultiTabletsStatement) statement) + .getInsertTabletStatementList() + .forEach(this::rewriteTableModelDatabaseName); + } + } + + private Statement rewriteTreeModelDatabaseNameIfNecessary( + final Statement statement, final String sourceTreeModelDatabaseName) { + if (!(statement instanceof InsertBaseStatement)) { + return statement; + } + + return rewriteTreeModelDatabaseNameIfNecessary( + (InsertBaseStatement) statement, sourceTreeModelDatabaseName); + } + + private InsertBaseStatement rewriteTreeModelDatabaseNameIfNecessary( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + final String normalizedSourceTreeModelDatabaseName = + validateAndNormalizeTreeModelDatabaseName(sourceTreeModelDatabaseName); + if (Objects.isNull(targetTreeModelDatabaseName)) { + return statement; + } + + rewriteTreeModelDatabaseName(statement, normalizedSourceTreeModelDatabaseName); + return statement; + } + + private void rewriteTreeModelDatabaseName( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + statement.setDatabaseName(targetTreeModelDatabaseName); + + if (statement instanceof InsertRowsStatement) { + ((InsertRowsStatement) statement) + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertRowsOfOneDeviceStatement) { + final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement = + (InsertRowsOfOneDeviceStatement) statement; + insertRowsOfOneDeviceStatement + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + insertRowsOfOneDeviceStatement.setDevicePath( + rewriteTreeModelDevicePath( + insertRowsOfOneDeviceStatement.getDevicePath(), sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertMultiTabletsStatement) { + ((InsertMultiTabletsStatement) statement) + .getInsertTabletStatementList() + .forEach( + tabletStatement -> + rewriteTreeModelDatabaseName(tabletStatement, sourceTreeModelDatabaseName)); + return; + } + + statement.setDevicePath( + rewriteTreeModelDevicePath(statement.getDevicePath(), sourceTreeModelDatabaseName)); + } + + private PartialPath rewriteTreeModelDevicePath( + final PartialPath devicePath, final String sourceTreeModelDatabaseName) { + if (Objects.isNull(devicePath) || Objects.isNull(sourceTreeModelDatabaseName)) { + return devicePath; + } + + try { + final String[] sourceDatabaseNodes = new PartialPath(sourceTreeModelDatabaseName).getNodes(); + final String[] targetDatabaseNodes = new PartialPath(targetTreeModelDatabaseName).getNodes(); + final String[] deviceNodes = devicePath.getNodes(); + // A processor may rewrite the device path before write-back sink receives the event. + // If it no longer belongs to the source database, keep it untouched to avoid corruption. + if (!startsWith(deviceNodes, sourceDatabaseNodes)) { + return devicePath; + } + + final ArrayList rebasedNodes = + new ArrayList<>( + targetDatabaseNodes.length + deviceNodes.length - sourceDatabaseNodes.length); + rebasedNodes.addAll(Arrays.asList(targetDatabaseNodes)); + rebasedNodes.addAll( + Arrays.asList(deviceNodes).subList(sourceDatabaseNodes.length, deviceNodes.length)); + return new PartialPath(rebasedNodes.toArray(new String[0])); + } catch (final Exception e) { + throw new PipeException( + String.format( + DataNodePipeMessages.FAILED_TO_REWRITE_TREE_MODEL_DATABASE_FMT, + sourceTreeModelDatabaseName, + targetTreeModelDatabaseName, + devicePath), + e); + } + } + + private static boolean startsWith(final String[] nodes, final String[] prefixNodes) { + if (nodes.length < prefixNodes.length) { + return false; + } + for (int i = 0; i < prefixNodes.length; ++i) { + if (!Objects.equals(nodes[i], prefixNodes[i])) { + return false; + } + } + return true; + } + private static void throwWriteBackExceptionIfNecessary( final TSStatus status, final String exceptionMessage) { if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) { @@ -396,7 +719,9 @@ private static void throwWriteBackExceptionIfNecessary( @Override public void close() throws Exception { if (session != null) { - SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution, false); + getSessionManager() + .closeSession( + session, queryId -> Coordinator.getInstance().cleanupQueryExecution(queryId), false); } } @@ -413,10 +738,10 @@ private TSStatus executeStatementForTableModel( return Coordinator.getInstance() .executeForTableModel( new PipeEnrichedStatement(statement), - RELATIONAL_SQL_PARSER, + getRelationalSqlParser(), session, - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfoOfPipeReceiver(session, dataBaseName), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfoOfPipeReceiver(session, dataBaseName), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) @@ -446,10 +771,10 @@ private TSStatus executeStatementForTableModel( return Coordinator.getInstance() .executeForTableModel( new PipeEnrichedStatement(statement), - RELATIONAL_SQL_PARSER, + getRelationalSqlParser(), session, - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfo(session), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfo(session), "", LocalExecutionPlanner.getInstance().metadata, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) @@ -534,8 +859,8 @@ private TSStatus executeStatementForTreeModel(final Statement statement, final S return Coordinator.getInstance() .executeForTreeModel( new PipeEnrichedStatement(statement), - SESSION_MANAGER.requestQueryId(), - SESSION_MANAGER.getSessionInfo(session), + getSessionManager().requestQueryId(), + getSessionManager().getSessionInfo(session), "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index cf311639ee9a2..fa18695ef1f1b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; +import org.apache.iotdb.db.pipe.event.common.statement.PipeStatementInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; @@ -31,6 +32,8 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; +import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -43,8 +46,10 @@ import org.junit.Test; import org.mockito.Mockito; +import java.lang.reflect.Field; import java.security.SecureRandom; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -279,6 +284,214 @@ public void testOpcUaSink() { } } + @Test + public void testWriteBackSinkTargetDatabaseValidation() throws Exception { + assertWriteBackSinkTargetDatabaseValid("target"); + assertWriteBackSinkTargetDatabaseValid("root.target"); + assertWriteBackSinkTargetDatabaseValid("root.target.db"); + + Assert.assertThrows(PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("a.b")); + Assert.assertThrows( + PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("a".repeat(65))); + Assert.assertThrows( + PipeException.class, () -> assertWriteBackSinkTargetDatabaseValid("root.a+b")); + Assert.assertThrows( + PipeException.class, + () -> assertWriteBackSinkTargetDatabaseValid("root." + "a".repeat(60))); + } + + @Test + public void testWriteBackSinkTargetDatabaseCustomization() throws Exception { + try (final WriteBackSink sink = createCustomizedWriteBackSink("TestTarget")) { + Assert.assertEquals( + "testtarget", getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.testtarget", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target")) { + Assert.assertEquals( + "target", getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.target", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target.db")) { + Assert.assertNull(getWriteBackSinkDatabaseName(sink, "targetTableModelDatabaseName")); + Assert.assertEquals( + "target.db", getWriteBackSinkDatabaseName(sink, "invalidTargetTableModelDatabaseName")); + Assert.assertEquals( + "root.target.db", getWriteBackSinkDatabaseName(sink, "targetTreeModelDatabaseName")); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromEvent() { + try (final WriteBackSink sink = new WriteBackSink()) { + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("root.a.b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } catch (final Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromEventWithTargetDatabase() + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", "target")); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("root.a.b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTreeModelDatabaseFromEventWithTargetDatabase() + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", "root.target")); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTreeModelRawTabletInsertionEvent("root.a+b"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidStatementEventDatabases() throws Exception { + try (final WriteBackSink sink = createCustomizedWriteBackSink("target")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTableModelStatementInsertionEvent("root.a.b"))); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTreeModelStatementInsertionEvent("root.a+b"))); + } + + try (final WriteBackSink sink = createCustomizedWriteBackSink("root.target.db")) { + Assert.assertThrows( + PipeException.class, + () -> sink.transfer(createTableModelStatementInsertionEvent("valid_db"))); + } + } + + @Test + public void testWriteBackSinkRejectsInvalidTableModelDatabaseFromTreeTarget() throws Exception { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put("sink.database", "root.target.db"); + } + }); + + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent event = createTableModelRawTabletInsertionEvent("valid_db"); + Assert.assertThrows(PipeException.class, () -> sink.transfer(event)); + } + } + + private void assertWriteBackSinkTargetDatabaseValid(final String targetDatabase) + throws Exception { + try (final WriteBackSink sink = new WriteBackSink()) { + sink.validate( + new PipeParameterValidator( + new PipeParameters(Collections.singletonMap("sink.database", targetDatabase)))); + } + } + + private WriteBackSink createCustomizedWriteBackSink(final String targetDatabase) + throws Exception { + final PipeParameters parameters = + new PipeParameters(Collections.singletonMap("sink.database", targetDatabase)); + final WriteBackSink sink = new WriteBackSink(); + sink.validate(new PipeParameterValidator(parameters)); + sink.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + return sink; + } + + private String getWriteBackSinkDatabaseName(final WriteBackSink sink, final String fieldName) + throws Exception { + final Field field = WriteBackSink.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (String) field.get(sink); + } + + private PipeRawTabletInsertionEvent createTableModelRawTabletInsertionEvent( + final String databaseName) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("table", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + true, databaseName, null, null, tablet, false, "pipe", 0L, null, null, false); + } + + private PipeRawTabletInsertionEvent createTreeModelRawTabletInsertionEvent( + final String databaseName) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet(databaseName + ".d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, databaseName, null, databaseName, tablet, false, "pipe", 0L, null, null, false); + } + + private PipeStatementInsertionEvent createTableModelStatementInsertionEvent( + final String databaseName) { + return createStatementInsertionEvent(true, databaseName); + } + + private PipeStatementInsertionEvent createTreeModelStatementInsertionEvent( + final String databaseName) { + return createStatementInsertionEvent(false, databaseName); + } + + private PipeStatementInsertionEvent createStatementInsertionEvent( + final boolean isTableModelEvent, final String databaseName) { + final InsertTabletStatement statement = new InsertTabletStatement(); + statement.setRamBytesUsed(1L); + return new PipeStatementInsertionEvent( + "pipe", + 0L, + null, + null, + null, + null, + null, + null, + true, + isTableModelEvent, + databaseName, + statement); + } + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( final String pipeName, final long creationTime, final int regionId) { final List schemaList =