From e634a25dd9156292e52ddfb40c4c6f337e454fd0 Mon Sep 17 00:00:00 2001 From: libo Date: Mon, 11 May 2026 18:37:48 +0800 Subject: [PATCH 1/5] Display data partition repair progress by providing a new SQL statement --- .../apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +- .../org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 6 +- .../client/sync/CnToDnSyncRequestType.java | 1 + .../client/sync/SyncDataNodeClientPool.java | 3 + .../confignode/manager/ConfigManager.java | 12 +++ .../iotdb/confignode/manager/IManager.java | 3 + .../confignode/manager/ProcedureManager.java | 13 +++ .../manager/partition/PartitionManager.java | 13 +++ ...PartitionTableIntegrityCheckProcedure.java | 85 +++++++++++++++++++ .../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++ .../db/protocol/client/ConfigNodeClient.java | 9 ++ .../impl/DataNodeInternalRPCServiceImpl.java | 47 ++++++++++ .../common/header/DatasetHeaderFactory.java | 5 ++ .../config/TreeConfigTaskVisitor.java | 9 ++ .../executor/ClusterConfigTaskExecutor.java | 22 +++++ .../config/executor/IConfigTaskExecutor.java | 2 + ...wRepairDataPartitionTableProgressTask.java | 71 ++++++++++++++++ .../queryengine/plan/parser/ASTVisitor.java | 7 ++ .../security/TreeAccessCheckVisitor.java | 11 +++ .../plan/statement/StatementType.java | 1 + .../plan/statement/StatementVisitor.java | 7 ++ ...irDataPartitionTableProgressStatement.java | 53 ++++++++++++ .../plan/parser/StatementGeneratorTest.java | 8 ++ .../schema/column/ColumnHeaderConstant.java | 11 +++ .../src/main/thrift/confignode.thrift | 10 ++- .../src/main/thrift/datanode.thrift | 14 ++- 26 files changed, 432 insertions(+), 4 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index c6271c134cb00..34029a0176c66 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -94,7 +94,7 @@ utilityStatement | showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser - | repairDataPartitionTable + | repairDataPartitionTable | showRepairDataPartitionTableProgress ; /** @@ -1244,6 +1244,11 @@ repairDataPartitionTable : REPAIR DATA PARTITION TABLE ; +// Show Repair Data Partition Table Progress +showRepairDataPartitionTableProgress + : SHOW REPAIR DATA PARTITION TABLE PROGRESS + ; + // Explain explain : EXPLAIN (ANALYZE VERBOSE?)? selectStatement? diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 index 5c5cbe4a186a9..f2cf1fcb31e0d 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 @@ -1203,6 +1203,10 @@ REPAIR : R E P A I R ; +PROGRESS + : P R O G R E S S + ; + SCHEMA_REPLICATION_FACTOR : S C H E M A '_' R E P L I C A T I O N '_' F A C T O R ; @@ -1399,4 +1403,4 @@ fragment V: [vV]; fragment W: [wW]; fragment X: [xX]; fragment Y: [yY]; -fragment Z: [zZ]; \ No newline at end of file +fragment Z: [zZ]; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java index 790fd637d616a..ca7fc7a37b2af 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java @@ -41,6 +41,7 @@ public enum CnToDnSyncRequestType { COLLECT_EARLIEST_TIMESLOTS, GENERATE_DATA_PARTITION_TABLE, GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, // PartitionCache INVALIDATE_PARTITION_CACHE, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index b9cf775459c2e..5a06719e7b4ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -150,6 +150,9 @@ private void buildActionMap() { CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, (req, client) -> client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req)); + actionMapBuilder.put( + CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, + (req, client) -> client.getDataPartitionTableGeneratorProgress()); actionMap = actionMapBuilder.build(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 34533362d44e6..57e7499239d32 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -234,6 +234,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp; @@ -1167,6 +1168,17 @@ public TSStatus dataPartitionTableIntegrityCheck() { return partitionManager.dataPartitionTableIntegrityCheck(); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + TSStatus status = confirmLeader(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return new TShowRepairDataPartitionTableProgressResp(status, "UNKNOWN", 0.0) + .setMessage(status.getMessage()); + } + + return partitionManager.showRepairDataPartitionTableProgress(); + } + private void printNewCreatedDataPartition( GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) { final String lineSeparator = System.lineSeparator(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 4dce39a9e98f0..f4bd98a1643b8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -152,6 +152,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp; @@ -479,6 +480,8 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus dataPartitionTableIntegrityCheck(); + TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress(); + /** * Get AuditLogger. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 3de0f4247d808..1c90036b76537 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -69,6 +69,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -2340,6 +2341,18 @@ public boolean isExistUnfinishedProcedure( return false; } + public Optional + getUnfinishedDataPartitionTableIntegrityCheckProcedure() { + for (Procedure procedure : getExecutor().getProcedures().values()) { + if (!procedure.isFinished() + && procedure instanceof DataPartitionTableIntegrityCheckProcedure) { + return Optional.of((DataPartitionTableIntegrityCheckProcedure) procedure); + } + } + + return Optional.empty(); + } + // ====================================================== /* GET-SET Region diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index da7a19cfdb5aa..77197c55832a4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -89,6 +89,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; @@ -541,6 +542,18 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() { dataPartitionTableIntegrityCheckProcedureRunning.set(false); } + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + return configManager + .getProcedureManager() + .getUnfinishedDataPartitionTableIntegrityCheckProcedure() + .map(procedure -> procedure.getProgress(configManager.getProcedureManager().getEnv())) + .orElseGet( + () -> + new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0) + .setMessage("No running DataPartitionTable integrity check procedure")); + } + private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { TSStatus status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index f3d539576d4ba..f25f7abfe8d4b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -41,11 +41,14 @@ import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -1098,4 +1101,86 @@ public void setSkipDataNodes(Set skipDataNodes) { public void setFailedDataNodes(Set failedDataNodes) { this.failedDataNodes = failedDataNodes; } + + public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) { + final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); + final String state = currentState == null ? "UNKNOWN" : currentState.name(); + final double progress = + currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100; + + return new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) + .setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); + } + + private double calculateProgressByState( + final ConfigNodeProcedureEnv env, + final DataPartitionTableIntegrityCheckProcedureState currentState) { + switch (currentState) { + case COLLECT_EARLIEST_TIMESLOTS: + return 0.0; + case ANALYZE_MISSING_PARTITIONS: + return 0.05; + case REQUEST_PARTITION_TABLES: + return 0.1; + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(env); + case MERGE_PARTITION_TABLES: + return 0.95; + case WRITE_PARTITION_TABLE_TO_CONSENSUS: + return 0.99; + default: + return 0.0; + } + } + + private double calculateDataNodeGeneratorProgress(final ConfigNodeProcedureEnv env) { + final LoadManager currentLoadManager = + loadManager == null ? env.getConfigManager().getLoadManager() : loadManager; + + final Set targetDataNodes = new HashSet<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + if (targetDataNodes.isEmpty()) { + return dataPartitionTables.isEmpty() ? 0.0 : 1.0; + } + + double progressSum = 0.0; + for (TDataNodeConfiguration dataNode : targetDataNodes) { + final int dataNodeId = dataNode.getLocation().getDataNodeId(); + if (dataPartitionTables.containsKey(dataNodeId) + || failedDataNodes.contains(dataNode) + || !NodeStatus.Running.equals(currentLoadManager.getNodeStatus(dataNodeId))) { + progressSum += 1.0; + continue; + } + + try { + Object response = + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, + MAX_RETRY_COUNT); + if (response instanceof TGetDataPartitionTableGeneratorProgressResp) { + TGetDataPartitionTableGeneratorProgressResp resp = + (TGetDataPartitionTableGeneratorProgressResp) response; + DataPartitionTableGeneratorState state = + DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); + if (state == DataPartitionTableGeneratorState.SUCCESS) { + progressSum += 1.0; + } else if (state == DataPartitionTableGeneratorState.IN_PROGRESS) { + progressSum += Math.max(0.0, Math.min(1.0, resp.getProgress())); + } + } + } catch (Exception e) { + LOG.warn( + "[DataPartitionIntegrity] Failed to get DataPartitionTable generation progress from DataNode[id={}]: {}", + dataNodeId, + e.getMessage()); + } + } + + return progressSum / targetDataNodes.size(); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 4dac0ea8e34ec..a34cdd45d4bce 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -209,6 +209,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -631,6 +632,11 @@ public TSStatus dataPartitionTableIntegrityCheck() { return configManager.dataPartitionTableIntegrityCheck(); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { + return configManager.showRepairDataPartitionTableProgress(); + } + @Override public TSStatus operatePermission(final TAuthorizerReq req) { ConfigPhysicalPlanType configPhysicalPlanType = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 613fbcf5d95d1..59c381f3feef4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -167,6 +167,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -708,6 +709,14 @@ public TSStatus dataPartitionTableIntegrityCheck() throws TException { () -> client.dataPartitionTableIntegrityCheck(), status -> !updateConfigNodeLeader(status)); } + @Override + public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() + throws TException { + return executeRemoteCallWithRetry( + () -> client.showRepairDataPartitionTableProgress(), + resp -> !updateConfigNodeLeader(resp.status)); + } + @Override public TSStatus operatePermission(TAuthorizerReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 436e78906df3e..48f84fc26f44b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -274,6 +274,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; @@ -3397,6 +3398,52 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb return resp; } + @Override + public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() { + TGetDataPartitionTableGeneratorProgressResp resp = + new TGetDataPartitionTableGeneratorProgressResp(); + + if (currentGenerator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setProgress(0.0); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + return resp; + } + + switch (currentGenerator.getStatus()) { + case IN_PROGRESS: + resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setProgress(currentGenerator.getProgress()); + resp.setMessage( + String.format( + "DataPartitionTable generation in progress: %.1f%%", + currentGenerator.getProgress() * 100)); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + break; + case COMPLETED: + resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode()); + resp.setProgress(1.0); + resp.setMessage("DataPartitionTable generation completed successfully"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + break; + case FAILED: + resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode()); + resp.setProgress(currentGenerator.getProgress()); + resp.setMessage( + "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage()); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + break; + default: + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setProgress(currentGenerator.getProgress()); + resp.setMessage("Unknown task status: " + currentGenerator.getStatus()); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + break; + } + return resp; + } + private void parseGenerationStatus(Object resp) { if (currentGenerator == null) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java index 18f15eea8f397..618158d74db22 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/DatasetHeaderFactory.java @@ -169,6 +169,11 @@ public static DatasetHeader getShowPipeHeader() { return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true); } + public static DatasetHeader getShowRepairDataPartitionTableProgressHeader() { + return new DatasetHeader( + ColumnHeaderConstant.showRepairDataPartitionTableProgressColumnHeaders, true); + } + public static DatasetHeader getShowTopicHeader() { return new DatasetHeader(ColumnHeaderConstant.showTopicColumnHeaders, true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index de090b31f4892..fb20494ac5b98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -109,6 +109,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetConfigurationTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetSystemStatusTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowRepairDataPartitionTableProgressTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.StartRepairDataTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.StopRepairDataTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionTask; @@ -221,6 +222,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowConfigurationStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentSqlDialectStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.TestConnectionStatement; @@ -392,6 +394,13 @@ public IConfigTask visitRepairDataPartitionTable( return new RepairDataPartitionTableTask(); } + @Override + public IConfigTask visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + MPPQueryContext context) { + return new ShowRepairDataPartitionTableProgressTask(); + } + @Override public IConfigTask visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index ab823966de8e7..f10d5cc4ea788 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -164,6 +164,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp; import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp; @@ -245,6 +246,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentUserTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowVersionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowRepairDataPartitionTableProgressTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask; import org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask; @@ -1471,6 +1473,26 @@ public SettableFuture repairDataPartitionTable() { return future; } + @Override + public SettableFuture showRepairDataPartitionTableProgress() { + SettableFuture future = SettableFuture.create(); + + try (ConfigNodeClient client = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TShowRepairDataPartitionTableProgressResp resp = + client.showRepairDataPartitionTableProgress(); + if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + ShowRepairDataPartitionTableProgressTask.buildTsBlock(resp, future); + } else { + future.setException(new IoTDBException(resp.getStatus())); + } + } catch (ClientManagerException | TException e) { + future.setException(e); + } + + return future; + } + @Override public SettableFuture loadConfiguration(boolean onCluster) { SettableFuture future = SettableFuture.create(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index b4b928ba0b617..cdd00cc7c7093 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -158,6 +158,8 @@ SettableFuture showPipePlugins( SettableFuture repairDataPartitionTable(); + SettableFuture showRepairDataPartitionTableProgress(); + SettableFuture flush(TFlushReq tFlushReq, boolean onCluster); SettableFuture clearCache(boolean onCluster, Set options); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java new file mode 100644 index 0000000000000..64b929be4a311 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys; + +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; +import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.Binary; + +import java.util.Arrays; + +public class ShowRepairDataPartitionTableProgressTask implements IConfigTask { + + public ShowRepairDataPartitionTableProgressTask() { + // Empty constructor + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + return configTaskExecutor.showRepairDataPartitionTableProgress(); + } + + public static void buildTsBlock( + TShowRepairDataPartitionTableProgressResp resp, SettableFuture future) { + TsBlockBuilder builder = + new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.DOUBLE, TSDataType.TEXT)); + + builder.getTimeColumnBuilder().writeLong(0L); + builder + .getColumnBuilder(0) + .writeBinary(new Binary(resp.getState(), TSFileConfig.STRING_CHARSET)); + builder.getColumnBuilder(1).writeDouble(resp.getProgress()); + builder + .getColumnBuilder(2) + .writeBinary(new Binary(resp.getMessage(), TSFileConfig.STRING_CHARSET)); + builder.declarePosition(); + + future.set( + new ConfigTaskResult( + TSStatusCode.SUCCESS_STATUS, + builder.build(), + DatasetHeaderFactory.getShowRepairDataPartitionTableProgressHeader())); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 28e935d5047a8..553a7b3c68626 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -249,6 +249,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -3765,6 +3766,12 @@ public Statement visitRepairDataPartitionTable( return new RepairDataPartitionTable(); } + @Override + public Statement visitShowRepairDataPartitionTableProgress( + IoTDBSqlParser.ShowRepairDataPartitionTableProgressContext ctx) { + return new ShowRepairDataPartitionTableProgressStatement(); + } + // Stop Repair Data @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java index 7b4655c79a84c..4c0966907dec5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java @@ -156,6 +156,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -1692,6 +1693,16 @@ public TSStatus visitRepairDataPartitionTable( AuditEventType.INTEGRITY_CHECK); } + @Override + public TSStatus visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + TreeAccessCheckContext context) { + return checkGlobalAuth( + context.setAuditLogOperation(AuditLogOperation.DDL), + PrivilegeType.SYSTEM, + AuditEventType.INTEGRITY_CHECK); + } + @Override public TSStatus visitStopRepairData( StopRepairDataStatement stopRepairDataStatement, TreeAccessCheckContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index b40c6444816fc..8003d23030975 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -182,6 +182,7 @@ public enum StatementType { START_REPAIR_DATA, STOP_REPAIR_DATA, REPAIR_DATA_PARTITION_TABLE, + SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS, CREATE_TOPIC, DROP_TOPIC, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index 847e850c52172..ada88e151e4a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -147,6 +147,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowCurrentUserStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowDiskUsageStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; +import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowRepairDataPartitionTableProgressStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StartRepairDataStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.StopRepairDataStatement; @@ -526,6 +527,12 @@ public R visitRepairDataPartitionTable( return visitStatement(repairDataPartitionTable, context); } + public R visitShowRepairDataPartitionTableProgress( + ShowRepairDataPartitionTableProgressStatement showRepairDataPartitionTableProgressStatement, + C context) { + return visitStatement(showRepairDataPartitionTableProgressStatement, context); + } + public R visitLoadConfiguration( LoadConfigurationStatement loadConfigurationStatement, C context) { return visitStatement(loadConfigurationStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java new file mode 100644 index 0000000000000..9de4d4ccca325 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/ShowRepairDataPartitionTableProgressStatement.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.sys; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Collections; +import java.util.List; + +public class ShowRepairDataPartitionTableProgressStatement extends Statement + implements IConfigStatement { + + public ShowRepairDataPartitionTableProgressStatement() { + this.statementType = StatementType.SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS; + } + + @Override + public List getPaths() { + return Collections.emptyList(); + } + + @Override + public QueryType getQueryType() { + return QueryType.READ; + } + + @Override + public R accept(StatementVisitor visitor, C context) { + return visitor.visitShowRepairDataPartitionTableProgress(this, context); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java index b98f34a2484d9..3aeb8c3fa4b21 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java @@ -1040,6 +1040,14 @@ public void testCreateView() throws IllegalPathException { assertEquals(null, stmt.getQueryStatement()); } + @Test + public void testShowRepairDataPartitionTableProgress() { + Statement statement = + StatementGenerator.createStatement( + "SHOW REPAIR DATA PARTITION TABLE PROGRESS;", ZonedDateTime.now().getOffset()); + assertEquals(StatementType.SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS, statement.getType()); + } + // TODO: add more tests private void checkQueryStatement( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 186f7daa6846d..dad585e7c6d97 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -197,6 +197,11 @@ private ColumnHeaderConstant() { public static final String REMAINING_EVENT_COUNT = "RemainingEventCount"; public static final String ESTIMATED_REMAINING_SECONDS = "EstimatedRemainingSeconds"; + // column names for show repair data partition table progress + public static final String REPAIR_DATA_PARTITION_TABLE_STATUS = "Status"; + public static final String REPAIR_DATA_PARTITION_TABLE_PROGRESS = "Progress(%)"; + public static final String REPAIR_DATA_PARTITION_TABLE_MESSAGE = "Message"; + // column names for select into public static final String SOURCE_DEVICE = "SourceDevice"; public static final String SOURCE_COLUMN = "SourceColumn"; @@ -606,6 +611,12 @@ private ColumnHeaderConstant() { new ColumnHeader(REMAINING_EVENT_COUNT, TSDataType.TEXT), new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT)); + public static final List showRepairDataPartitionTableProgressColumnHeaders = + ImmutableList.of( + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_STATUS, TSDataType.TEXT), + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_PROGRESS, TSDataType.DOUBLE), + new ColumnHeader(REPAIR_DATA_PARTITION_TABLE_MESSAGE, TSDataType.TEXT)); + public static final List showTopicColumnHeaders = ImmutableList.of( new ColumnHeader(TOPIC_NAME, TSDataType.TEXT), diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb737a..ab6e5523a8923 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -274,6 +274,13 @@ struct TDataPartitionTableResp { 2: optional map>>> dataPartitionTable } +struct TShowRepairDataPartitionTableProgressResp { + 1: required common.TSStatus status + 2: required string state + 3: required double progress + 4: optional string message +} + struct TGetRegionIdReq { 1: required common.TConsensusGroupType type 2: optional string database @@ -1500,6 +1507,8 @@ service IConfigNodeRPCService { common.TSStatus dataPartitionTableIntegrityCheck() + TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() + // ====================================================== // Authorize // ====================================================== @@ -2071,4 +2080,3 @@ service IConfigNodeRPCService { common.TSStatus createTableView(TCreateTableViewReq req) } - diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 92a7602b34dee..8570478e6b0e9 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -709,6 +709,13 @@ struct TGenerateDataPartitionTableHeartbeatResp { 4: optional list databaseScopedDataPartitionTables } +struct TGetDataPartitionTableGeneratorProgressResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: required double progress + 4: optional string message +} + /** * END: Data Partition Table Integrity Check Structures **/ @@ -1343,6 +1350,11 @@ service IDataNodeRPCService { */ TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req) + /** + * Get the progress of DataPartitionTable generation task without consuming the generated table. + */ + TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() + /** * END: Data Partition Table Integrity Check **/ @@ -1361,4 +1373,4 @@ service MPPDataExchangeService { /** Empty rpc, only for connection test */ common.TSStatus testConnectionEmptyRPC() -} \ No newline at end of file +} From 0a60f24c5aacaa719081ea32c992b3ec5c28636e Mon Sep 17 00:00:00 2001 From: libo Date: Thu, 25 Jun 2026 19:44:04 +0800 Subject: [PATCH 2/5] Fix issues related to review comments --- ...rtitionTableIntegrityCheckProcedureIT.java | 36 ++++ .../iotdb/db/qp/sql/IdentifierParser.g4 | 3 +- .../confignode/manager/ConfigManager.java | 4 +- .../manager/partition/PartitionManager.java | 7 +- ...PartitionTableIntegrityCheckProcedure.java | 160 +++++++++++------- ...itionTableIntegrityCheckProcedureTest.java | 13 ++ .../impl/DataNodeInternalRPCServiceImpl.java | 78 +++++---- .../plan/parser/StatementGeneratorTest.java | 13 ++ ...RepairDataPartitionTableProgressState.java | 31 ++++ .../src/main/thrift/datanode.thrift | 1 + 10 files changed, 244 insertions(+), 102 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java index e93856cacd1e5..da54c39479e0d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.it.partition; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -128,4 +130,38 @@ public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure() Assert.assertEquals( "The other concurrent submissions should be rejected", threadCount - 1, failCount.get()); } + + @Test + public void testShowRepairDataPartitionTableProgress() throws Exception { + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + assertRepairProgress(statement, RepairDataPartitionTableProgressState.IDLE.name(), 0.0, 0.0); + + statement.execute("REPAIR DATA PARTITION TABLE"); + assertRepairProgress(statement, null, 0.0, 100.0); + } + } + + private static void assertRepairProgress( + final Statement statement, + final String expectedStatus, + final double minProgress, + final double maxProgress) + throws SQLException { + try (final ResultSet resultSet = + statement.executeQuery("SHOW REPAIR DATA PARTITION TABLE PROGRESS")) { + Assert.assertTrue(resultSet.next()); + if (expectedStatus != null) { + Assert.assertEquals(expectedStatus, resultSet.getString("Status")); + } else { + Assert.assertNotEquals( + RepairDataPartitionTableProgressState.UNKNOWN.name(), resultSet.getString("Status")); + } + final double progress = resultSet.getDouble("Progress(%)"); + Assert.assertTrue(progress >= minProgress); + Assert.assertTrue(progress <= maxProgress); + Assert.assertNotNull(resultSet.getString("Message")); + Assert.assertFalse(resultSet.next()); + } + } } diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index 54d53bab67431..cfdcb0c7cf9d5 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -189,6 +189,7 @@ keyWords | PRIVILEGES | PRIVILEGE_VALUE | PROCESSLIST + | PROGRESS | PROCESSOR | PROPERTY | PRUNE @@ -298,4 +299,4 @@ keyWords | OPTION | INF | CURRENT_TIMESTAMP - ; \ No newline at end of file + ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 868d1f41dbdb5..aa7f993c17e09 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -47,6 +47,7 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; @@ -1185,7 +1186,8 @@ public TSStatus dataPartitionTableIntegrityCheck() { public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { TSStatus status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return new TShowRepairDataPartitionTableProgressResp(status, "UNKNOWN", 0.0) + return new TShowRepairDataPartitionTableProgressResp( + status, RepairDataPartitionTableProgressState.UNKNOWN.name(), 0.0) .setMessage(status.getMessage()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 9b9b2e68ab228..bfb743f81fee0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -548,11 +549,13 @@ public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTablePro return configManager .getProcedureManager() .getUnfinishedDataPartitionTableIntegrityCheckProcedure() - .map(procedure -> procedure.getProgress(configManager.getProcedureManager().getEnv())) + .map(DataPartitionTableIntegrityCheckProcedure::getProgress) .orElseGet( () -> new TShowRepairDataPartitionTableProgressResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0) + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RepairDataPartitionTableProgressState.IDLE.name(), + 0.0) .setMessage("No running DataPartitionTable integrity check procedure")); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index c181ed62aba12..50f13039cc624 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; @@ -47,7 +48,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; -import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp; import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -131,6 +131,9 @@ public class DataPartitionTableIntegrityCheckProcedure // ============Need serialize END=============/ + private final Map dataNodeGeneratorProgress = new ConcurrentHashMap<>(); + private volatile Set dataNodeGeneratorTargetDataNodeIds = Collections.emptySet(); + public DataPartitionTableIntegrityCheckProcedure() { super(); } @@ -152,7 +155,7 @@ protected Flow executeFromState( // Ensure to get the real-time DataNodes in the current cluster at every step dataNodeManager = env.getConfigManager().getNodeManager(); loadManager = env.getConfigManager().getLoadManager(); - allDataNodes = dataNodeManager.getRegisteredDataNodes(); + allDataNodes = new ArrayList<>(dataNodeManager.getRegisteredDataNodes()); switch (state) { case COLLECT_EARLIEST_TIMESLOTS: @@ -256,8 +259,9 @@ private Flow collectEarliestTimeslots() { } // Collect earliest timeslots from all DataNodes - allDataNodes.removeAll(skipDataNodes); - for (TDataNodeConfiguration dataNode : allDataNodes) { + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : targetDataNodes) { // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNode.getLocation().getDataNodeId()); if (!NodeStatus.Running.equals(nodeStatus)) { @@ -318,12 +322,12 @@ private Flow collectEarliestTimeslots() { if (LOG.isDebugEnabled()) { LOG.debug( "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", - allDataNodes.size(), + targetDataNodes.size(), earliestTimeslots, - allDataNodes.size() - failedDataNodes.size()); + targetDataNodes.size() - failedDataNodes.size()); } - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); } else { @@ -457,18 +461,22 @@ private Flow requestPartitionTables() { return Flow.HAS_MORE_STATE; } - allDataNodes.removeAll(skipDataNodes); - allDataNodes.removeAll(failedDataNodes); - for (TDataNodeConfiguration dataNode : allDataNodes) { + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + targetDataNodes.removeAll(failedDataNodes); + refreshDataNodeGeneratorTarget(targetDataNodes); + for (TDataNodeConfiguration dataNode : targetDataNodes) { int dataNodeId = dataNode.getLocation().getDataNodeId(); // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId); if (!NodeStatus.Running.equals(nodeStatus)) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); continue; } if (!dataPartitionTables.containsKey(dataNodeId)) { + dataNodeGeneratorProgress.put(dataNodeId, 0.0); try { TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq(); req.setDatabases(databasesWithLostDataPartition); @@ -482,6 +490,7 @@ private Flow requestPartitionTables() { if (response instanceof TSStatus) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], already out of max retry time", dataNode.getLocation().getDataNodeId()); @@ -491,6 +500,7 @@ private Flow requestPartitionTables() { TGenerateDataPartitionTableResp resp = (TGenerateDataPartitionTableResp) response; if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], response status is {}", dataNode.getLocation().getDataNodeId(), @@ -498,6 +508,7 @@ private Flow requestPartitionTables() { } } catch (Exception e) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from DataNode[id={}]: {}", dataNodeId, @@ -507,7 +518,7 @@ private Flow requestPartitionTables() { } } - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); return Flow.HAS_MORE_STATE; @@ -523,13 +534,19 @@ private Flow requestPartitionTablesHeartBeat() { LOG.debug(ProcedureMessages.CHECKING_DATAPARTITIONTABLE_GENERATION_COMPLETION_STATUS); } + final List targetDataNodes = new ArrayList<>(allDataNodes); + targetDataNodes.removeAll(skipDataNodes); + targetDataNodes.removeAll(failedDataNodes); + refreshDataNodeGeneratorTarget(targetDataNodes); + int completeCount = 0; - for (TDataNodeConfiguration dataNode : allDataNodes) { + for (TDataNodeConfiguration dataNode : targetDataNodes) { int dataNodeId = dataNode.getLocation().getDataNodeId(); // Check if DataNode is alive before sending request NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId); if (!NodeStatus.Running.equals(nodeStatus)) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); continue; } @@ -547,6 +564,7 @@ private Flow requestPartitionTablesHeartBeat() { if (response instanceof TSStatus) { failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], already out of max retry time", dataNode.getLocation().getDataNodeId()); @@ -573,18 +591,21 @@ private Flow requestPartitionTablesHeartBeat() { List databaseScopedDataPartitionTableList = deserializeDatabaseScopedTableList(byteBufferList); dataPartitionTables.put(dataNodeId, databaseScopedDataPartitionTableList); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.info( "[DataPartitionIntegrity] DataNode {} completed DataPartitionTable generation, terminating heart beat", dataNodeId); completeCount++; break; case IN_PROGRESS: + dataNodeGeneratorProgress.put(dataNodeId, clampProgress(resp.getProgress())); LOG.info( "[DataPartitionIntegrity] DataNode {} still generating DataPartitionTable", dataNodeId); break; default: failedDataNodes.add(dataNode); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); LOG.error( "[DataPartitionIntegrity] DataNode {} returned unknown error code: {}", dataNodeId, @@ -597,21 +618,23 @@ private Flow requestPartitionTablesHeartBeat() { dataNodeId, e.getMessage(), e); + dataNodeGeneratorProgress.put(dataNodeId, 1.0); completeCount++; } } else { + dataNodeGeneratorProgress.put(dataNodeId, 1.0); completeCount++; } } - if (completeCount >= allDataNodes.size()) { + if (completeCount >= targetDataNodes.size()) { setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES); return Flow.HAS_MORE_STATE; } // Don't find any one data partition table generation task on all registered DataNodes, go back // to the REQUEST_PARTITION_TABLES step and re-execute - if (failedDataNodes.size() == allDataNodes.size()) { + if (countFailedTargetDataNodes(targetDataNodes) == targetDataNodes.size()) { delayRollbackNextState( DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); return Flow.HAS_MORE_STATE; @@ -1118,19 +1141,28 @@ public void setFailedDataNodes(Set failedDataNodes) { this.failedDataNodes = failedDataNodes; } - public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) { - final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); - final String state = currentState == null ? "UNKNOWN" : currentState.name(); - final double progress = - currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100; - - return new TShowRepairDataPartitionTableProgressResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) - .setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); + public TShowRepairDataPartitionTableProgressResp getProgress() { + try { + final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); + final String state = getProgressStateName(currentState); + final double progress = + currentState == null ? 0.0 : calculateProgressByState(currentState) * 100; + + return new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) + .setMessage( + String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); + } catch (Exception e) { + LOG.warn("Failed to show DataPartitionTable integrity check progress", e); + return new TShowRepairDataPartitionTableProgressResp( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RepairDataPartitionTableProgressState.UNKNOWN.name(), + 0.0) + .setMessage("Failed to show DataPartitionTable integrity check progress"); + } } private double calculateProgressByState( - final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState currentState) { switch (currentState) { case COLLECT_EARLIEST_TIMESLOTS: @@ -1140,63 +1172,61 @@ private double calculateProgressByState( case REQUEST_PARTITION_TABLES: return 0.1; case REQUEST_PARTITION_TABLES_HEART_BEAT: - return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(env); + return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(); case MERGE_PARTITION_TABLES: return 0.95; case WRITE_PARTITION_TABLE_TO_CONSENSUS: return 0.99; default: + LOG.warn( + "Encountered unexpected DataPartitionTableIntegrityCheckProcedureState {} when showing progress", + currentState); return 0.0; } } - private double calculateDataNodeGeneratorProgress(final ConfigNodeProcedureEnv env) { - final LoadManager currentLoadManager = - loadManager == null ? env.getConfigManager().getLoadManager() : loadManager; + private String getProgressStateName( + final DataPartitionTableIntegrityCheckProcedureState currentState) { + if (currentState == null) { + return RepairDataPartitionTableProgressState.UNKNOWN.name(); + } + try { + return RepairDataPartitionTableProgressState.valueOf(currentState.name()).name(); + } catch (IllegalArgumentException e) { + LOG.warn( + "Unexpected DataPartitionTableIntegrityCheckProcedureState {} when showing progress", + currentState); + return RepairDataPartitionTableProgressState.UNKNOWN.name(); + } + } - final Set targetDataNodes = new HashSet<>(allDataNodes); - targetDataNodes.removeAll(skipDataNodes); - if (targetDataNodes.isEmpty()) { + private double calculateDataNodeGeneratorProgress() { + final Set currentTargetDataNodeIds = dataNodeGeneratorTargetDataNodeIds; + if (currentTargetDataNodeIds.isEmpty()) { return dataPartitionTables.isEmpty() ? 0.0 : 1.0; } double progressSum = 0.0; - for (TDataNodeConfiguration dataNode : targetDataNodes) { - final int dataNodeId = dataNode.getLocation().getDataNodeId(); - if (dataPartitionTables.containsKey(dataNodeId) - || failedDataNodes.contains(dataNode) - || !NodeStatus.Running.equals(currentLoadManager.getNodeStatus(dataNodeId))) { - progressSum += 1.0; - continue; - } + for (int dataNodeId : currentTargetDataNodeIds) { + progressSum += clampProgress(dataNodeGeneratorProgress.getOrDefault(dataNodeId, 0.0)); + } + return clampProgress(progressSum / currentTargetDataNodeIds.size()); + } - try { - Object response = - SyncDataNodeClientPool.getInstance() - .sendSyncRequestToDataNodeWithGivenRetry( - dataNode.getLocation().getInternalEndPoint(), - null, - CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS, - MAX_RETRY_COUNT); - if (response instanceof TGetDataPartitionTableGeneratorProgressResp) { - TGetDataPartitionTableGeneratorProgressResp resp = - (TGetDataPartitionTableGeneratorProgressResp) response; - DataPartitionTableGeneratorState state = - DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); - if (state == DataPartitionTableGeneratorState.SUCCESS) { - progressSum += 1.0; - } else if (state == DataPartitionTableGeneratorState.IN_PROGRESS) { - progressSum += Math.max(0.0, Math.min(1.0, resp.getProgress())); - } - } - } catch (Exception e) { - LOG.warn( - "[DataPartitionIntegrity] Failed to get DataPartitionTable generation progress from DataNode[id={}]: {}", - dataNodeId, - e.getMessage()); - } + private void refreshDataNodeGeneratorTarget(final List targetDataNodes) { + final Set targetDataNodeIds = new HashSet<>(); + for (TDataNodeConfiguration dataNode : targetDataNodes) { + targetDataNodeIds.add(dataNode.getLocation().getDataNodeId()); } + dataNodeGeneratorProgress.keySet().retainAll(targetDataNodeIds); + dataNodeGeneratorTargetDataNodeIds = Collections.unmodifiableSet(targetDataNodeIds); + } + + private long countFailedTargetDataNodes(final List targetDataNodes) { + return targetDataNodes.stream().filter(failedDataNodes::contains).count(); + } - return progressSum / targetDataNodes.size(); + private double clampProgress(final double progress) { + return Math.max(0.0, Math.min(1.0, progress)); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java index a0cc76fe5c9b1..4f2fcfe27519d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedureTest.java @@ -23,10 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeResource; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Assert; @@ -69,6 +71,17 @@ public void serDeTest() throws IOException { } } + @Test + public void progressStateTest() { + DataPartitionTableIntegrityCheckProcedure procedure = + new DataPartitionTableIntegrityCheckProcedure(); + TShowRepairDataPartitionTableProgressResp progress = procedure.getProgress(); + Assert.assertEquals( + RepairDataPartitionTableProgressState.COLLECT_EARLIEST_TIMESLOTS.name(), + progress.getState()); + Assert.assertTrue(progress.getProgress() >= 0.0 && progress.getProgress() <= 100.0); + } + private DataPartitionTableIntegrityCheckProcedure createTestProcedureWithData() { DataPartitionTableIntegrityCheckProcedure proc = new DataPartitionTableIntegrityCheckProcedure(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 0b9befed78166..61c0514df97f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -3409,13 +3409,14 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable( try { // Check if there's already a task in the progress - if (currentGenerator != null - && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + final DataPartitionTableGenerator generator = currentGenerator; + if (generator != null + && generator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); resp.setMessage( String.format( "DataPartitionTable generation is already in the progress: %.1f%%", - currentGenerator.getProgress() * 100)); + generator.getProgress() * 100)); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); return resp; } @@ -3437,17 +3438,18 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable( ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), new ThreadPoolExecutor.CallerRunsPolicy()); - currentGenerator = + final DataPartitionTableGenerator generator = new DataPartitionTableGenerator( partitionTableRecoverExecutor, req.getDatabases(), seriesSlotNum, seriesPartitionExecutorClass); + currentGenerator = generator; currentTaskId = System.currentTimeMillis(); // Start generation synchronously for now to return the data partition table immediately - currentGeneratorFuture = currentGenerator.startGeneration(); - parseGenerationStatus(resp); + currentGeneratorFuture = generator.startGeneration(); + parseGenerationStatus(resp, generator); } catch (Exception e) { LOGGER.error(DataNodeMiscMessages.FAILED_GENERATE_DATA_PARTITION_TABLE, e); resp.setStatus( @@ -3471,9 +3473,13 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb try { // To resolve this situation that the DataNode is registered and didn't request // generateDataPartitionTable interface yet. - if (currentGeneratorFuture == null || currentGenerator == null) { + CompletableFuture generatorFuture = currentGeneratorFuture; + DataPartitionTableGenerator generator = currentGenerator; + if (generatorFuture == null || generator == null) { generateDataPartitionTable(req); - if (currentGeneratorFuture == null || currentGenerator == null) { + generatorFuture = currentGeneratorFuture; + generator = currentGenerator; + if (generatorFuture == null || generator == null) { resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); resp.setMessage(DataNodeMiscMessages.NO_DATA_PARTITION_TABLE_GENERATION_TASK_FOUND); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); @@ -3481,15 +3487,20 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb } } - currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + try { + generatorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + parseGenerationStatus(resp, generator); + return resp; + } - parseGenerationStatus(resp); - if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + parseGenerationStatus(resp, generator); + if (generator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { boolean success = false; List databaseScopedDataPartitionTableList = new ArrayList<>(); Map dataPartitionTableMap = - currentGenerator.getDatabasePartitionTableMap(); + generator.getDatabasePartitionTableMap(); if (!dataPartitionTableMap.isEmpty()) { for (Map.Entry entry : dataPartitionTableMap.entrySet()) { String database = entry.getKey(); @@ -3527,8 +3538,9 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() { TGetDataPartitionTableGeneratorProgressResp resp = new TGetDataPartitionTableGeneratorProgressResp(); + final DataPartitionTableGenerator generator = currentGenerator; - if (currentGenerator == null) { + if (generator == null) { resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); resp.setProgress(0.0); resp.setMessage("No DataPartitionTable generation task found"); @@ -3536,14 +3548,14 @@ public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGenerato return resp; } - switch (currentGenerator.getStatus()) { + switch (generator.getStatus()) { case IN_PROGRESS: resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); - resp.setProgress(currentGenerator.getProgress()); + resp.setProgress(generator.getProgress()); resp.setMessage( String.format( "DataPartitionTable generation in progress: %.1f%%", - currentGenerator.getProgress() * 100)); + generator.getProgress() * 100)); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); break; case COMPLETED: @@ -3554,45 +3566,41 @@ public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGenerato break; case FAILED: resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode()); - resp.setProgress(currentGenerator.getProgress()); - resp.setMessage( - "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage()); + resp.setProgress(generator.getProgress()); + resp.setMessage("DataPartitionTable generation failed: " + generator.getErrorMessage()); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); break; default: resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); - resp.setProgress(currentGenerator.getProgress()); - resp.setMessage("Unknown task status: " + currentGenerator.getStatus()); + resp.setProgress(generator.getProgress()); + resp.setMessage("Unknown task status: " + generator.getStatus()); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); break; } return resp; } - private void parseGenerationStatus(Object resp) { - if (currentGenerator == null) { - return; - } - - switch (currentGenerator.getStatus()) { + private void parseGenerationStatus(Object resp, DataPartitionTableGenerator generator) { + switch (generator.getStatus()) { case IN_PROGRESS: setResponseFields( resp, DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format( - "DataPartitionTable generation in progress: %.1f%%", - currentGenerator.getProgress() * 100), + "DataPartitionTable generation in progress: %.1f%%", generator.getProgress() * 100), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); LOGGER.info( String.format( "DataPartitionTable generation with task ID: %s in progress: %.1f%%", - currentTaskId, currentGenerator.getProgress() * 100)); + currentTaskId, generator.getProgress() * 100)); break; case COMPLETED: setResponseFields( resp, DataPartitionTableGeneratorState.SUCCESS.getCode(), "DataPartitionTable generation completed successfully", + 1.0, RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_COMPLETED, currentTaskId); break; @@ -3600,7 +3608,8 @@ private void parseGenerationStatus(Object resp) { setResponseFields( resp, DataPartitionTableGeneratorState.FAILED.getCode(), - "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage(), + "DataPartitionTable generation failed: " + generator.getErrorMessage(), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_FAILED, currentTaskId); break; @@ -3608,14 +3617,16 @@ private void parseGenerationStatus(Object resp) { setResponseFields( resp, DataPartitionTableGeneratorState.UNKNOWN.getCode(), - "Unknown task status: " + currentGenerator.getStatus(), + "Unknown task status: " + generator.getStatus(), + generator.getProgress(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); LOGGER.info(DataNodeMiscMessages.DATA_PARTITION_TABLE_FAILED, currentTaskId); break; } } - private void setResponseFields(Object resp, int errorCode, String message, TSStatus status) { + private void setResponseFields( + Object resp, int errorCode, String message, double progress, TSStatus status) { if (resp instanceof TGenerateDataPartitionTableResp) { ((TGenerateDataPartitionTableResp) resp).setErrorCode(errorCode); ((TGenerateDataPartitionTableResp) resp).setMessage(message); @@ -3623,6 +3634,7 @@ private void setResponseFields(Object resp, int errorCode, String message, TSSta } else if (resp instanceof TGenerateDataPartitionTableHeartbeatResp) { ((TGenerateDataPartitionTableHeartbeatResp) resp).setErrorCode(errorCode); ((TGenerateDataPartitionTableHeartbeatResp) resp).setMessage(message); + ((TGenerateDataPartitionTableHeartbeatResp) resp).setProgress(progress); ((TGenerateDataPartitionTableHeartbeatResp) resp).setStatus(status); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java index cccdfb5d127be..398797ffca659 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGeneratorTest.java @@ -1061,6 +1061,19 @@ public void testShowRepairDataPartitionTableProgress() { StatementGenerator.createStatement( "SHOW REPAIR DATA PARTITION TABLE PROGRESS;", ZonedDateTime.now().getOffset()); assertEquals(StatementType.SHOW_REPAIR_DATA_PARTITION_TABLE_PROGRESS, statement.getType()); + + QueryStatement queryStatement = + (QueryStatement) + StatementGenerator.createStatement( + "SELECT progress FROM root.sg.d1;", ZonedDateTime.now().getOffset()); + assertEquals( + "progress", + queryStatement + .getSelectComponent() + .getResultColumns() + .get(0) + .getExpression() + .getExpressionString()); } // TODO: add more tests diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java new file mode 100644 index 0000000000000..7afa3cbbdc8df --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.enums; + +public enum RepairDataPartitionTableProgressState { + UNKNOWN, + IDLE, + COLLECT_EARLIEST_TIMESLOTS, + ANALYZE_MISSING_PARTITIONS, + REQUEST_PARTITION_TABLES, + REQUEST_PARTITION_TABLES_HEART_BEAT, + MERGE_PARTITION_TABLES, + WRITE_PARTITION_TABLE_TO_CONSENSUS +} diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cfd6d047168a6..aaa5c0a8627a0 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -749,6 +749,7 @@ struct TGenerateDataPartitionTableHeartbeatResp { 2: required i32 errorCode 3: optional string message 4: optional list databaseScopedDataPartitionTables + 5: optional double progress } struct TGetDataPartitionTableGeneratorProgressResp { From 88b8521d6be55d0d161e665814a42fcdb13c6881 Mon Sep 17 00:00:00 2001 From: libo Date: Tue, 30 Jun 2026 16:27:26 +0800 Subject: [PATCH 3/5] Add the "PROGRESS" expression --- .../iotdb/db/qp/sql/IdentifierParser.g4 | 3 +- .../confignode/manager/ConfigManager.java | 4 ++- .../manager/partition/PartitionManager.java | 5 ++- ...PartitionTableIntegrityCheckProcedure.java | 13 ++++++-- ...wRepairDataPartitionTableProgressTask.java | 3 +- ...RepairDataPartitionTableProgressState.java | 31 +++++++++++++++++++ 6 files changed, 53 insertions(+), 6 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index e96d84e892c8c..ffa2b5108b162 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -190,6 +190,7 @@ keyWords | PRIVILEGE_VALUE | PROCESSLIST | PROCESSOR + | PROGRESS | PROPERTY | PRUNE | QUERIES @@ -298,4 +299,4 @@ keyWords | OPTION | INF | CURRENT_TIMESTAMP - ; \ No newline at end of file + ; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 4f8c117358382..90766221ddde0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -47,6 +47,7 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.conf.TrimProperties; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; @@ -1185,7 +1186,8 @@ public TSStatus dataPartitionTableIntegrityCheck() { public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() { TSStatus status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return new TShowRepairDataPartitionTableProgressResp(status, "UNKNOWN", 0.0) + return new TShowRepairDataPartitionTableProgressResp( + status, RepairDataPartitionTableProgressState.UNKNOWN.name(), 0.0) .setMessage(status.getMessage()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 34865e1aa2e81..9b6b153160d70 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; @@ -548,7 +549,9 @@ public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTablePro .orElseGet( () -> new TShowRepairDataPartitionTableProgressResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0) + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + RepairDataPartitionTableProgressState.IDLE.name(), + 100.0) .setMessage("No running DataPartitionTable integrity check procedure")); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java index c181ed62aba12..c6b545098c3b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.enums.RepairDataPartitionTableProgressState; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; @@ -1120,15 +1121,23 @@ public void setFailedDataNodes(Set failedDataNodes) { public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) { final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState(); - final String state = currentState == null ? "UNKNOWN" : currentState.name(); + final RepairDataPartitionTableProgressState state = getProgressState(currentState); final double progress = currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100; return new TShowRepairDataPartitionTableProgressResp( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress) + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state.name(), progress) .setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress)); } + private RepairDataPartitionTableProgressState getProgressState( + final DataPartitionTableIntegrityCheckProcedureState currentState) { + if (currentState == null) { + return RepairDataPartitionTableProgressState.UNKNOWN; + } + return RepairDataPartitionTableProgressState.valueOf(currentState.name()); + } + private double calculateProgressByState( final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState currentState) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java index 64b929be4a311..d0ccbaa9f3423 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/ShowRepairDataPartitionTableProgressTask.java @@ -59,7 +59,8 @@ public static void buildTsBlock( builder.getColumnBuilder(1).writeDouble(resp.getProgress()); builder .getColumnBuilder(2) - .writeBinary(new Binary(resp.getMessage(), TSFileConfig.STRING_CHARSET)); + .writeBinary( + new Binary(resp.isSetMessage() ? resp.getMessage() : "", TSFileConfig.STRING_CHARSET)); builder.declarePosition(); future.set( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java new file mode 100644 index 0000000000000..7afa3cbbdc8df --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/RepairDataPartitionTableProgressState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.enums; + +public enum RepairDataPartitionTableProgressState { + UNKNOWN, + IDLE, + COLLECT_EARLIEST_TIMESLOTS, + ANALYZE_MISSING_PARTITIONS, + REQUEST_PARTITION_TABLES, + REQUEST_PARTITION_TABLES_HEART_BEAT, + MERGE_PARTITION_TABLES, + WRITE_PARTITION_TABLE_TO_CONSENSUS +} From 7e1b27f01eab19b6c184b0874926d8d2d32fa625 Mon Sep 17 00:00:00 2001 From: libo Date: Tue, 30 Jun 2026 17:13:09 +0800 Subject: [PATCH 4/5] Resolve duplicate variable name error --- .../thrift/impl/DataNodeInternalRPCServiceImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 61c0514df97f3..08fe54cf10dc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -3409,14 +3409,14 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable( try { // Check if there's already a task in the progress - final DataPartitionTableGenerator generator = currentGenerator; - if (generator != null - && generator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + final DataPartitionTableGenerator runningGenerator = currentGenerator; + if (runningGenerator != null + && runningGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); resp.setMessage( String.format( "DataPartitionTable generation is already in the progress: %.1f%%", - generator.getProgress() * 100)); + runningGenerator.getProgress() * 100)); resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); return resp; } From 0982711d7f50e723c25b4abb2615bdcc50f1c3a0 Mon Sep 17 00:00:00 2001 From: libo Date: Tue, 30 Jun 2026 17:49:59 +0800 Subject: [PATCH 5/5] Remove the duplicate one --- .../main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 index a10965ca8f81b..135b674554b52 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4 @@ -191,7 +191,6 @@ keyWords | PROCESSLIST | PROGRESS | PROCESSOR - | PROGRESS | PROPERTY | PRUNE | QUERIES