diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java new file mode 100644 index 0000000000000..7e9be5ccf9904 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.commit; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +@Category({ClusterIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBMigrateMultiRegionForIoTV1IT extends IoTDBRegionOperationReliabilityITFramework { + private static final String MULTI_REGION_MIGRATE_FORMAT = "migrate region %s from %d to %d"; + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBMigrateMultiRegionForIoTV1IT.class); + + /** + * Migrate multiple regions from one source DataNode to one destination DataNode in a single + * statement: {@code migrate region r1,r2 from src to dest}. Both regions must leave the source + * and land on the destination. + */ + @Test + public void multiRegionMigrateTest() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + + EnvFactory.getEnv().initClusterEnvironment(1, 5); + + try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement statement = makeItCloseQuietly(connection.createStatement()); + SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + // prepare data + statement.execute(INSERTION1); + statement.execute(FLUSH_COMMAND); + + Map> regionMap = getAllRegionMap(statement); + Set allDataNodeId = getAllDataNodes(statement); + + // With replication factor 1 every region lives on exactly one DataNode. Pick a source + // DataNode that hosts at least two regions, then migrate all its regions to a fresh + // destination DataNode. + int sourceDataNode = selectDataNodeHostingMultipleRegions(regionMap); + List selectedRegions = regionsOnDataNode(regionMap, sourceDataNode); + int destDataNode = + findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions); + + LOGGER.info( + "Multi-region migrate: regions {} from DataNode {} to DataNode {}", + selectedRegions, + sourceDataNode, + destDataNode); + + String command = + String.format( + MULTI_REGION_MIGRATE_FORMAT, + selectedRegions.stream().map(String::valueOf).collect(Collectors.joining(",")), + sourceDataNode, + destDataNode); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + try { + statement.execute(command); + return true; + } catch (Exception e) { + String errorMessage = e.getMessage(); + if (errorMessage != null + && errorMessage.contains("successfully submitted") + && errorMessage.contains("failed to submit")) { + LOGGER.warn("Multi-region migrate partially succeeded: {}", errorMessage); + return true; + } + LOGGER.warn("Multi-region migrate failed, retrying: {}", errorMessage); + return false; + } + }); + + Predicate migratePredicate = + tShowRegionResp -> { + Map> newRegionMap = + getRunningRegionMap(tShowRegionResp.getRegionInfoList()); + return selectedRegions.stream() + .allMatch( + regionId -> { + Set dataNodes = newRegionMap.get(regionId); + return dataNodes != null + && dataNodes.contains(destDataNode) + && !dataNodes.contains(sourceDataNode); + }); + }; + + awaitUntilSuccess( + client, + selectedRegions.get(0), + migratePredicate, + Optional.of(destDataNode), + Optional.of(sourceDataNode)); + + regionMap = getAllRegionMap(statement); + for (int regionId : selectedRegions) { + Assert.assertTrue( + "Region " + regionId + " should be on destination DataNode " + destDataNode, + regionMap.get(regionId).contains(destDataNode)); + Assert.assertFalse( + "Region " + regionId + " should have left source DataNode " + sourceDataNode, + regionMap.get(regionId).contains(sourceDataNode)); + } + LOGGER.info("Multi-region migrate test passed"); + } + } + + private int selectDataNodeHostingMultipleRegions(Map> regionMap) { + Map regionCountPerDataNode = + regionMap.values().stream() + .flatMap(Set::stream) + .collect(Collectors.groupingBy(dataNodeId -> dataNodeId, Collectors.counting())); + return regionCountPerDataNode.entrySet().stream() + .filter(entry -> entry.getValue() >= 2) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow( + () -> new RuntimeException("Cannot find a DataNode hosting at least two regions")); + } + + private List regionsOnDataNode(Map> regionMap, int dataNodeId) { + return regionMap.entrySet().stream() + .filter(entry -> entry.getValue().contains(dataNodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + private int findDataNodeNotContainsAnyRegion( + Set allDataNodeId, Map> regionMap, List regionIds) { + return allDataNodeId.stream() + .filter( + dataNodeId -> + regionIds.stream() + .noneMatch(regionId -> regionMap.get(regionId).contains(dataNodeId))) + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Cannot find DataNode that doesn't contain any of the regions")); + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java index 4ce111490701a..90ef5fb75db61 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeNormalIT.java @@ -155,6 +155,13 @@ public void success1C4DIoTTestUseTableSQL() throws Exception { successTest(2, 3, 1, 4, 1, 2, true, SQLModel.TABLE_MODEL_SQL, ConsensusFactory.IOT_CONSENSUS); } + @Test + public void success1C5DRemoveTwoDataNodesUseSQL() throws Exception { + // Setup 1C5D, and remove 2D in a single "remove datanode a, b" statement; 3 DataNodes remain + // which is enough to keep both the data (factor 2) and schema (factor 3) replicas. + successTest(2, 3, 1, 5, 2, 2, true, SQLModel.TREE_MODEL_SQL, ConsensusFactory.IOT_CONSENSUS); + } + // @Test public void success1C4DIoTV2TestUseTableSQL() throws Exception { // Setup 1C4D, and remove 1D, this test should success diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 2f91b573aef0e..8a9e6d7d62fee 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -577,7 +577,7 @@ getSeriesSlotList // ---- Migrate Region migrateRegion - : MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL + : MIGRATE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL ; reconstructRegion @@ -598,7 +598,7 @@ verifyConnection // ---- Remove DataNode removeDataNode - : REMOVE DATANODE dataNodeId=INTEGER_LITERAL + : REMOVE DATANODE dataNodeIds+=INTEGER_LITERAL (COMMA dataNodeIds+=INTEGER_LITERAL)* ; // ---- Remove ConfigNode diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2beeea6b4af4f..6b47128eab99d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -179,6 +179,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -823,6 +824,7 @@ public Set getRemovingDataNodeIds() { */ private TSStatus checkMigrateRegion( TMigrateRegionReq migrateRegionReq, + int regionId, TConsensusGroupId regionGroupId, TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode, @@ -847,7 +849,7 @@ private TSStatus checkMigrateRegion( failMessage = String.format( "Submit RegionMigrateProcedure failed, because the original DataNode %s doesn't contain Region %s", - migrateRegionReq.getFromId(), migrateRegionReq.getRegionId()); + migrateRegionReq.getFromId(), regionId); } else if (configManager .getPartitionManager() .getAllReplicaSets(destDataNode.getDataNodeId()) @@ -856,7 +858,7 @@ private TSStatus checkMigrateRegion( failMessage = String.format( "Submit RegionMigrateProcedure failed, because the target DataNode %s already contains Region %s", - migrateRegionReq.getToId(), migrateRegionReq.getRegionId()); + migrateRegionReq.getToId(), regionId); } if (failMessage != null) { @@ -1121,76 +1123,129 @@ private String checkRegionOperationModelCorrectness(TConsensusGroupId regionId, public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) { try (AutoCloseableLock ignoredLock = AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) { - TConsensusGroupId regionGroupId; - Optional optional = - configManager - .getPartitionManager() - .generateTConsensusGroupIdByRegionId(migrateRegionReq.getRegionId()); - if (optional.isPresent()) { - regionGroupId = optional.get(); - } else { - LOGGER.error(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + // The source and destination DataNodes are fixed for the whole statement, so resolve them + // once and reuse them for every region. + final TDataNodeConfiguration originalDataNodeConfiguration = + configManager.getNodeManager().getRegisteredDataNode(migrateRegionReq.getFromId()); + final TDataNodeConfiguration destDataNodeConfiguration = + configManager.getNodeManager().getRegisteredDataNode(migrateRegionReq.getToId()); + if (originalDataNodeConfiguration == null) { return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) - .setMessage(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + .setMessage( + String.format( + "Source DataNode %s does not exist in the cluster", + migrateRegionReq.getFromId())); } - - // find original dn and dest dn - final TDataNodeLocation originalDataNode = - configManager - .getNodeManager() - .getRegisteredDataNode(migrateRegionReq.getFromId()) - .getLocation(); - final TDataNodeLocation destDataNode = - configManager - .getNodeManager() - .getRegisteredDataNode(migrateRegionReq.getToId()) - .getLocation(); - // select coordinator for adding peer - RegionMaintainHandler handler = env.getRegionMaintainHandler(); - // TODO: choose the DataNode which has lowest load - final TDataNodeLocation coordinatorForAddPeer = - handler - .filterDataNodeWithOtherRegionReplica( - regionGroupId, - destDataNode, - NodeStatus.Running, - NodeStatus.Removing, - NodeStatus.ReadOnly) - .orElse(null); - // Select coordinator for removing peer - // For now, destDataNode temporarily acts as the coordinatorForRemovePeer - final TDataNodeLocation coordinatorForRemovePeer = destDataNode; - - TSStatus status = - checkMigrateRegion( - migrateRegionReq, - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return status; + if (destDataNodeConfiguration == null) { + return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) + .setMessage( + String.format( + "Target DataNode %s does not exist in the cluster", + migrateRegionReq.getToId())); + } + final TDataNodeLocation originalDataNode = originalDataNodeConfiguration.getLocation(); + final TDataNodeLocation destDataNode = destDataNodeConfiguration.getLocation(); + final RegionMaintainHandler handler = env.getRegionMaintainHandler(); + + TSStatus resp = new TSStatus(); + StringBuilder messageBuilder = new StringBuilder(); + int total = 0, success = 0; + // dedup region ids while preserving the user-specified order + for (int theRegionId : new LinkedHashSet<>(migrateRegionReq.getRegionIds())) { + total++; + TSStatus subStatus = + migrateOneRegion( + migrateRegionReq, theRegionId, originalDataNode, destDataNode, handler); + if (subStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + messageBuilder.append("region ").append(theRegionId).append(": Successfully submitted\n"); + success++; + } else { + messageBuilder + .append("region ") + .append(theRegionId) + .append(": ") + .append(subStatus.getMessage()) + .append('\n'); + } + resp.addToSubStatus(subStatus); } - // finally, submit procedure - this.executor.submitProcedure( - new RegionMigrateProcedure( - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer, - coordinatorForRemovePeer)); - LOGGER.info( - ManagerMessages - .MIGRATEREGION_SUBMIT_REGIONMIGRATEPROCEDURE_SUCCESSFULLY_REGION_ORIGIN_DATANODE, - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer, - coordinatorForRemovePeer); + messageBuilder.insert( + 0, + String.format( + "Total regions: %d, successfully submitted: %d, failed to submit: %d\n", + total, success, total - success)); + resp.setCode( + total == success + ? TSStatusCode.SUCCESS_STATUS.getStatusCode() + : TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + resp.setMessage(messageBuilder.toString()); + return resp; + } + } - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + private TSStatus migrateOneRegion( + TMigrateRegionReq migrateRegionReq, + int theRegionId, + TDataNodeLocation originalDataNode, + TDataNodeLocation destDataNode, + RegionMaintainHandler handler) { + TConsensusGroupId regionGroupId; + Optional optional = + configManager.getPartitionManager().generateTConsensusGroupIdByRegionId(theRegionId); + if (optional.isPresent()) { + regionGroupId = optional.get(); + } else { + LOGGER.error(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) + .setMessage(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + } + + // select coordinator for adding peer + // TODO: choose the DataNode which has lowest load + final TDataNodeLocation coordinatorForAddPeer = + handler + .filterDataNodeWithOtherRegionReplica( + regionGroupId, + destDataNode, + NodeStatus.Running, + NodeStatus.Removing, + NodeStatus.ReadOnly) + .orElse(null); + // Select coordinator for removing peer + // For now, destDataNode temporarily acts as the coordinatorForRemovePeer + final TDataNodeLocation coordinatorForRemovePeer = destDataNode; + + TSStatus status = + checkMigrateRegion( + migrateRegionReq, + theRegionId, + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } + + // finally, submit procedure + this.executor.submitProcedure( + new RegionMigrateProcedure( + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer)); + LOGGER.info( + ManagerMessages + .MIGRATEREGION_SUBMIT_REGIONMIGRATEPROCEDURE_SUCCESSFULLY_REGION_ORIGIN_DATANODE, + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer); + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } public TSStatus reconstructRegion(TReconstructRegionReq req) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 420c3c8285f4c..bdbe47ee90421 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -3635,7 +3635,7 @@ public SettableFuture migrateRegion(final MigrateRegionTask mi CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TMigrateRegionReq tMigrateRegionReq = new TMigrateRegionReq( - migrateRegionTask.getStatement().getRegionId(), + migrateRegionTask.getStatement().getRegionIds(), migrateRegionTask.getStatement().getFromId(), migrateRegionTask.getStatement().getToId(), migrateRegionTask.getModel()); @@ -3667,11 +3667,16 @@ public SettableFuture removeDataNode( invalidNodeIds.removeAll(validNodeIds); if (!invalidNodeIds.isEmpty()) { - LOGGER.info(DataNodeQueryMessages.CANNOT_REMOVE_INVALID_NODEIDS, invalidNodeIds); - nodeIds.removeAll(invalidNodeIds); + LOGGER.error(DataNodeQueryMessages.CANNOT_REMOVE_INVALID_NODEIDS, invalidNodeIds); + future.setException( + new IOException( + "The DataNode(s) to be removed " + + invalidNodeIds + + " are not in the cluster, or the input format is incorrect.")); + return future; } - if (nodeIds.size() != 1) { + if (nodeIds.isEmpty()) { LOGGER.error( "The DataNode to be removed is not in the cluster, or the input format is incorrect."); future.setException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java index 8c6f2bfe36ba5..d99f9a36c164e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java @@ -41,7 +41,7 @@ public MigrateRegionTask(MigrateRegionStatement migrateRegionStatement) { public MigrateRegionTask(MigrateRegion migrateRegion) { this.statement = new MigrateRegionStatement( - migrateRegion.getRegionId(), migrateRegion.getFromId(), migrateRegion.getToId()); + migrateRegion.getRegionIds(), migrateRegion.getFromId(), migrateRegion.getToId()); this.model = Model.TABLE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index b9e2fe21c7a54..083144b0c4e4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -4632,10 +4632,10 @@ public Statement visitCountTimeSlotList(IoTDBSqlParser.CountTimeSlotListContext @Override public Statement visitMigrateRegion(IoTDBSqlParser.MigrateRegionContext ctx) { + List regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new MigrateRegionStatement( - Integer.parseInt(ctx.regionId.getText()), - Integer.parseInt(ctx.fromId.getText()), - Integer.parseInt(ctx.toId.getText())); + regionIds, Integer.parseInt(ctx.fromId.getText()), Integer.parseInt(ctx.toId.getText())); } @Override @@ -4666,7 +4666,7 @@ public Statement visitRemoveRegion(IoTDBSqlParser.RemoveRegionContext ctx) { @Override public Statement visitRemoveDataNode(IoTDBSqlParser.RemoveDataNodeContext ctx) { List nodeIds = - Collections.singletonList(Integer.parseInt(ctx.INTEGER_LITERAL().getText())); + ctx.dataNodeIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new RemoveDataNodeStatement(nodeIds); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java index 3a305c961360d..a24f99b407696 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java @@ -36,19 +36,20 @@ public class MigrateRegion extends Statement { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(MigrateRegion.class); - private final int regionId; + private final List regionIds; private final int fromId; private final int toId; - public MigrateRegion(int regionId, int fromId, int toId) { - this(null, regionId, fromId, toId); + public MigrateRegion(List regionIds, int fromId, int toId) { + this(null, regionIds, fromId, toId); } - public MigrateRegion(@Nullable NodeLocation location, int regionId, int fromId, int toId) { + public MigrateRegion( + @Nullable NodeLocation location, List regionIds, int fromId, int toId) { super(location); - this.regionId = regionId; + this.regionIds = regionIds; this.fromId = fromId; this.toId = toId; } @@ -60,7 +61,7 @@ public List getChildren() { @Override public int hashCode() { - return Objects.hash(MigrateRegion.class, regionId, fromId, toId); + return Objects.hash(MigrateRegion.class, regionIds, fromId, toId); } @Override @@ -72,12 +73,12 @@ public boolean equals(Object obj) { return false; } MigrateRegion another = (MigrateRegion) obj; - return regionId == another.regionId && fromId == another.fromId && toId == another.toId; + return regionIds.equals(another.regionIds) && fromId == another.fromId && toId == another.toId; } @Override public String toString() { - return String.format("migrate region %d from %d to %d", regionId, fromId, toId); + return String.format("migrate region %s from %d to %d", regionIds, fromId, toId); } @Override @@ -85,8 +86,8 @@ public R accept(IAstVisitor visitor, C context) { return ((AstVisitor) visitor).visitMigrateRegion(this, context); } - public int getRegionId() { - return regionId; + public List getRegionIds() { + return regionIds; } public int getFromId() { @@ -101,6 +102,7 @@ public int getToId() { public long ramBytesUsed() { long size = INSTANCE_SIZE; size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += AstMemoryEstimationHelper.getEstimatedSizeOfIntegerList(regionIds); return size; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index bd56b82042d52..0068483033b8f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -1571,10 +1571,10 @@ public Node visitShowSeriesSlotListStatement( @Override public Node visitMigrateRegionStatement(RelationalSqlParser.MigrateRegionStatementContext ctx) { + List regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new MigrateRegion( - Integer.parseInt(ctx.regionId.getText()), - Integer.parseInt(ctx.fromId.getText()), - Integer.parseInt(ctx.toId.getText())); + regionIds, Integer.parseInt(ctx.fromId.getText()), Integer.parseInt(ctx.toId.getText())); } @Override @@ -1606,7 +1606,7 @@ public Node visitRemoveRegionStatement(RelationalSqlParser.RemoveRegionStatement @Override public Node visitRemoveDataNodeStatement(RelationalSqlParser.RemoveDataNodeStatementContext ctx) { List nodeIds = - Collections.singletonList(Integer.parseInt(ctx.INTEGER_VALUE().getText())); + ctx.dataNodeIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new RemoveDataNode(nodeIds); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java index f13323542bbef..169665645ff60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java @@ -33,26 +33,26 @@ * *

Here is the syntax definition: * - *

MIGRATE REGION regionid=INTEGER_LITERAL FROM fromid=INTEGER_LITERAL TO toid=INTEGERLITERAL + *

MIGRATE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM + * fromid=INTEGER_LITERAL TO toid=INTEGER_LITERAL */ -// TODO: Whether to support more complex migration, such as, migrate all region from 1, 2 to 5, 6 public class MigrateRegionStatement extends Statement implements IConfigStatement { - private final int regionId; + private final List regionIds; private final int fromId; private final int toId; - public MigrateRegionStatement(int regionId, int fromId, int toId) { + public MigrateRegionStatement(List regionIds, int fromId, int toId) { super(); - this.regionId = regionId; + this.regionIds = regionIds; this.fromId = fromId; this.toId = toId; } - public int getRegionId() { - return regionId; + public List getRegionIds() { + return regionIds; } public int getFromId() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java new file mode 100644 index 0000000000000..77a573355dbe8 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.parser; + +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; + +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the tree-model SQL that lets MIGRATE REGION move multiple regions from a single + * source DataNode to a single destination DataNode in one statement. + */ +public class MigrateRegionMultiRegionParseTest { + + private static Statement parse(String sql) { + return StatementGenerator.createStatement(sql, ZoneId.systemDefault()); + } + + @Test + public void testMigrateSingleRegion() { + Statement statement = parse("migrate region 1 from 2 to 3"); + assertTrue(statement instanceof MigrateRegionStatement); + MigrateRegionStatement migrateRegionStatement = (MigrateRegionStatement) statement; + assertEquals(Collections.singletonList(1), migrateRegionStatement.getRegionIds()); + assertEquals(2, migrateRegionStatement.getFromId()); + assertEquals(3, migrateRegionStatement.getToId()); + } + + @Test + public void testMigrateMultipleRegions() { + Statement statement = parse("migrate region 1, 2, 3 from 4 to 5"); + assertTrue(statement instanceof MigrateRegionStatement); + MigrateRegionStatement migrateRegionStatement = (MigrateRegionStatement) statement; + assertEquals(Arrays.asList(1, 2, 3), migrateRegionStatement.getRegionIds()); + assertEquals(4, migrateRegionStatement.getFromId()); + assertEquals(5, migrateRegionStatement.getToId()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/RemoveDataNodeMultiNodeParseTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/RemoveDataNodeMultiNodeParseTest.java new file mode 100644 index 0000000000000..d71e443904575 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/RemoveDataNodeMultiNodeParseTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.parser; + +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.RemoveDataNodeStatement; + +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the tree-model SQL that lets REMOVE DATANODE remove multiple DataNodes in a + * single statement. + */ +public class RemoveDataNodeMultiNodeParseTest { + + private static Statement parse(String sql) { + return StatementGenerator.createStatement(sql, ZoneId.systemDefault()); + } + + @Test + public void testRemoveSingleDataNode() { + Statement statement = parse("remove datanode 3"); + assertTrue(statement instanceof RemoveDataNodeStatement); + assertEquals(Collections.singleton(3), ((RemoveDataNodeStatement) statement).getNodeIds()); + } + + @Test + public void testRemoveMultipleDataNodes() { + Statement statement = parse("remove datanode 3, 4, 5"); + assertTrue(statement instanceof RemoveDataNodeStatement); + RemoveDataNodeStatement removeDataNodeStatement = (RemoveDataNodeStatement) statement; + assertEquals(3, removeDataNodeStatement.getNodeIds().size()); + assertTrue(removeDataNodeStatement.getNodeIds().containsAll(Arrays.asList(3, 4, 5))); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java new file mode 100644 index 0000000000000..72ff20f1392cf --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.parser; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; + +import org.junit.Before; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the table-model SQL that lets MIGRATE REGION move multiple regions from a + * single source DataNode to a single destination DataNode in one statement. + */ +public class MigrateRegionMultiRegionStatementTest { + + private SqlParser sqlParser; + private IClientSession clientSession; + + @Before + public void setUp() { + sqlParser = new SqlParser(); + clientSession = new InternalClientSession("testClient"); + } + + private Statement parse(String sql) { + return sqlParser.createStatement(sql, ZoneId.systemDefault(), clientSession); + } + + @Test + public void testMigrateSingleRegion() { + Statement statement = parse("migrate region 1 from 2 to 3"); + assertTrue(statement instanceof MigrateRegion); + MigrateRegion migrateRegion = (MigrateRegion) statement; + assertEquals(Collections.singletonList(1), migrateRegion.getRegionIds()); + assertEquals(2, migrateRegion.getFromId()); + assertEquals(3, migrateRegion.getToId()); + } + + @Test + public void testMigrateMultipleRegions() { + Statement statement = parse("migrate region 1, 2, 3 from 4 to 5"); + assertTrue(statement instanceof MigrateRegion); + MigrateRegion migrateRegion = (MigrateRegion) statement; + assertEquals(Arrays.asList(1, 2, 3), migrateRegion.getRegionIds()); + assertEquals(4, migrateRegion.getFromId()); + assertEquals(5, migrateRegion.getToId()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/RemoveDataNodeMultiNodeStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/RemoveDataNodeMultiNodeStatementTest.java new file mode 100644 index 0000000000000..06a794f2bdbdd --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/RemoveDataNodeMultiNodeStatementTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.parser; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveDataNode; + +import org.junit.Before; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the table-model SQL that lets REMOVE DATANODE remove multiple DataNodes in a + * single statement. + */ +public class RemoveDataNodeMultiNodeStatementTest { + + private SqlParser sqlParser; + private IClientSession clientSession; + + @Before + public void setUp() { + sqlParser = new SqlParser(); + clientSession = new InternalClientSession("testClient"); + } + + private Statement parse(String sql) { + return sqlParser.createStatement(sql, ZoneId.systemDefault(), clientSession); + } + + @Test + public void testRemoveSingleDataNode() { + Statement statement = parse("remove datanode 3"); + assertTrue(statement instanceof RemoveDataNode); + assertEquals(Collections.singletonList(3), ((RemoveDataNode) statement).getNodeIds()); + } + + @Test + public void testRemoveMultipleDataNodes() { + Statement statement = parse("remove datanode 3, 4, 5"); + assertTrue(statement instanceof RemoveDataNode); + assertEquals(Arrays.asList(3, 4, 5), ((RemoveDataNode) statement).getNodeIds()); + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 5ae0c5d39e1bc..6bebd2bc08eee 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -644,7 +644,7 @@ showSeriesSlotListStatement ; migrateRegionStatement - : MIGRATE REGION regionId=INTEGER_VALUE FROM fromId=INTEGER_VALUE TO toId=INTEGER_VALUE + : MIGRATE REGION regionIds+=INTEGER_VALUE (',' regionIds+=INTEGER_VALUE)* FROM fromId=INTEGER_VALUE TO toId=INTEGER_VALUE ; reconstructRegionStatement @@ -660,7 +660,7 @@ removeRegionStatement ; removeDataNodeStatement - : REMOVE DATANODE dataNodeId=INTEGER_VALUE + : REMOVE DATANODE dataNodeIds+=INTEGER_VALUE (',' dataNodeIds+=INTEGER_VALUE)* ; removeConfigNodeStatement diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 852460fb60e8b..d5ad892880222 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -341,7 +341,7 @@ struct TGetSeriesSlotListResp { } struct TMigrateRegionReq { - 1: required i32 regionId + 1: required list regionIds 2: required i32 fromId 3: required i32 toId 4: required common.Model model