diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java index e93856cacd1e5..da54c39479e0d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.it.partition; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -128,4 +130,38 @@ public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure() Assert.assertEquals( "The other concurrent submissions should be rejected", threadCount - 1, failCount.get()); } + + @Test + public void testShowRepairDataPartitionTableProgress() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + assertRepairProgress(statement, RepairDataPartitionTableProgressState.IDLE.name(), 0.0, 0.0); + + statement.execute("REPAIR DATA PARTITION TABLE"); + assertRepairProgress(statement, null, 0.0, 100.0); + } + } + + private static void assertRepairProgress( + final Statement statement, + final String expectedStatus, + final double minProgress, + final double maxProgress) + throws SQLException { + try (final ResultSet resultSet = + statement.executeQuery("SHOW REPAIR DATA PARTITION TABLE PROGRESS")) { + Assert.assertTrue(resultSet.next()); + if (expectedStatus != null) { + Assert.assertEquals(expectedStatus, resultSet.getString("Status")); + } else { + Assert.assertNotEquals( + RepairDataPartitionTableProgressState.UNKNOWN.name(), resultSet.getString("Status")); + } + final double progress = resultSet.getDouble("Progress(%)"); + Assert.assertTrue(progress >= minProgress); + Assert.assertTrue(progress <= maxProgress); + Assert.assertNotNull(resultSet.getString("Message")); + Assert.assertFalse(resultSet.next()); + } + } } diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index e96d84e892c8c..135b674554b52 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -189,6 +189,7 @@ keyWords | PRIVILEGES | PRIVILEGE_VALUE | PROCESSLIST + | PROGRESS | PROCESSOR | PROPERTY | PRUNE @@ -298,4 +299,4 @@ keyWords | OPTION | INF | CURRENT_TIMESTAMP - ; \ No newline at end of file + ; diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 2f91b573aef0e..b106fac032d79 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -96,7 +96,7 @@ utilityStatement | showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser - | repairDataPartitionTable + | repairDataPartitionTable | showRepairDataPartitionTableProgress ; /** @@ -1283,6 +1283,11 @@ repairDataPartitionTable : REPAIR DATA PARTITION TABLE ; +// Show Repair Data Partition Table Progress +showRepairDataPartitionTableProgress + : SHOW REPAIR DATA PARTITION TABLE PROGRESS + ; + // Explain explain : EXPLAIN (ANALYZE VERBOSE?)? selectStatement? diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 3950de50ea477..4d915459c39a8 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -1215,6 +1215,10 @@ REPAIR : R E P A I R ; +PROGRESS + : P R O G R E S S + ; + SCHEMA_REPLICATION_FACTOR : S C H E M A '_' R E P L I C A T I O N '_' F A C T O R ; @@ -1411,4 +1415,4 @@ fragment V: [vV]; fragment W: [wW]; fragment X: [xX]; fragment Y: [yY]; -fragment Z: [zZ]; \ No newline at end of file +fragment Z: [zZ]; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java index 790fd637d616a..ca7fc7a37b2af 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java @@ -41,6 +41,7 @@ public enum CnToDnSyncRequestType { COLLECT_EARLIEST_TIMESLOTS, GENERATE_DATA_PARTITION_TABLE, GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, // PartitionCache INVALIDATE_PARTITION_CACHE, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index 14c833fb2fc18..f35b6d906a525 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -151,6 +151,9 @@ private void buildActionMap() { CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, (req, client) -> client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req)); + actionMapBuilder.put( + CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, + (req, client) -> client.getDataPartitionTableGeneratorProgress()); actionMap = actionMapBuilder.build(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index da10bb2228378..90766221ddde0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -47,6 +47,7 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; @@ -238,6 +239,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp; @@ -1180,6 +1182,18 @@ public TSStatus dataPartitionTableIntegrityCheck() { return partitionManager.dataPartitionTableIntegrityCheck(); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + TSStatus status = confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return new TShowRepairDataPartitionTableProgressResp( + status, RepairDataPartitionTableProgressState.UNKNOWN.name(), 0.0) + .setMessage(status.getMessage()); + } + + return partitionManager.showRepairDataPartitionTableProgress(); + } + private void printNewCreatedDataPartition( GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) { final String lineSeparator = System.lineSeparator(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index fa708dbaeca83..1dc097b34440b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -153,6 +153,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp; @@ -480,6 +481,8 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus dataPartitionTableIntegrityCheck(); + TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress(); + /** * Get AuditLogger. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2beeea6b4af4f..89dffce023e51 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -71,6 +71,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -2468,6 +2469,18 @@ public boolean isExistUnfinishedProcedure( return false; } + public Optional + getUnfinishedDataPartitionTableIntegrityCheckProcedure() { + for (Procedure procedure : getExecutor().getProcedures().values()) { + if (!procedure.isFinished() + && procedure instanceof DataPartitionTableIntegrityCheckProcedure) { + return Optional.of((DataPartitionTableIntegrityCheckProcedure) procedure); + } + } + + return Optional.empty(); + } + // ====================================================== /* GET-SET Region diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 0edb0e389dc3a..d8336fb93da88 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -90,6 +91,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; @@ -539,6 +541,20 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() { dataPartitionTableIntegrityCheckProcedureRunning.set(false); } + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + return configManager + .getProcedureManager() + .getUnfinishedDataPartitionTableIntegrityCheckProcedure() + .map(DataPartitionTableIntegrityCheckProcedure::getProgress) + .orElseGet( + () -> + new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RepairDataPartitionTableProgressState.IDLE.name(), + 0.0) + .setMessage("No running DataPartitionTable integrity check procedure")); + } + private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { TSStatus status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index cc738669e4149..50f13039cc624 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; @@ -42,11 +43,13 @@ import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -128,6 +131,9 @@ public class DataPartitionTableIntegrityCheckProcedure // ============Need serialize END=============/ + private final Map dataNodeGeneratorProgress = new ConcurrentHashMap<>(); + private volatile Set dataNodeGeneratorTargetDataNodeIds = Collections.emptySet(); + public DataPartitionTableIntegrityCheckProcedure() { super(); } @@ -149,7 +155,7 @@ protected Flow executeFromState( // Ensure to get the real-time DataNodes in the current cluster at every step dataNodeManager = env.getConfigManager().getNodeManager(); loadManager = env.getConfigManager().getLoadManager(); - allDataNodes = dataNodeManager.getRegisteredDataNodes(); + allDataNodes = new ArrayList<>(dataNodeManager.getRegisteredDataNodes()); switch (state) { case COLLECT_EARLIEST_TIMESLOTS: @@ -253,8 +259,9 @@ private Flow collectEarliestTimeslots() { } // Collect earliest timeslots from all DataNodes - allDataNodes.removeAll(skipDataNodes); - for (TDataNodeConfiguration dataNode : allDataNodes) { + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : targetDataNodes) { // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNode.getLocation().getDataNodeId()); if (!NodeStatus.Running.equals(nodeStatus)) { @@ -315,12 +322,12 @@ private Flow collectEarliestTimeslots() { if (LOG.isDebugEnabled()) { LOG.debug( "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", - allDataNodes.size(), + targetDataNodes.size(), earliestTimeslots, - allDataNodes.size() - failedDataNodes.size()); + targetDataNodes.size() - failedDataNodes.size()); } - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); } else { @@ -454,18 +461,22 @@ private Flow requestPartitionTables() { return Flow.HAS_MORE_STATE; } - allDataNodes.removeAll(skipDataNodes); - allDataNodes.removeAll(failedDataNodes); - for (TDataNodeConfiguration dataNode : allDataNodes) { + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + targetDataNodes.removeAll(failedDataNodes); + refreshDataNodeGeneratorTarget(targetDataNodes); + for (TDataNodeConfiguration dataNode : targetDataNodes) { int dataNodeId = dataNode.getLocation().getDataNodeId(); // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId); if (!NodeStatus.Running.equals(nodeStatus)) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); continue; } if (!dataPartitionTables.containsKey(dataNodeId)) { + dataNodeGeneratorProgress.put(dataNodeId, 0.0); try { TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq(); req.setDatabases(databasesWithLostDataPartition); @@ -479,6 +490,7 @@ private Flow requestPartitionTables() { if (response instanceof TSStatus) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], already out of max retry time", dataNode.getLocation().getDataNodeId()); @@ -488,6 +500,7 @@ private Flow requestPartitionTables() { TGenerateDataPartitionTableResp resp = (TGenerateDataPartitionTableResp) response; if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], response status is {}", dataNode.getLocation().getDataNodeId(), @@ -495,6 +508,7 @@ private Flow requestPartitionTables() { } } catch (Exception e) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from DataNode[id={}]: {}", dataNodeId, @@ -504,7 +518,7 @@ private Flow requestPartitionTables() { } } - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); return Flow.HAS_MORE_STATE; @@ -520,13 +534,19 @@ private Flow requestPartitionTablesHeartBeat() { LOG.debug(ProcedureMessages.CHECKING_DATAPARTITIONTABLE_GENERATION_COMPLETION_STATUS); } + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + targetDataNodes.removeAll(failedDataNodes); + refreshDataNodeGeneratorTarget(targetDataNodes); + int completeCount = 0; - for (TDataNodeConfiguration dataNode : allDataNodes) { + for (TDataNodeConfiguration dataNode : targetDataNodes) { int dataNodeId = dataNode.getLocation().getDataNodeId(); // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId); if (!NodeStatus.Running.equals(nodeStatus)) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); continue; } @@ -544,6 +564,7 @@ private Flow requestPartitionTablesHeartBeat() { if (response instanceof TSStatus) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], already out of max retry time", dataNode.getLocation().getDataNodeId()); @@ -570,18 +591,21 @@ private Flow requestPartitionTablesHeartBeat() { List databaseScopedDataPartitionTableList = deserializeDatabaseScopedTableList(byteBufferList); dataPartitionTables.put(dataNodeId, databaseScopedDataPartitionTableList); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.info( "[DataPartitionIntegrity] DataNode {} completed DataPartitionTable generation, terminating heart beat", dataNodeId); completeCount++; break; case IN_PROGRESS: + dataNodeGeneratorProgress.put(dataNodeId, clampProgress(resp.getProgress())); LOG.info( "[DataPartitionIntegrity] DataNode {} still generating DataPartitionTable", dataNodeId); break; default: failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] DataNode {} returned unknown error code: {}", dataNodeId, @@ -594,21 +618,23 @@ private Flow requestPartitionTablesHeartBeat() { dataNodeId, e.getMessage(), e); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); completeCount++; } } else { + dataNodeGeneratorProgress.put(dataNodeId, 1.0); completeCount++; } } - if (completeCount >= allDataNodes.size()) { + if (completeCount >= targetDataNodes.size()) { setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES); return Flow.HAS_MORE_STATE; } // Don't find any one data partition table generation task on all registered DataNodes, go back // to the REQUEST_PARTITION_TABLES step and re-execute - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); return Flow.HAS_MORE_STATE; @@ -1114,4 +1140,93 @@ public void setSkipDataNodes(Set skipDataNodes) { public void setFailedDataNodes(Set failedDataNodes) { this.failedDataNodes = failedDataNodes; } + + public TShowRepairDataPartitionTableProgressResp getProgress() { + try { + final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); + final String state = getProgressStateName(currentState); + final double progress = + currentState == null ? 0.0 : calculateProgressByState(currentState) * 100; + + return new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) + .setMessage( + String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); + } catch (Exception e) { + LOG.warn("Failed to show DataPartitionTable integrity check progress", e); + return new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RepairDataPartitionTableProgressState.UNKNOWN.name(), + 0.0) + .setMessage("Failed to show DataPartitionTable integrity check progress"); + } + } + + private double calculateProgressByState( + final DataPartitionTableIntegrityCheckProcedureState currentState) { + switch (currentState) { + case COLLECT_EARLIEST_TIMESLOTS: + return 0.0; + case ANALYZE_MISSING_PARTITIONS: + return 0.05; + case REQUEST_PARTITION_TABLES: + return 0.1; + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(); + case MERGE_PARTITION_TABLES: + return 0.95; + case WRITE_PARTITION_TABLE_TO_CONSENSUS: + return 0.99; + default: + LOG.warn( + "Encountered unexpected DataPartitionTableIntegrityCheckProcedureState {} when showing progress", + currentState); + return 0.0; + } + } + + private String getProgressStateName( + final DataPartitionTableIntegrityCheckProcedureState currentState) { + if (currentState == null) { + return RepairDataPartitionTableProgressState.UNKNOWN.name(); + } + try { + return RepairDataPartitionTableProgressState.valueOf(currentState.name()).name(); + } catch (IllegalArgumentException e) { + LOG.warn( + "Unexpected DataPartitionTableIntegrityCheckProcedureState {} when showing progress", + currentState); + return RepairDataPartitionTableProgressState.UNKNOWN.name(); + } + } + + private double calculateDataNodeGeneratorProgress() { + final Set currentTargetDataNodeIds = dataNodeGeneratorTargetDataNodeIds; + if (currentTargetDataNodeIds.isEmpty()) { + return dataPartitionTables.isEmpty() ? 0.0 : 1.0; + } + + double progressSum = 0.0; + for (int dataNodeId : currentTargetDataNodeIds) { + progressSum += clampProgress(dataNodeGeneratorProgress.getOrDefault(dataNodeId, 0.0)); + } + return clampProgress(progressSum / currentTargetDataNodeIds.size()); + } + + private void refreshDataNodeGeneratorTarget(final List targetDataNodes) { + final Set targetDataNodeIds = new HashSet<>(); + for (TDataNodeConfiguration dataNode : targetDataNodes) { + targetDataNodeIds.add(dataNode.getLocation().getDataNodeId()); + } + dataNodeGeneratorProgress.keySet().retainAll(targetDataNodeIds); + dataNodeGeneratorTargetDataNodeIds = Collections.unmodifiableSet(targetDataNodeIds); + } + + private long countFailedTargetDataNodes(final List targetDataNodes) { + return targetDataNodes.stream().filter(failedDataNodes::contains).count(); + } + + private double clampProgress(final double progress) { + return Math.max(0.0, Math.min(1.0, progress)); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 5b26a6f7d6dc7..1d0b7612dbd6e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -213,6 +213,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -639,6 +640,11 @@ public TSStatus dataPartitionTableIntegrityCheck() { return configManager.dataPartitionTableIntegrityCheck(); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + return configManager.showRepairDataPartitionTableProgress(); + } + @Override public TSStatus operatePermission(final TAuthorizerReq req) { ConfigPhysicalPlanType configPhysicalPlanType = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java index a0cc76fe5c9b1..4f2fcfe27519d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java @@ -23,10 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeResource; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Assert; @@ -69,6 +71,17 @@ public void serDeTest() throws IOException { } } + @Test + public void progressStateTest() { + DataPartitionTableIntegrityCheckProcedure procedure = + new DataPartitionTableIntegrityCheckProcedure(); + TShowRepairDataPartitionTableProgressResp progress = procedure.getProgress(); + Assert.assertEquals( + RepairDataPartitionTableProgressState.COLLECT_EARLIEST_TIMESLOTS.name(), + progress.getState()); + Assert.assertTrue(progress.getProgress() >= 0.0 && progress.getProgress() <= 100.0); + } + private DataPartitionTableIntegrityCheckProcedure createTestProcedureWithData() { DataPartitionTableIntegrityCheckProcedure proc = new DataPartitionTableIntegrityCheckProcedure(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index ec55111a200b4..678056fb993df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -170,6 +170,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -747,6 +748,14 @@ public TSStatus dataPartitionTableIntegrityCheck() throws TException { () -> client.dataPartitionTableIntegrityCheck(), status -> !updateConfigNodeLeader(status)); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() + throws TException { + return executeRemoteCallWithRetry( + () -> client.showRepairDataPartitionTableProgress(), + resp -> !updateConfigNodeLeader(resp.status)); + } + @Override public TSStatus operatePermission(TAuthorizerReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index c588c94c7b5ab..08fe54cf10dc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -280,6 +280,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; @@ -3408,13 +3409,14 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable( try { // Check if there's already a task in the progress - if (currentGenerator != null - && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + final DataPartitionTableGenerator runningGenerator = currentGenerator; + if (runningGenerator != null + && runningGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); resp.setMessage( String.format( "DataPartitionTable generation is already in the progress: %.1f%%", - currentGenerator.getProgress() * 100)); + runningGenerator.getProgress() * 100)); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); return resp; } @@ -3436,17 +3438,18 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable( ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), new ThreadPoolExecutor.CallerRunsPolicy()); - currentGenerator = + final DataPartitionTableGenerator generator = new DataPartitionTableGenerator( partitionTableRecoverExecutor, req.getDatabases(), seriesSlotNum, seriesPartitionExecutorClass); + currentGenerator = generator; currentTaskId = System.currentTimeMillis(); // Start generation synchronously for now to return the data partition table immediately - currentGeneratorFuture = currentGenerator.startGeneration(); - parseGenerationStatus(resp); + currentGeneratorFuture = generator.startGeneration(); + parseGenerationStatus(resp, generator); } catch (Exception e) { LOGGER.error(DataNodeMiscMessages.FAILED_GENERATE_DATA_PARTITION_TABLE, e); resp.setStatus( @@ -3470,9 +3473,13 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb try { // To resolve this situation that the DataNode is registered and didn't request // generateDataPartitionTable interface yet. - if (currentGeneratorFuture == null || currentGenerator == null) { + CompletableFuture generatorFuture = currentGeneratorFuture; + DataPartitionTableGenerator generator = currentGenerator; + if (generatorFuture == null || generator == null) { generateDataPartitionTable(req); - if (currentGeneratorFuture == null || currentGenerator == null) { + generatorFuture = currentGeneratorFuture; + generator = currentGenerator; + if (generatorFuture == null || generator == null) { resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); resp.setMessage(DataNodeMiscMessages.NO_DATA_PARTITION_TABLE_GENERATION_TASK_FOUND); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); @@ -3480,15 +3487,20 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb } } - currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + try { + generatorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + parseGenerationStatus(resp, generator); + return resp; + } - parseGenerationStatus(resp); - if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + parseGenerationStatus(resp, generator); + if (generator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { boolean success = false; List databaseScopedDataPartitionTableList = new ArrayList<>(); Map dataPartitionTableMap = - currentGenerator.getDatabasePartitionTableMap(); + generator.getDatabasePartitionTableMap(); if (!dataPartitionTableMap.isEmpty()) { for (Map.Entry entry : dataPartitionTableMap.entrySet()) { String database = entry.getKey(); @@ -3522,30 +3534,73 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb return resp; } - private void parseGenerationStatus(Object resp) { - if (currentGenerator == null) { - return; + @Override + public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() { + TGetDataPartitionTableGeneratorProgressResp resp = + new TGetDataPartitionTableGeneratorProgressResp(); + final DataPartitionTableGenerator generator = currentGenerator; + + if (generator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setProgress(0.0); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + return resp; + } + + switch (generator.getStatus()) { + case IN_PROGRESS: + resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setProgress(generator.getProgress()); + resp.setMessage( + String.format( + "DataPartitionTable generation in progress: %.1f%%", + generator.getProgress() * 100)); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + break; + case COMPLETED: + resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode()); + resp.setProgress(1.0); + resp.setMessage("DataPartitionTable generation completed successfully"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + break; + case FAILED: + resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode()); + resp.setProgress(generator.getProgress()); + resp.setMessage("DataPartitionTable generation failed: " + generator.getErrorMessage()); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + break; + default: + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setProgress(generator.getProgress()); + resp.setMessage("Unknown task status: " + generator.getStatus()); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + break; } + return resp; + } - switch (currentGenerator.getStatus()) { + private void parseGenerationStatus(Object resp, DataPartitionTableGenerator generator) { + switch (generator.getStatus()) { case IN_PROGRESS: setResponseFields( resp, DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format( - "DataPartitionTable generation in progress: %.1f%%", - currentGenerator.getProgress() * 100), + "DataPartitionTable generation in progress: %.1f%%", generator.getProgress() * 100), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); LOGGER.info( String.format( "DataPartitionTable generation with task ID: %s in progress: %.1f%%", - currentTaskId, currentGenerator.getProgress() * 100)); + currentTaskId, generator.getProgress() * 100)); break; case COMPLETED: setResponseFields( resp, DataPartitionTableGeneratorState.SUCCESS.getCode(), "DataPartitionTable generation completed successfully", + 1.0, RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_COMPLETED, currentTaskId); break; @@ -3553,7 +3608,8 @@ private void parseGenerationStatus(Object resp) { setResponseFields( resp, DataPartitionTableGeneratorState.FAILED.getCode(), - "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage(), + "DataPartitionTable generation failed: " + generator.getErrorMessage(), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_FAILED, currentTaskId); break; @@ -3561,14 +3617,16 @@ private void parseGenerationStatus(Object resp) { setResponseFields( resp, DataPartitionTableGeneratorState.UNKNOWN.getCode(), - "Unknown task status: " + currentGenerator.getStatus(), + "Unknown task status: " + generator.getStatus(), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_FAILED, currentTaskId); break; } } - private void setResponseFields(Object resp, int errorCode, String message, TSStatus status) { + private void setResponseFields( + Object resp, int errorCode, String message, double progress, TSStatus status) { if (resp instanceof TGenerateDataPartitionTableResp) { ((TGenerateDataPartitionTableResp) resp).setErrorCode(errorCode); ((TGenerateDataPartitionTableResp) resp).setMessage(message); @@ -3576,6 +3634,7 @@ private void setResponseFields(Object resp, int errorCode, String message, TSSta } else if (resp instanceof TGenerateDataPartitionTableHeartbeatResp) { ((TGenerateDataPartitionTableHeartbeatResp) resp).setErrorCode(errorCode); ((TGenerateDataPartitionTableHeartbeatResp) resp).setMessage(message); + ((TGenerateDataPartitionTableHeartbeatResp) resp).setProgress(progress); ((TGenerateDataPartitionTableHeartbeatResp) resp).setStatus(status); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java index c2a1a9f02d191..e674199df52fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java @@ -169,6 +169,11 @@ public static DatasetHeader getShowPipeHeader() { return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true); } + public static DatasetHeader getShowRepairDataPartitionTableProgressHeader() { + return new DatasetHeader( + ColumnHeaderConstant.showRepairDataPartitionTableProgressColumnHeaders, true); + } + public static DatasetHeader getShowTopicHeader() { return new DatasetHeader(ColumnHeaderConstant.showTopicColumnHeaders, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 78a0da4dfb570..ef8a5ffcaaa63 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -110,6 +110,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetConfigurationTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetSystemStatusTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowRepairDataPartitionTableProgressTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.StartRepairDataTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.StopRepairDataTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionTask; @@ -224,6 +225,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.TestConnectionStatement; @@ -398,6 +400,13 @@ public IConfigTask visitRepairDataPartitionTable( return new RepairDataPartitionTableTask(); } + @Override + public IConfigTask visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + MPPQueryContext context) { + return new ShowRepairDataPartitionTableProgressTask(); + } + @Override public IConfigTask visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 420c3c8285f4c..d1e592010e484 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -170,6 +170,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -257,6 +258,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentUserTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowVersionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowRepairDataPartitionTableProgressTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask; @@ -1490,6 +1492,26 @@ public SettableFuture repairDataPartitionTable() { return future; } + @Override + public SettableFuture showRepairDataPartitionTableProgress() { + SettableFuture future = SettableFuture.create(); + + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TShowRepairDataPartitionTableProgressResp resp = + client.showRepairDataPartitionTableProgress(); + if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + ShowRepairDataPartitionTableProgressTask.buildTsBlock(resp, future); + } else { + future.setException(new IoTDBException(resp.getStatus())); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + + return future; + } + @Override public SettableFuture loadConfiguration(boolean onCluster) { SettableFuture future = SettableFuture.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 07f1a30fb19c9..46f82cefe28cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -160,6 +160,8 @@ SettableFuture showPipePlugins( SettableFuture repairDataPartitionTable(); + SettableFuture showRepairDataPartitionTableProgress(); + SettableFuture flush(TFlushReq tFlushReq, boolean onCluster); SettableFuture clearCache(boolean onCluster, Set options); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java new file mode 100644 index 0000000000000..d0ccbaa9f3423 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys; + +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; + +import java.util.Arrays; + +public class ShowRepairDataPartitionTableProgressTask implements IConfigTask { + + public ShowRepairDataPartitionTableProgressTask() { + // Empty constructor + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.showRepairDataPartitionTableProgress(); + } + + public static void buildTsBlock( + TShowRepairDataPartitionTableProgressResp resp, SettableFuture future) { + TsBlockBuilder builder = + new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.DOUBLE, TSDataType.TEXT)); + + builder.getTimeColumnBuilder().writeLong(0L); + builder + .getColumnBuilder(0) + .writeBinary(new Binary(resp.getState(), TSFileConfig.STRING_CHARSET)); + builder.getColumnBuilder(1).writeDouble(resp.getProgress()); + builder + .getColumnBuilder(2) + .writeBinary( + new Binary(resp.isSetMessage() ? resp.getMessage() : "", TSFileConfig.STRING_CHARSET)); + builder.declarePosition(); + + future.set( + new ConfigTaskResult( + TSStatusCode.SUCCESS_STATUS, + builder.build(), + DatasetHeaderFactory.getShowRepairDataPartitionTableProgressHeader())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index b9e2fe21c7a54..dfa32cf1a96f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -251,6 +251,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -3800,6 +3801,12 @@ public Statement visitRepairDataPartitionTable( return new RepairDataPartitionTable(); } + @Override + public Statement visitShowRepairDataPartitionTableProgress( + IoTDBSqlParser.ShowRepairDataPartitionTableProgressContext ctx) { + return new ShowRepairDataPartitionTableProgressStatement(); + } + // Stop Repair Data @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index 41f91bb461809..e3f376deba7d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -160,6 +160,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -1810,6 +1811,16 @@ public TSStatus visitRepairDataPartitionTable( AuditEventType.INTEGRITY_CHECK); } + @Override + public TSStatus visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + TreeAccessCheckContext context) { + return checkGlobalAuth( + context.setAuditLogOperation(AuditLogOperation.DDL), + PrivilegeType.SYSTEM, + AuditEventType.INTEGRITY_CHECK); + } + @Override public TSStatus visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, TreeAccessCheckContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index 88a2959f1475b..00d36a03e236f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -182,6 +182,7 @@ public enum StatementType { START_REPAIR_DATA, STOP_REPAIR_DATA, REPAIR_DATA_PARTITION_TABLE, + SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS, CREATE_TOPIC, ALTER_TOPIC, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 39af259928b6b..5f2f72f431397 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -148,6 +148,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -527,6 +528,12 @@ public R visitRepairDataPartitionTable( return visitStatement(repairDataPartitionTable, context); } + public R visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + C context) { + return visitStatement(showRepairDataPartitionTableProgressStatement, context); + } + public R visitLoadConfiguration( LoadConfigurationStatement loadConfigurationStatement, C context) { return visitStatement(loadConfigurationStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java new file mode 100644 index 0000000000000..9de4d4ccca325 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.sys; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.List; + +public class ShowRepairDataPartitionTableProgressStatement extends Statement + implements IConfigStatement { + + public ShowRepairDataPartitionTableProgressStatement() { + this.statementType = StatementType.SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } + + @Override + public QueryType getQueryType() { + return QueryType.READ; + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitShowRepairDataPartitionTableProgress(this, context); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java index febf29bea1553..398797ffca659 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java @@ -1055,6 +1055,27 @@ public void testCreateView() throws IllegalPathException { assertEquals(null, stmt.getQueryStatement()); } + @Test + public void testShowRepairDataPartitionTableProgress() { + Statement statement = + StatementGenerator.createStatement( + "SHOW REPAIR DATA PARTITION TABLE PROGRESS;", ZonedDateTime.now().getOffset()); + assertEquals(StatementType.SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS, statement.getType()); + + QueryStatement queryStatement = + (QueryStatement) + StatementGenerator.createStatement( + "SELECT progress FROM root.sg.d1;", ZonedDateTime.now().getOffset()); + assertEquals( + "progress", + queryStatement + .getSelectComponent() + .getResultColumns() + .get(0) + .getExpression() + .getExpressionString()); + } + // TODO: add more tests private void checkQueryStatement( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java new file mode 100644 index 0000000000000..7afa3cbbdc8df --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.enums; + +public enum RepairDataPartitionTableProgressState { + UNKNOWN, + IDLE, + COLLECT_EARLIEST_TIMESLOTS, + ANALYZE_MISSING_PARTITIONS, + REQUEST_PARTITION_TABLES, + REQUEST_PARTITION_TABLES_HEART_BEAT, + MERGE_PARTITION_TABLES, + WRITE_PARTITION_TABLE_TO_CONSENSUS +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index d9d6518ac0beb..0fba371b9abca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -197,6 +197,11 @@ private ColumnHeaderConstant() { public static final String REMAINING_EVENT_COUNT = "RemainingEventCount"; public static final String ESTIMATED_REMAINING_SECONDS = "EstimatedRemainingSeconds"; + // column names for show repair data partition table progress + public static final String REPAIR_DATA_PARTITION_TABLE_STATUS = "Status"; + public static final String REPAIR_DATA_PARTITION_TABLE_PROGRESS = "Progress(%)"; + public static final String REPAIR_DATA_PARTITION_TABLE_MESSAGE = "Message"; + // column names for select into public static final String SOURCE_DEVICE = "SourceDevice"; public static final String SOURCE_COLUMN = "SourceColumn"; @@ -610,6 +615,12 @@ private ColumnHeaderConstant() { new ColumnHeader(REMAINING_EVENT_COUNT, TSDataType.TEXT), new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT)); + public static final List showRepairDataPartitionTableProgressColumnHeaders = + ImmutableList.of( + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_STATUS, TSDataType.TEXT), + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_PROGRESS, TSDataType.DOUBLE), + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_MESSAGE, TSDataType.TEXT)); + public static final List showTopicColumnHeaders = ImmutableList.of( new ColumnHeader(TOPIC_NAME, TSDataType.TEXT), diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 852460fb60e8b..9329f6836b9e2 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -280,6 +280,13 @@ struct TDataPartitionTableResp { 2: optional map>>> dataPartitionTable } +struct TShowRepairDataPartitionTableProgressResp { + 1: required common.TSStatus status + 2: required string state + 3: required double progress + 4: optional string message +} + struct TGetRegionIdReq { 1: required common.TConsensusGroupType type 2: optional string database @@ -1516,6 +1523,8 @@ service IConfigNodeRPCService { common.TSStatus dataPartitionTableIntegrityCheck() + TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() + // ====================================================== // Authorize // ====================================================== diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 11de2e4cba232..aaa5c0a8627a0 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -749,6 +749,14 @@ struct TGenerateDataPartitionTableHeartbeatResp { 2: required i32 errorCode 3: optional string message 4: optional list databaseScopedDataPartitionTables + 5: optional double progress +} + +struct TGetDataPartitionTableGeneratorProgressResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: required double progress + 4: optional string message } /** @@ -1405,6 +1413,11 @@ service IDataNodeRPCService { */ TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req) + /** + * Get the progress of DataPartitionTable generation task without consuming the generated table. + */ + TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() + /** * END: Data Partition Table Integrity Check **/