Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds;

/**
* Tree-model coverage for IoTConsensus region migration over multiple data dirs: a deletion (mods)
Expand All @@ -56,9 +61,6 @@
@Category({ClusterIT.class})
public class IoTDBRegionMigrateWithDeletionMultiDataDirIT {

private static final String MULTI_DATA_DIRS =
"data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2";

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand All @@ -83,13 +85,21 @@ public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Excepti
statement.execute(
"INSERT INTO root.db.d1(timestamp, s1) VALUES (100, 100), (200, 200), (300, 300)");
statement.execute("FLUSH");
statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200");
statement.execute("FLUSH");

Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
getDataRegionMapWithLeader(statement);
int dataRegionIdForTest =
dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow();
Set<Integer> initialReplicaDataNodeIds =
getReplicaDataNodeIds(statement, dataRegionIdForTest);

awaitTsFileVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds);
awaitTsFileResourceVisibleOnReplicas(
statement, "root.db", dataRegionIdForTest, initialReplicaDataNodeIds);

statement.execute("DELETE FROM root.db.d1.s1 WHERE time <= 200");
statement.execute("FLUSH");
awaitModsVisibleOnReplicas("root.db", dataRegionIdForTest, initialReplicaDataNodeIds);
assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1);

Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
Expand All @@ -107,25 +117,8 @@ public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Excepti
String.format(
"migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId));

final int finalDestDataNodeId = destDataNodeId;
Awaitility.await()
.atMost(10, TimeUnit.MINUTES)
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
boolean migrated = false;
while (showRegions.next()) {
if (showRegions.getInt("RegionId") == dataRegionIdForTest
&& showRegions.getInt("DataNodeId") == finalDestDataNodeId) {
migrated = true;
break;
}
}
Assert.assertTrue(migrated);
}
});
awaitRegionReplicas(statement, dataRegionIdForTest, Set.of(followerId, destDataNodeId));
awaitModsVisibleOnReplicas("root.db", dataRegionIdForTest, Set.of(destDataNodeId));

assertDeletionVisibleOnAllReplicas(dataRegionIdForTest, 1);
}
Expand Down Expand Up @@ -164,19 +157,4 @@ private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int
}
}
}

private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId)
throws Exception {
Set<Integer> replicaDataNodeIds = new HashSet<>();
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
while (showRegions.next()) {
if ("DataRegion".equals(showRegions.getString("Type"))
&& showRegions.getInt("RegionId") == dataRegionId) {
replicaDataNodeIds.add(showRegions.getInt("DataNodeId"));
}
}
}
Assert.assertFalse(replicaDataNodeIds.isEmpty());
return replicaDataNodeIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.MULTI_DATA_DIRS;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitModsVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitRegionReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileResourceVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.awaitTsFileVisibleOnReplicas;
import static org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1.RegionMigrateFileAssertions.getReplicaDataNodeIds;

/**
* Table-model twin of {@link IoTDBRegionMigrateWithDeletionMultiDataDirIT}: a deletion (mods) must
Expand All @@ -56,9 +61,6 @@
@Category({TableClusterIT.class})
public class IoTDBRegionMigrateWithDeletionMultiDataDirTableIT {

private static final String MULTI_DATA_DIRS =
"data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2";

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand All @@ -84,13 +86,21 @@ public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Excepti
statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100), (200, 200), (300, 300)");
statement.execute("FLUSH");
statement.execute("DELETE FROM t1 WHERE time <= 200");
statement.execute("FLUSH");

Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
getDataRegionMapWithLeader(statement);
int dataRegionIdForTest =
dataRegionMapWithLeader.keySet().stream().max(Integer::compareTo).orElseThrow();
Set<Integer> initialReplicaDataNodeIds =
getReplicaDataNodeIds(statement, dataRegionIdForTest);

awaitTsFileVisibleOnReplicas("test", dataRegionIdForTest, initialReplicaDataNodeIds);
awaitTsFileResourceVisibleOnReplicas(
statement, "test", dataRegionIdForTest, initialReplicaDataNodeIds);

statement.execute("DELETE FROM t1 WHERE time <= 200");
statement.execute("FLUSH");
awaitModsVisibleOnReplicas("test", dataRegionIdForTest, initialReplicaDataNodeIds);
assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1);

Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
Expand All @@ -108,25 +118,8 @@ public void testRegionMigratePreservesDeletionWithMultiDataDirs() throws Excepti
String.format(
"migrate region %d from %d to %d", dataRegionIdForTest, leaderId, destDataNodeId));

final int finalDestDataNodeId = destDataNodeId;
Awaitility.await()
.atMost(10, TimeUnit.MINUTES)
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
boolean migrated = false;
while (showRegions.next()) {
if (showRegions.getInt("RegionId") == dataRegionIdForTest
&& showRegions.getInt("DataNodeId") == finalDestDataNodeId) {
migrated = true;
break;
}
}
Assert.assertTrue(migrated);
}
});
awaitRegionReplicas(statement, dataRegionIdForTest, Set.of(followerId, destDataNodeId));
awaitModsVisibleOnReplicas("test", dataRegionIdForTest, Set.of(destDataNodeId));

assertDeletionVisibleOnAllReplicas(statement, dataRegionIdForTest, 1);
}
Expand Down Expand Up @@ -167,19 +160,4 @@ private void assertDeletionVisibleOnReplica(DataNodeWrapper dataNodeWrapper, int
}
}
}

private Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId)
throws Exception {
Set<Integer> replicaDataNodeIds = new HashSet<>();
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
while (showRegions.next()) {
if ("DataRegion".equals(showRegions.getString("Type"))
&& showRegions.getInt("RegionId") == dataRegionId) {
replicaDataNodeIds.add(showRegions.getInt("DataNodeId"));
}
}
}
Assert.assertFalse(replicaDataNodeIds.isEmpty());
return replicaDataNodeIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;

import org.awaitility.Awaitility;
import org.junit.Assert;

import java.io.File;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

final class RegionMigrateFileAssertions {

static final String MULTI_DATA_DIRS =
"data/datanode/data/disk0,data/datanode/data/disk1,data/datanode/data/disk2";

private static final String SEQUENCE_FOLDER = "sequence";
private static final String TSFILE_SUFFIX = ".tsfile";
private static final String TSFILE_RESOURCE_SUFFIX = ".tsfile.resource";
private static final String MODS_SUFFIX = ".mods2";

private RegionMigrateFileAssertions() {}

static void awaitTsFileVisibleOnReplicas(
String database, int dataRegionId, Set<Integer> dataNodeIds) {
awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_SUFFIX);
}

static void awaitTsFileResourceVisibleOnReplicas(
String database, int dataRegionId, Set<Integer> dataNodeIds) {
awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX);
}

static void awaitTsFileResourceVisibleOnReplicas(
Statement statement, String database, int dataRegionId, Set<Integer> dataNodeIds) {
awaitFileVisibleOnReplicas(
statement, database, dataRegionId, dataNodeIds, TSFILE_RESOURCE_SUFFIX);
}

static void awaitModsVisibleOnReplicas(
String database, int dataRegionId, Set<Integer> dataNodeIds) {
awaitFileVisibleOnReplicas(database, dataRegionId, dataNodeIds, MODS_SUFFIX);
}

static void awaitRegionReplicas(
Statement statement, int dataRegionId, Set<Integer> expectedReplicaDataNodeIds) {
Awaitility.await()
.atMost(10, TimeUnit.MINUTES)
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assert.assertEquals(
expectedReplicaDataNodeIds, getReplicaDataNodeIds(statement, dataRegionId)));
}

static Set<Integer> getReplicaDataNodeIds(Statement statement, int dataRegionId)
throws Exception {
Set<Integer> replicaDataNodeIds = new HashSet<>();
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
while (showRegions.next()) {
if ("DataRegion".equals(showRegions.getString("Type"))
&& showRegions.getInt("RegionId") == dataRegionId) {
replicaDataNodeIds.add(showRegions.getInt("DataNodeId"));
}
}
}
Assert.assertFalse(replicaDataNodeIds.isEmpty());
return replicaDataNodeIds;
}

private static void awaitFileVisibleOnReplicas(
String database, int dataRegionId, Set<Integer> dataNodeIds, String suffix) {
awaitFileVisibleOnReplicas(null, database, dataRegionId, dataNodeIds, suffix);
}

private static void awaitFileVisibleOnReplicas(
Statement flushStatement,
String database,
int dataRegionId,
Set<Integer> dataNodeIds,
String suffix) {
for (int dataNodeId : dataNodeIds) {
DataNodeWrapper dataNodeWrapper =
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).orElseThrow();
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.pollDelay(500, TimeUnit.MILLISECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(
() -> {
if (flushStatement != null) {
flushStatement.execute("FLUSH");
}
Assert.assertTrue(
String.format(
"Expected file with suffix %s for database %s region %d on DataNode %d",
suffix, database, dataRegionId, dataNodeId),
containsSequenceFileWithSuffix(
dataNodeWrapper, database, dataRegionId, suffix));
});
}
}

private static boolean containsSequenceFileWithSuffix(
DataNodeWrapper dataNodeWrapper, String database, int dataRegionId, String suffix) {
for (String dataDir : MULTI_DATA_DIRS.split(",")) {
File regionDir =
new File(
dataNodeWrapper.getNodePath(),
dataDir
+ File.separator
+ SEQUENCE_FOLDER
+ File.separator
+ database
+ File.separator
+ dataRegionId);
if (containsFileWithSuffix(regionDir, suffix)) {
return true;
}
}
return false;
}

private static boolean containsFileWithSuffix(File file, String suffix) {
if (!file.exists()) {
return false;
}
if (file.isFile()) {
// IoTConsensus followers can create an open zero-byte TsFile before a later FLUSH closes it.
return file.getName().endsWith(suffix) && (TSFILE_SUFFIX.equals(suffix) || file.length() > 0);
}
File[] children = file.listFiles();
if (children == null) {
return false;
}
for (File child : children) {
if (containsFileWithSuffix(child, suffix)) {
return true;
}
}
return false;
}
}
Loading