diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 69a69c76db71a..9a5397b4353d6 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -51,6 +51,18 @@ public final class ConfigNodeMessages { "Can't force logWriter for ConfigNode SimpleConsensus mode"; public static final String CAN_T_SERIALIZE_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE = "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode"; + public static final String PERSIST_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED = + "Persist ConfigNode SimpleConsensus log failed: "; + public static final String PERSIST_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED = + "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed"; + public static final String FAILED_TO_ROLLBACK_PERSISTED_CONFIGNODE_SIMPLECONSENSUS_LOG = + "Failed to rollback persisted ConfigNode SimpleConsensus log. 
The persisted plan may be replayed after restart."; + public static final String ROLLBACK_FAILED_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED = + "Rollback failed ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed, planType: {}, logFile: {}, truncateOffset: {}, endIndexBeforeWrite: {}"; + public static final String SEAL_RECOVERED_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED = + "Seal recovered ConfigNode SimpleConsensus log failed: {}"; + public static final String SIMPLECONSENSUS_LOG_WRITER_IS_NOT_INITIALIZED = + "SimpleConsensus log writer is not initialized."; public static final String CAN_T_START_CONFIGNODE_CONSENSUS_GROUP = "Can't start ConfigNode consensus group!"; public static final String CHANGE_REGIONS_LEADER_ERROR_ON_DATE_NODE = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 6bf3da0e68b19..7aa425a09da24 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -50,6 +50,18 @@ public final class ConfigNodeMessages { "Can't force logWriter for ConfigNode SimpleConsensus mode"; public static final String CAN_T_SERIALIZE_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE = "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode"; + public static final String PERSIST_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED = + "Persist ConfigNode SimpleConsensus log failed: "; + public static final String PERSIST_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED = + "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed"; + public static final String FAILED_TO_ROLLBACK_PERSISTED_CONFIGNODE_SIMPLECONSENSUS_LOG = + "Failed to rollback persisted ConfigNode 
SimpleConsensus log. The persisted plan may be replayed after restart."; + public static final String ROLLBACK_FAILED_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED = + "Rollback failed ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed, planType: {}, logFile: {}, truncateOffset: {}, endIndexBeforeWrite: {}"; + public static final String SEAL_RECOVERED_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED = + "Seal recovered ConfigNode SimpleConsensus log failed: {}"; + public static final String SIMPLECONSENSUS_LOG_WRITER_IS_NOT_INITIALIZED = + "SimpleConsensus log writer is not initialized."; public static final String CAN_T_START_CONFIGNODE_CONSENSUS_GROUP = "Can't start ConfigNode consensus group!"; public static final String CHANGE_REGIONS_LEADER_ERROR_ON_DATE_NODE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index ad0a82bcf56b3..683d2bc5f8b1f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.file.SystemFileFactory; @@ -54,16 +53,18 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import 
java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -156,16 +157,32 @@ public TSStatus write(IConsensusRequest request) { /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ protected TSStatus write(ConfigPhysicalPlan plan) { + SimpleConsensusPersistResult persistResult = null; + if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { + persistResult = persistPlanForSimpleConsensus(plan); + final TSStatus persistStatus = persistResult.status; + if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return persistStatus; + } + } + TSStatus result; try { result = executor.executeNonQueryPlan(plan); - } catch (UnknownPhysicalPlanTypeException e) { + } catch (UnknownPhysicalPlanTypeException | RuntimeException e) { LOGGER.error(ConfigNodeMessages.EXECUTE_NON_QUERY_PLAN_FAILED, e); result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } - if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { - writeLogForSimpleConsensus(plan); + if (persistResult != null + && result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !rollbackFailedPlanForSimpleConsensus(plan, persistResult)) { + final String rollbackFailureMessage = + ConfigNodeMessages.FAILED_TO_ROLLBACK_PERSISTED_CONFIGNODE_SIMPLECONSENSUS_LOG; + result.setMessage( + Optional.ofNullable(result.getMessage()) + .map(message -> message + " " + rollbackFailureMessage) + .orElse(rollbackFailureMessage)); } if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ 
-232,7 +249,6 @@ public boolean takeSnapshot(File snapshotDir) { PipeConfigNodeAgent.runtime() .listener() .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); - return true; } catch (IOException e) { if (PipeConfigNodeAgent.runtime().listener().isOpened()) { LOGGER.warn( @@ -241,6 +257,7 @@ public boolean takeSnapshot(File snapshotDir) { e); } } + return true; } return false; } @@ -250,9 +267,12 @@ public boolean loadSnapshot(final File latestSnapshotRootDir) { // The boolean result must reflect whether the ConfigRegion state-machine data was loaded, so // callers (e.g. the AddPeer flow) can detect a real failure. The pipe-listener recomputation // below is best-effort post-processing: a failure there is logged but must NOT be reported as a - // snapshot-load failure, otherwise it would (e.g.) abort ConfigNode (re)initialization on what - // is actually a healthy data load. + // snapshot-load failure. final boolean loadSucceeded = executor.loadSnapshot(latestSnapshotRootDir); + if (!loadSucceeded) { + return false; + } + try { // We recompute the snapshot for pipe listener when loading snapshot // to recover the newest snapshot in cache @@ -266,7 +286,7 @@ public boolean loadSnapshot(final File latestSnapshotRootDir) { e); } } - return loadSucceeded; + return true; } @Override @@ -558,6 +578,9 @@ public void start() { @Override public void stop() { + if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { + closeSimpleLogWriter(); + } // Shutdown leader related service for config pipe PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); } @@ -567,60 +590,84 @@ public boolean isReadOnly() { return CommonDescriptor.getInstance().getConfig().isReadOnly(); } - private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) { - if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { - try { - simpleLogWriter.force(); - File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex); - Files.move( - 
simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE); - } catch (IOException e) { - LOGGER.error( - ConfigNodeMessages.CAN_T_FORCE_LOGWRITER_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE, e); + private SimpleConsensusPersistResult persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) { + File persistedLogFile = null; + long logFileSizeBeforeWrite = 0; + int endIndexBeforeWrite = endIndex; + try { + if (simpleLogWriter == null || simpleLogFile == null) { + throw new IOException(ConfigNodeMessages.SIMPLECONSENSUS_LOG_WRITER_IS_NOT_INITIALIZED); } - for (int retry = 0; retry < 5; retry++) { - try { - simpleLogWriter.close(); - } catch (IOException e) { - LOGGER.warn( - ConfigNodeMessages.CAN_T_CLOSE_STANDALONELOG_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE - + "filePath: {}, retry: {}", - simpleLogFile.getAbsolutePath(), - retry); - try { - // Sleep 1s and retry - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e2) { - Thread.currentThread().interrupt(); - LOGGER.warn( - ConfigNodeMessages.UNEXPECTED_INTERRUPTION_DURING_THE_CLOSE_METHOD_OF_LOGWRITER); - } - continue; - } - break; + + if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { + rollSimpleConsensusLogFile(); } - startIndex = endIndex + 1; - createLogFile(startIndex); - } - try { + persistedLogFile = simpleLogFile; + logFileSizeBeforeWrite = persistedLogFile.length(); + endIndexBeforeWrite = endIndex; + ByteBuffer buffer = plan.serializeToByteBuffer(); buffer.position(buffer.limit()); simpleLogWriter.write(buffer); + simpleLogWriter.force(); endIndex = endIndex + 1; } catch (Exception e) { LOGGER.error( ConfigNodeMessages - .CAN_T_SERIALIZE_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE, + .PERSIST_CURRENT_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED, e); + return SimpleConsensusPersistResult.failure( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage( + ConfigNodeMessages.PERSIST_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED + + 
e.getMessage())); } + return SimpleConsensusPersistResult.success( + persistedLogFile, logFileSizeBeforeWrite, endIndexBeforeWrite); + } + + private boolean rollbackFailedPlanForSimpleConsensus( + ConfigPhysicalPlan plan, SimpleConsensusPersistResult persistResult) { + closeSimpleLogWriter(); + try (FileOutputStream outputStream = new FileOutputStream(persistResult.logFile, true); + FileChannel channel = outputStream.getChannel()) { + channel.truncate(persistResult.logFileSizeBeforeWrite); + channel.force(true); + simpleLogFile = persistResult.logFile; + simpleLogWriter = new LogWriter(simpleLogFile, false); + endIndex = persistResult.endIndexBeforeWrite; + return true; + } catch (IOException e) { + LOGGER.error( + ConfigNodeMessages + .ROLLBACK_FAILED_CONFIGPHYSICALPLAN_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE_FAILED, + plan.getType(), + persistResult.logFile, + persistResult.logFileSizeBeforeWrite, + persistResult.endIndexBeforeWrite, + e); + return false; + } + } + + private void rollSimpleConsensusLogFile() throws IOException { + simpleLogWriter.force(); + closeSimpleLogWriter(); + Files.move( + simpleLogFile.toPath(), + new File(FILE_PATH + startIndex + "_" + endIndex).toPath(), + StandardCopyOption.ATOMIC_MOVE); + startIndex = endIndex + 1; + createLogFile(startIndex); } private void initStandAloneConfigNode() { File dir = new File(CURRENT_FILE_DIR); dir.mkdirs(); String[] list = new File(CURRENT_FILE_DIR).list(); + endIndex = 0; if (list != null && list.length != 0) { Arrays.sort(list, Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex)); for (String logFileName : list) { @@ -638,7 +685,7 @@ private void initStandAloneConfigNode() { continue; } - startIndex = endIndex; + final int recoveredStartIndex = parseStartIndex(logFileName); while (logReader.hasNext()) { endIndex++; // Read and re-serialize the PhysicalPlan @@ -656,34 +703,13 @@ private void initStandAloneConfigNode() { } } logReader.close(); - } - } else { - startIndex = 0; - endIndex = 0; - 
} - startIndex = startIndex + 1; - createLogFile(endIndex); - - ScheduledExecutorService simpleConsensusThread = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.CONFIG_NODE_SIMPLE_CONSENSUS_WAL_FLUSH.getName()); - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - simpleConsensusThread, - this::flushWALForSimpleConsensus, - 0, - CONF.getForceWalPeriodForConfigNodeSimpleInMs(), - TimeUnit.MILLISECONDS); - } - - private void flushWALForSimpleConsensus() { - if (simpleLogWriter != null) { - try { - simpleLogWriter.force(); - } catch (IOException e) { - LOGGER.error( - ConfigNodeMessages.CAN_T_FORCE_LOGWRITER_FOR_CONFIGNODE_FLUSHWALFORSIMPLECONSENSUS, e); + if (isInProgressLogFile(logFileName)) { + sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex); + } } } + startIndex = endIndex + 1; + createLogFile(startIndex); } private void createLogFile(int startIndex) { @@ -706,6 +732,54 @@ private void createLogFile(int startIndex) { } } + private void sealRecoveredInProgressLogFile( + File logFile, int recoveredStartIndex, int recoveredEndIndex) { + try { + if (recoveredStartIndex > recoveredEndIndex) { + Files.deleteIfExists(logFile.toPath()); + return; + } + Files.move( + logFile.toPath(), + new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(), + StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e) { + LOGGER.warn( + ConfigNodeMessages.SEAL_RECOVERED_CONFIGNODE_SIMPLECONSENSUS_LOG_FAILED, logFile, e); + } + } + + private boolean isInProgressLogFile(String filename) { + return filename.startsWith(LOG_INPROGRESS_FILE_PREFIX); + } + + private void closeSimpleLogWriter() { + if (simpleLogWriter == null) { + return; + } + for (int retry = 0; retry < 5; retry++) { + try { + simpleLogWriter.close(); + simpleLogWriter = null; + return; + } catch (IOException e) { + LOGGER.warn( + ConfigNodeMessages.CAN_T_CLOSE_STANDALONELOG_FOR_CONFIGNODE_SIMPLECONSENSUS_MODE + + "filePath: {}, retry: {}", + simpleLogFile 
== null ? null : simpleLogFile.getAbsolutePath(), + retry); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + LOGGER.warn( + ConfigNodeMessages.UNEXPECTED_INTERRUPTION_DURING_THE_CLOSE_METHOD_OF_LOGWRITER); + break; + } + } + } + } + private static long parseEndIndex(String filename) { final String endIndexString; if (filename.startsWith(LOG_INPROGRESS_FILE_PREFIX)) { @@ -720,15 +794,36 @@ private static long parseEndIndex(String filename) { return 0; } - if (endIndexString.isEmpty()) { + return isDigits(endIndexString) ? Long.parseLong(endIndexString) : 0; + } + + static int parseStartIndex(String filename) { + final String startIndexString; + if (filename.startsWith(LOG_INPROGRESS_FILE_PREFIX)) { + startIndexString = filename.substring(LOG_INPROGRESS_FILE_PREFIX.length()); + } else if (filename.startsWith(LOG_FILE_PREFIX)) { + final int lastSeparatorIndex = filename.lastIndexOf('_'); + if (lastSeparatorIndex <= LOG_FILE_PREFIX.length()) { + return 0; + } + startIndexString = filename.substring(LOG_FILE_PREFIX.length(), lastSeparatorIndex); + } else { return 0; } - for (int i = 0; i < endIndexString.length(); i++) { - if (!Character.isDigit(endIndexString.charAt(i))) { - return 0; + + return isDigits(startIndexString) ? 
Integer.parseInt(startIndexString) : 0; + } + + private static boolean isDigits(String value) { + if (value.isEmpty()) { + return false; + } + for (int i = 0; i < value.length(); i++) { + if (!Character.isDigit(value.charAt(i))) { + return false; } } - return Long.parseLong(endIndexString); + return true; } /** @@ -753,4 +848,33 @@ private void run() { startup.run(); } } + + private static class SimpleConsensusPersistResult { + + private final TSStatus status; + private final File logFile; + private final long logFileSizeBeforeWrite; + private final int endIndexBeforeWrite; + + private SimpleConsensusPersistResult( + TSStatus status, File logFile, long logFileSizeBeforeWrite, int endIndexBeforeWrite) { + this.status = status; + this.logFile = logFile; + this.logFileSizeBeforeWrite = logFileSizeBeforeWrite; + this.endIndexBeforeWrite = endIndexBeforeWrite; + } + + private static SimpleConsensusPersistResult success( + File logFile, long logFileSizeBeforeWrite, int endIndexBeforeWrite) { + return new SimpleConsensusPersistResult( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + logFile, + logFileSizeBeforeWrite, + endIndexBeforeWrite); + } + + private static SimpleConsensusPersistResult failure(TSStatus status) { + return new SimpleConsensusPersistResult(status, null, 0, 0); + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 9a27f8f07acde..7041662381134 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.node; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import 
org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -46,6 +47,7 @@ import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan; @@ -329,30 +331,32 @@ public DataSet registerDataNode(TDataNodeRegisterReq req) { DataNodeRegisterResp resp = new DataNodeRegisterResp(); resp.setConfigNodeList(getRegisteredConfigNodes()); - // Create a new DataNodeHeartbeatCache and force update NodeStatus int dataNodeId = nodeInfo.generateNextNodeId(); - getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); - // TODO: invoke a force heartbeat to update new DataNode's status immediately RegisterDataNodePlan registerDataNodePlan = new RegisterDataNodePlan(req.getDataNodeConfiguration()); // Register new DataNode registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId); - try { - getConsensusManager().write(registerDataNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus registerStatus = writeConfigPhysicalPlan(registerDataNodePlan); + if (!isConsensusWriteSuccessful(registerStatus)) { + resp.setStatus(registerStatus); + return resp; } // update datanode's versionInfo UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - 
LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateVersionStatus)) { + resp.setStatus( + rollbackDataNodeRegistration( + registerDataNodePlan.getDataNodeConfiguration().getLocation(), updateVersionStatus)); + return resp; } + // Create a new DataNodeHeartbeatCache. The heartbeat service will refresh NodeStatus shortly. + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); + // Bind DataNode metrics PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate( MetricService.getInstance(), configManager, dataNodeId); @@ -360,7 +364,10 @@ public DataSet registerDataNode(TDataNodeRegisterReq req) { // Adjust the maximum RegionGroup number of each Database getClusterSchemaManager().adjustMaxRegionGroupNum(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION); + resp.setStatus( + buildSuccessStatus( + ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(), + registerStatus.getMessage())); resp.setDataNodeId( registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId()); resp.setRuntimeConfiguration(getRuntimeConfiguration(dataNodeId)); @@ -388,10 +395,10 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) { // Update DataNodeConfiguration when modified during restart UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(req.getDataNodeConfiguration()); - try { - getConsensusManager().write(updateDataNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateDataNodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + resp.setStatus(updateStatus); + return resp; } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); @@ -399,14 +406,14 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) { // Update versionInfo when modified 
during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + resp.setStatus(updateStatus); + return resp; } } - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART); + resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage())); resp.setRuntimeConfiguration(getRuntimeConfiguration(nodeId)); resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId)); @@ -484,13 +491,12 @@ public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo v // Update versionInfo when modified during restart UpdateVersionInfoPlan updateConfigNodePlan = new UpdateVersionInfoPlan(versionInfo, configNodeId); - try { - return getConsensusManager().write(updateConfigNodePlan); - } catch (ConsensusException e) { - return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode()); + TSStatus updateStatus = writeConfigPhysicalPlan(updateConfigNodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + return updateStatus; } } - return ClusterNodeStartUtils.ACCEPT_NODE_RESTART; + return buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()); } public List getRegisteredAINodeInfoList() { @@ -536,27 +542,37 @@ public synchronized DataSet registerAINode(TAINodeRegisterReq req) { } int aiNodeId = nodeInfo.generateNextNodeId(); - getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId); RegisterAINodePlan registerAINodePlan = new RegisterAINodePlan(req.getAiNodeConfiguration()); // Register new DataNode registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId); - try { - getConsensusManager().write(registerAINodePlan); - } catch 
(ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus registerStatus = writeConfigPhysicalPlan(registerAINodePlan); + if (!isConsensusWriteSuccessful(registerStatus)) { + AINodeRegisterResp resp = new AINodeRegisterResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(registerStatus); + return resp; } // update datanode's versionInfo UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateVersionStatus)) { + AINodeRegisterResp resp = new AINodeRegisterResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus( + rollbackAINodeRegistration( + registerAINodePlan.getAINodeConfiguration().getLocation(), updateVersionStatus)); + return resp; } + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId); + AINodeRegisterResp resp = new AINodeRegisterResp(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION); + resp.setStatus( + buildSuccessStatus( + ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(), + registerStatus.getMessage())); resp.setConfigNodeList(getRegisteredConfigNodes()); resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId()); return resp; @@ -594,10 +610,12 @@ public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) { if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) { // Update AINodeConfiguration when modified during restart UpdateAINodePlan updateAINodePlan = new UpdateAINodePlan(req.getAiNodeConfiguration()); - try { - getConsensusManager().write(updateAINodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = 
writeConfigPhysicalPlan(updateAINodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + TAINodeRestartResp resp = new TAINodeRestartResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(updateStatus); + return resp; } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); @@ -605,15 +623,17 @@ public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + TAINodeRestartResp resp = new TAINodeRestartResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(updateStatus); + return resp; } } TAINodeRestartResp resp = new TAINodeRestartResp(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART); + resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage())); resp.setConfigNodeList(getRegisteredConfigNodes()); return resp; } @@ -898,17 +918,15 @@ public List getRegisteredConfigNodeInfo4Infor public void applyConfigNode( TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) { ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); - try { - getConsensusManager().write(applyConfigNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); - } + ensureConsensusWriteSuccessful( + writeConfigPhysicalPlan(applyConfigNodePlan), + String.format("apply ConfigNode %s", configNodeLocation)); UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId()); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch 
(ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + final TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + throw new IllegalStateException( + rollbackConfigNodeRegistration(configNodeLocation, updateStatus).getMessage()); } } @@ -1314,6 +1332,144 @@ public TDataNodeLocation getLowestLoadDataNode(Set nodes) { return getRegisteredDataNode(dataNodeId).getLocation(); } + private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) { + try { + return getConsensusManager().write(plan); + } catch (ConsensusException e) { + LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()); + } + } + + private boolean isConsensusWriteSuccessful(TSStatus status) { + return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + + private TSStatus rollbackDataNodeRegistration( + TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = + writeConfigPhysicalPlan( + new RemoveDataNodePlan(Collections.singletonList(dataNodeLocation))); + final String failureMessage = + String.format( + "Failed to persist version info for DataNode %d: %s", + dataNodeLocation.getDataNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The registration has been rolled back. Please retry the registration."); + } + + LOGGER.error( + "Failed to roll back DataNode registration {} after version info persistence failure. 
" + + "versionUpdateStatus: {}, rollbackStatus: {}", + dataNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus rollbackAINodeRegistration( + TAINodeLocation aiNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = writeConfigPhysicalPlan(new RemoveAINodePlan(aiNodeLocation)); + final String failureMessage = + String.format( + "Failed to persist version info for AINode %d: %s", + aiNodeLocation.getAiNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The registration has been rolled back. Please retry the registration."); + } + + LOGGER.error( + "Failed to roll back AINode registration {} after version info persistence failure. 
" + + "versionUpdateStatus: {}, rollbackStatus: {}", + aiNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus rollbackConfigNodeRegistration( + TConfigNodeLocation configNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = + writeConfigPhysicalPlan(new RemoveConfigNodePlan(configNodeLocation)); + final String failureMessage = + String.format( + "Failed to persist version info for ConfigNode %d: %s", + configNodeLocation.getConfigNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The ConfigNode registration has been rolled back."); + } + + LOGGER.error( + "Failed to roll back ConfigNode registration {} after version info persistence failure. " + + "versionUpdateStatus: {}, rollbackStatus: {}", + configNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus buildStatus(int statusCode, String... messages) { + final TSStatus status = new TSStatus(statusCode); + final StringBuilder builder = new StringBuilder(); + for (String message : messages) { + if (message == null || message.isEmpty()) { + continue; + } + if (builder.length() > 0) { + builder.append(' '); + } + builder.append(message); + } + if (builder.length() > 0) { + status.setMessage(builder.toString()); + } + return status; + } + + private TSStatus buildSuccessStatus(String... 
messages) { + return buildStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode(), messages); + } + + private String describeStatus(TSStatus status) { + if (status == null) { + return "unknown error"; + } + if (status.getMessage() != null && !status.getMessage().isEmpty()) { + return status.getMessage(); + } + return "status code " + status.getCode(); + } + + private void ensureConsensusWriteSuccessful(TSStatus status, String action) { + if (isConsensusWriteSuccessful(status)) { + return; + } + throw new IllegalStateException( + String.format("Failed to %s through consensus layer: %s", action, status)); + } + private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index d2f1f80e7a3e0..f8a5ee9951af2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -244,7 +244,7 @@ public int getRegionGroupCount(TConsensusGroupType type) { result.getAndIncrement(); } }); - return result.getAndIncrement(); + return result.get(); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index af9429e59532c..f773ffa4a07ea 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -245,7 +245,9 @@ public TSStatus offerRegionMaintainTasks( */ public TSStatus 
pollRegionMaintainTask() { synchronized (regionMaintainTaskList) { - regionMaintainTaskList.remove(0); + if (!regionMaintainTaskList.isEmpty()) { + regionMaintainTaskList.remove(0); + } return RpcUtils.SUCCESS_STATUS; } } @@ -1010,9 +1012,14 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol); } + final List<RegionMaintainTask> copiedRegionMaintainTaskList; + synchronized (regionMaintainTaskList) { + copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList); + } + // serialize regionCleanList - ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream); - for (RegionMaintainTask task : regionMaintainTaskList) { + ReadWriteIOUtils.write(copiedRegionMaintainTaskList.size(), bufferedOutputStream); + for (RegionMaintainTask task : copiedRegionMaintainTaskList) { task.serialize(bufferedOutputStream, protocol); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java index b737bdba02cab..153122af0a61c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java @@ -81,9 +81,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddConfigNodeState s LOG.info(ProcedureMessages.SUCCESSFULLY_ADD_PEER, tConfigNodeLocation); break; case REGISTER_SUCCESS: - env.notifyRegisterSuccess(tConfigNodeLocation); - env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()); env.applyConfigNode(tConfigNodeLocation, versionInfo); + env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()); + env.notifyRegisterSuccess(tConfigNodeLocation); LOG.info( 
ProcedureMessages.THE_CONFIGNODE_IS_SUCCESSFULLY_ADDED_TO_THE_CLUSTER, tConfigNodeLocation); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java new file mode 100644 index 0000000000000..b6651c1ac1538 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.consensus.statemachine; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.TestOnlyPlan; +import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor; +import org.apache.iotdb.confignode.writelog.io.SingleFileLogReader; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.utils.writelog.LogWriter; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ConfigRegionStateMachineTest { + + @Test + public void testParseStartIndex() { + Assert.assertEquals(1, ConfigRegionStateMachine.parseStartIndex("log_1_10")); + Assert.assertEquals(11, ConfigRegionStateMachine.parseStartIndex("log_11_20")); + Assert.assertEquals(21, ConfigRegionStateMachine.parseStartIndex("log_inprogress_21")); + Assert.assertEquals(0, ConfigRegionStateMachine.parseStartIndex("invalid")); + } + + @Test + public void testFileComparatorSortsByStartIndex() { + List<String> filenames = + new ArrayList<>(Arrays.asList("log_inprogress_21", "log_11_20", "log_1_10")); + + filenames.sort(new ConfigRegionStateMachine.FileComparator()); + + Assert.assertEquals(Arrays.asList("log_1_10", "log_11_20", "log_inprogress_21"), filenames); + } + + @Test + public void testFailedSimpleConsensusWriteRollsBackPersistedPlan() throws Exception { + ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + String originalConsensusProtocol = conf.getConfigNodeConsensusProtocolClass(); + Path tempLogFile = 
Files.createTempFile("confignode-simple-consensus", ".wal"); + ConfigPlanExecutor executor = Mockito.mock(ConfigPlanExecutor.class); + Mockito.when(executor.executeNonQueryPlan(Mockito.any(ConfigPhysicalPlan.class))) + .thenReturn(new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())); + ConfigRegionStateMachine stateMachine = new ConfigRegionStateMachine(null, executor); + + try { + conf.setConfigNodeConsensusProtocolClass(ConsensusFactory.SIMPLE_CONSENSUS); + setField(stateMachine, "simpleLogFile", tempLogFile.toFile()); + setField(stateMachine, "simpleLogWriter", new LogWriter(tempLogFile.toFile(), false)); + setField(stateMachine, "endIndex", 0); + + TSStatus status = stateMachine.write(new TestOnlyPlan()); + + Assert.assertEquals(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), status.getCode()); + Assert.assertEquals(0, Files.size(tempLogFile)); + Assert.assertEquals(0, getField(stateMachine, "endIndex")); + closeSimpleLogWriter(stateMachine); + assertNoReplayablePlans(tempLogFile); + } finally { + closeSimpleLogWriter(stateMachine); + Files.deleteIfExists(tempLogFile); + conf.setConfigNodeConsensusProtocolClass(originalConsensusProtocol); + } + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private static Object getField(Object target, String fieldName) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(target); + } + + private static void assertNoReplayablePlans(Path tempLogFile) throws Exception { + SingleFileLogReader logReader = new SingleFileLogReader(tempLogFile.toFile()); + try { + Assert.assertFalse(logReader.hasNext()); + } finally { + logReader.close(); + } + } + + private static void closeSimpleLogWriter(ConfigRegionStateMachine stateMachine) throws Exception { + LogWriter 
logWriter = (LogWriter) getField(stateMachine, "simpleLogWriter"); + if (logWriter != null) { + logWriter.close(); + } + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java new file mode 100644 index 0000000000000..247d745661ef8 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.node; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; +import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; +import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp; +import org.apache.iotdb.confignode.manager.ClusterManager; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.cache.LoadCache; +import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; +import org.apache.iotdb.confignode.persistence.node.NodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp; +import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import 
org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class NodeManagerTest { + + private IManager configManager; + private ConsensusManager consensusManager; + private LoadManager loadManager; + private LoadCache loadCache; + private ClusterSchemaManager clusterSchemaManager; + private ClusterManager clusterManager; + private NodeInfo nodeInfo; + private NodeManager nodeManager; + + @Before + public void setUp() { + configManager = Mockito.mock(IManager.class); + consensusManager = Mockito.mock(ConsensusManager.class); + loadManager = Mockito.mock(LoadManager.class); + loadCache = Mockito.mock(LoadCache.class); + clusterSchemaManager = Mockito.mock(ClusterSchemaManager.class); + clusterManager = Mockito.mock(ClusterManager.class); + nodeInfo = new NodeInfo(); + nodeManager = new NodeManager(configManager, nodeInfo); + + when(configManager.getConsensusManager()).thenReturn(consensusManager); + when(configManager.getLoadManager()).thenReturn(loadManager); + when(loadManager.getLoadCache()).thenReturn(loadCache); + when(configManager.getClusterSchemaManager()).thenReturn(clusterSchemaManager); + when(configManager.getClusterManager()).thenReturn(clusterManager); + } + + @Test + public void testRegisterDataNodeStopsWhenRegisterWriteFails() throws ConsensusException { + TSStatus failureStatus = + new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setMessage("redirect"); + when(consensusManager.write(any())).thenReturn(failureStatus); + + DataNodeRegisterResp resp = + (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1)); + + Assert.assertEquals(failureStatus, resp.getStatus()); + verify(loadCache, 
never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt()); + verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum(); + } + + @Test + public void testRegisterDataNodeRollsBackWhenVersionWriteFails() throws ConsensusException { + TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("update failed"); + when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus); + + DataNodeRegisterResp resp = + (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1)); + + Assert.assertEquals( + TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertTrue(resp.getStatus().getMessage().contains("rolled back")); + verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt()); + verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum(); + + ArgumentCaptor<ConfigPhysicalPlan> planCaptor = + ArgumentCaptor.forClass(ConfigPhysicalPlan.class); + verify(consensusManager, Mockito.times(3)).write(planCaptor.capture()); + Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof RegisterDataNodePlan); + Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan); + Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveDataNodePlan); + } + + @Test + public void testRestartDataNodeReturnsFailureWhenUpdateWriteFails() throws ConsensusException { + final TDataNodeConfiguration registeredConfig = generateDataNodeConfiguration(1, "127.0.0.1"); + nodeInfo.registerDataNode(new RegisterDataNodePlan(registeredConfig)); + when(clusterManager.getClusterIdWithRetry(anyLong())).thenReturn("cluster"); + + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("update failed"); + when(consensusManager.write(any())).thenReturn(failureStatus); + + final 
TDataNodeRestartReq req = new TDataNodeRestartReq(); + req.setDataNodeConfiguration(generateDataNodeConfiguration(1, "127.0.0.2")); + req.setVersionInfo(new TNodeVersionInfo("version", "build")); + + final TDataNodeRestartResp resp = nodeManager.updateDataNodeIfNecessary(req); + + Assert.assertEquals(failureStatus, resp.getStatus()); + } + + @Test + public void testApplyConfigNodeRollsBackWhenVersionWriteFails() throws ConsensusException { + TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("apply failed"); + when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus); + + try { + nodeManager.applyConfigNode( + new TConfigNodeLocation( + 1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)), + new TNodeVersionInfo("version", "build")); + Assert.fail("Expected applyConfigNode to fail fast"); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("rolled back")); + } + + ArgumentCaptor<ConfigPhysicalPlan> planCaptor = + ArgumentCaptor.forClass(ConfigPhysicalPlan.class); + verify(consensusManager, Mockito.times(3)).write(planCaptor.capture()); + Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof ApplyConfigNodePlan); + Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan); + Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveConfigNodePlan); + } + + private TDataNodeRegisterReq generateDataNodeRegisterReq(int dataNodeId) { + final TDataNodeRegisterReq req = new TDataNodeRegisterReq(); + req.setDataNodeConfiguration(generateDataNodeConfiguration(dataNodeId, "127.0.0.1")); + req.setVersionInfo(new TNodeVersionInfo("version", "build")); + return req; + } + + private TDataNodeConfiguration generateDataNodeConfiguration(int dataNodeId, String ip) { + final TDataNodeLocation dataNodeLocation = new TDataNodeLocation(); 
+ dataNodeLocation.setDataNodeId(dataNodeId); + dataNodeLocation.setClientRpcEndPoint(new TEndPoint(ip, 6667 + dataNodeId)); + dataNodeLocation.setInternalEndPoint(new TEndPoint(ip, 10730 + dataNodeId)); + dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740 + dataNodeId)); + dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint(ip, 10760 + dataNodeId)); + dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10750 + dataNodeId)); + + final TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration(); + dataNodeConfiguration.setLocation(dataNodeLocation); + return dataNodeConfiguration; + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java index e3ced5b069428..5bebe0bf5ad32 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java @@ -26,8 +26,11 @@ import org.apache.iotdb.common.rpc.thrift.TNodeResource; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; import org.apache.iotdb.confignode.persistence.node.NodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -73,6 +76,49 @@ public void testSnapshot() throws TException, IOException { Assert.assertEquals(nodeInfo, nodeInfo1); } + @Test + public void testRegistrationPlansAreIdempotentForWalReplay() { + NodeInfo replayNodeInfo = new 
NodeInfo(); + + TDataNodeConfiguration dataNodeConfiguration = generateTDataNodeConfiguration(100); + RegisterDataNodePlan registerDataNodePlan = new RegisterDataNodePlan(dataNodeConfiguration); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.registerDataNode(registerDataNodePlan).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.registerDataNode(registerDataNodePlan).getCode()); + Assert.assertEquals(1, replayNodeInfo.getRegisteredDataNodeCount()); + Assert.assertEquals( + dataNodeConfiguration, + replayNodeInfo.getRegisteredDataNode(dataNodeConfiguration.getLocation().getDataNodeId())); + + TConfigNodeLocation configNodeLocation = + new TConfigNodeLocation( + 20000, new TEndPoint("127.0.0.1", 22200), new TEndPoint("127.0.0.1", 22300)); + ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.applyConfigNode(applyConfigNodePlan).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.applyConfigNode(applyConfigNodePlan).getCode()); + Assert.assertEquals(1, replayNodeInfo.getRegisteredConfigNodes().size()); + Assert.assertEquals(configNodeLocation, replayNodeInfo.getRegisteredConfigNodes().get(0)); + + TNodeVersionInfo versionInfo = new TNodeVersionInfo("version", "build"); + UpdateVersionInfoPlan updateVersionInfoPlan = + new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.updateVersionInfo(updateVersionInfoPlan).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + replayNodeInfo.updateVersionInfo(updateVersionInfoPlan).getCode()); + Assert.assertEquals( + versionInfo, replayNodeInfo.getVersionInfo(configNodeLocation.getConfigNodeId())); + } + private void registerConfigNodes() 
{ for (int i = 0; i < 3; i++) { ApplyConfigNodePlan applyConfigNodePlan = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index afccb0c0eba12..eccddcaa8d107 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp; +import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; @@ -267,6 +268,34 @@ public void testShowRegion() { }); } + @Test + public void testRegionGroupCount() throws DatabaseNotExistsException { + partitionInfo.createDatabase( + new DatabaseSchemaPlan( + ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.region_count"))); + + CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan(); + createRegionGroupsPlan.addRegionGroup( + "root.region_count", + generateTRegionReplicaSet( + testFlag.SchemaPartition.getFlag(), + generateTConsensusGroupId( + testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion))); + createRegionGroupsPlan.addRegionGroup( + "root.region_count", + generateTRegionReplicaSet( + testFlag.DataPartition.getFlag(), + generateTConsensusGroupId( + testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion))); + 
partitionInfo.createRegionGroups(createRegionGroupsPlan); + + Assert.assertEquals( + 1, + partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.SchemaRegion)); + Assert.assertEquals( + 1, partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.DataRegion)); + } + private TRegionReplicaSet generateTRegionReplicaSet( int startFlag, TConsensusGroupId tConsensusGroupId) { TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();