Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.confignode.it.partition;

import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand All @@ -34,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -128,4 +130,38 @@ public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure()
Assert.assertEquals(
"The other concurrent submissions should be rejected", threadCount - 1, failCount.get());
}

@Test
public void testShowRepairDataPartitionTableProgress() throws Exception {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
assertRepairProgress(statement, RepairDataPartitionTableProgressState.IDLE.name(), 0.0, 0.0);

statement.execute("REPAIR DATA PARTITION TABLE");
assertRepairProgress(statement, null, 0.0, 100.0);
}
}

private static void assertRepairProgress(
final Statement statement,
final String expectedStatus,
final double minProgress,
final double maxProgress)
throws SQLException {
try (final ResultSet resultSet =
statement.executeQuery("SHOW REPAIR DATA PARTITION TABLE PROGRESS")) {
Assert.assertTrue(resultSet.next());
if (expectedStatus != null) {
Assert.assertEquals(expectedStatus, resultSet.getString("Status"));
} else {
Assert.assertNotEquals(
RepairDataPartitionTableProgressState.UNKNOWN.name(), resultSet.getString("Status"));
}
final double progress = resultSet.getDouble("Progress(%)");
Assert.assertTrue(progress >= minProgress);
Assert.assertTrue(progress <= maxProgress);
Assert.assertNotNull(resultSet.getString("Message"));
Assert.assertFalse(resultSet.next());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ keyWords
| PRIVILEGES
| PRIVILEGE_VALUE
| PROCESSLIST
| PROGRESS
| PROCESSOR
| PROPERTY
| PRUNE
Expand Down Expand Up @@ -298,4 +299,4 @@ keyWords
| OPTION
| INF
| CURRENT_TIMESTAMP
;
;
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ utilityStatement
| showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding
| revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile
| removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser
| repairDataPartitionTable
| repairDataPartitionTable | showRepairDataPartitionTableProgress
;

/**
Expand Down Expand Up @@ -1283,6 +1283,11 @@ repairDataPartitionTable
: REPAIR DATA PARTITION TABLE
;

// Show Repair Data Partition Table Progress
showRepairDataPartitionTableProgress
: SHOW REPAIR DATA PARTITION TABLE PROGRESS
;

// Explain
explain
: EXPLAIN (ANALYZE VERBOSE?)? selectStatement?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,10 @@ REPAIR
: R E P A I R
;

PROGRESS
: P R O G R E S S
;

SCHEMA_REPLICATION_FACTOR
: S C H E M A '_' R E P L I C A T I O N '_' F A C T O R
;
Expand Down Expand Up @@ -1411,4 +1415,4 @@ fragment V: [vV];
fragment W: [wW];
fragment X: [xX];
fragment Y: [yY];
fragment Z: [zZ];
fragment Z: [zZ];
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum CnToDnSyncRequestType {
COLLECT_EARLIEST_TIMESLOTS,
GENERATE_DATA_PARTITION_TABLE,
GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,

// PartitionCache
INVALIDATE_PARTITION_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ private void buildActionMap() {
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
(req, client) ->
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,
(req, client) -> client.getDataPartitionTableGeneratorProgress());
actionMap = actionMapBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
Expand Down Expand Up @@ -238,6 +239,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -1180,6 +1182,18 @@ public TSStatus dataPartitionTableIntegrityCheck() {
return partitionManager.dataPartitionTableIntegrityCheck();
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return new TShowRepairDataPartitionTableProgressResp(
status, RepairDataPartitionTableProgressState.UNKNOWN.name(), 0.0)
.setMessage(status.getMessage());
}

return partitionManager.showRepairDataPartitionTableProgress();
}

private void printNewCreatedDataPartition(
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -480,6 +481,8 @@ TDataPartitionTableResp getOrCreateDataPartition(

TSStatus dataPartitionTableIntegrityCheck();

TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress();

/**
* Get AuditLogger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
Expand Down Expand Up @@ -2468,6 +2469,18 @@ public boolean isExistUnfinishedProcedure(
return false;
}

public Optional<DataPartitionTableIntegrityCheckProcedure>
getUnfinishedDataPartitionTableIntegrityCheckProcedure() {
for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
if (!procedure.isFinished()
&& procedure instanceof DataPartitionTableIntegrityCheckProcedure) {
return Optional.of((DataPartitionTableIntegrityCheckProcedure) procedure);
}
}

return Optional.empty();
}

// ======================================================
/*
GET-SET Region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
Expand Down Expand Up @@ -539,6 +541,20 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() {
dataPartitionTableIntegrityCheckProcedureRunning.set(false);
}

public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
return configManager
.getProcedureManager()
.getUnfinishedDataPartitionTableIntegrityCheckProcedure()
.map(DataPartitionTableIntegrityCheckProcedure::getProgress)
.orElseGet(
() ->
new TShowRepairDataPartitionTableProgressResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
RepairDataPartitionTableProgressState.IDLE.name(),
0.0)
.setMessage("No running DataPartitionTable integrity check procedure"));
}

private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
TSStatus status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Loading
Loading