From 034c207b78e1785a1beb617e586adcf47f68108d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 3 Jul 2026 10:52:26 +0800 Subject: [PATCH] Fix stale pipe runtime error messages (#18061) (cherry picked from commit ef5f4ca64416829c711e801888c5ce111c181a4e) --- .../response/pipe/task/PipeTableResp.java | 6 + .../heartbeat/PipeHeartbeatParser.java | 19 ++- .../persistence/pipe/PipeTaskInfo.java | 43 ++++-- .../impl/pipe/task/StartPipeProcedureV2.java | 37 ++++- .../heartbeat/PipeHeartbeatParserTest.java | 129 +++++++++++++++++- .../pipe/PipeTaskInfoAutoRestartTest.java | 28 ++++ .../pipe/agent/task/PipeTaskAgent.java | 13 ++ .../pipe/agent/task/meta/PipeRuntimeMeta.java | 9 ++ .../pipe/agent/task/meta/PipeTaskMeta.java | 4 + .../commons/pipe/task/PipeMetaDeSerTest.java | 26 ++++ 10 files changed, 291 insertions(+), 23 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index 35ffd6a7abdf6..f7a76b6174cc4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -122,6 +122,9 @@ public TShowPipeResp convertToTShowPipeResp() { runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) { final Integer nodeId = entry.getKey(); final PipeRuntimeException e = entry.getValue(); + if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); @@ -134,6 +137,9 @@ public TShowPipeResp convertToTShowPipeResp() { runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { final Integer regionId = entry.getKey(); for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) { + if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); pipeExceptionMessage2RegionIdsMap diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 8cafa2094c446..60ca3b8580aa9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -241,24 +241,21 @@ private void parseHeartbeatAndSaveMetaChangeLocally( // Update runtime exception final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue(); + final PipeRuntimeMeta pipeRuntimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); pipeTaskMetaFromCoordinator.clearExceptionMessages(); for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) { - - // Do not judge the exception's clear time to avoid the restart process - // being ended after the failure of some pipe + if (exception.getTimeStamp() <= pipeRuntimeMeta.getExceptionsClearTime()) { + needPushPipeMetaToDataNodes.set(true); + continue; + } pipeTaskMetaFromCoordinator.trackExceptionMessage(exception); if (exception instanceof PipeRuntimeCriticalException) { final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName(); - if (!pipeMetaFromCoordinator - .getRuntimeMeta() - .getStatus() - .get() - .equals(PipeStatus.STOPPED)) { - PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); - runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.setIsStoppedByRuntimeException(true); + if (!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) { + pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED); + pipeRuntimeMeta.setIsStoppedByRuntimeException(true); needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(false); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 022144f1a9159..01b043d57a2f8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -722,24 +722,42 @@ private boolean isStoppedByRuntimeExceptionInternal(final String pipeName) { public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) { acquireWriteLock(); try { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName); + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, System.currentTimeMillis()); + } finally { + releaseWriteLock(); + } + } + + public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( + final String pipeName, final long exceptionsClearTime) { + acquireWriteLock(); + try { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, exceptionsClearTime); } finally { releaseWriteLock(); } } private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - final String pipeName) { - if (!pipeMetaKeeper.containsPipeMeta(pipeName)) { + final String pipeName, final long exceptionsClearTime) { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeMetaKeeper.getPipeMeta(pipeName), exceptionsClearTime); + } + + private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + final PipeMeta pipeMeta, final long exceptionsClearTime) { + if (pipeMeta == null) { return; } - final PipeRuntimeMeta runtimeMeta = pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta(); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); // To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false runtimeMeta.setIsStoppedByRuntimeException(false); - runtimeMeta.setExceptionsClearTime(System.currentTimeMillis()); + runtimeMeta.setExceptionsClearTime(exceptionsClearTime); final Map exceptionMap = runtimeMeta.getNodeId2PipeRuntimeExceptionMap(); @@ -863,14 +881,17 @@ public boolean autoRestart() { */ private boolean autoRestartInternal() { final AtomicBoolean needRestart = new AtomicBoolean(false); + final long exceptionsClearTime = System.currentTimeMillis(); final List pipeToRestart = new LinkedList<>(); pipeMetaKeeper .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) { - pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getIsStoppedByRuntimeException()) { + runtimeMeta.setExceptionsClearTime(exceptionsClearTime); + runtimeMeta.getStatus().set(PipeStatus.RUNNING); needRestart.set(true); pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName()); @@ -901,9 +922,11 @@ private void handleSuccessfulRestartInternal() { .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( - pipeMeta.getStaticMeta().getPipeName()); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING) + && runtimeMeta.getIsStoppedByRuntimeException()) { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeMeta, runtimeMeta.getExceptionsClearTime()); } }); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 29cb2b51ab4e5..962aa341fc750 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -20,7 +20,9 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; @@ -37,6 +39,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { @@ -99,6 +103,9 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName); + final long exceptionsClearTime = System.currentTimeMillis(); + final boolean isStoppedByRuntimeException = + pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); final String exceptionMessage = parsePushPipeMetaExceptionForPipe(pipeName, pushSinglePipeMetaToDataNodes(pipeName, env)); if (!exceptionMessage.isEmpty()) { @@ -111,7 +118,35 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOE // Clear exceptions and set isStoppedByRuntimeException to false if the pipe is // started successfully on all data nodes - pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName); + pipeTaskInfo + .get() + .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, exceptionsClearTime); + + if (isStoppedByRuntimeException) { + writePipeMetaChangesToConfigNodeConsensus(env); + } + } + + private void writePipeMetaChangesToConfigNodeConsensus(final ConfigNodeProcedureEnv env) { + final List pipeMetaList = new ArrayList<>(); + for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { + pipeMetaList.add(pipeMeta); + } + + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new PipeHandleMetaChangePlan(pipeMetaList)); + } catch (ConsensusException e) { + LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); + } } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java index d5a46d42c84a4..0c68881584e43 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java @@ -20,6 +20,14 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.node.NodeManager; @@ -29,13 +37,18 @@ import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.lang.reflect.Field; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -49,6 +62,8 @@ public class PipeHeartbeatParserTest { + private static final int DATA_NODE_ID = 1; + private boolean originalSeparatedPipeHeartbeatEnabled; @Before @@ -117,7 +132,88 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false); } + @Test + public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "staleExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + coordinatorTaskMeta.trackExceptionMessage( + new PipeRuntimeCriticalException("stale failure", 100L)); + + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale failure", 100L)); + final ConcurrentMap agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D)); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, true); + } + + @Test + public void testParseHeartbeatTracksExceptionsAfterClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "freshExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh failure", 300L)); + final ConcurrentMap agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D)); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + } + private ParserTestContext createParserTestContext(final int registeredDataNodeCount) { + return createParserTestContext(registeredDataNodeCount, new PipeTaskInfo()); + } + + private ParserTestContext createParserTestContext( + final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) { final ConfigManager configManager = Mockito.mock(ConfigManager.class); final NodeManager nodeManager = Mockito.mock(NodeManager.class); final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class); @@ -134,7 +230,7 @@ private ParserTestContext createParserTestContext(final int registeredDataNodeCo when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator); when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator); when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter); - when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo())); + when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(pipeTaskInfo)); when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true); Mockito.doAnswer( invocation -> { @@ -165,6 +261,37 @@ private void setAtomicBooleanField( ((AtomicBoolean) field.get(parser)).set(value); } + private void createPipe( + final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus initialStatus) { + final Map extractorAttributes = new HashMap<>(); + extractorAttributes.put("extractor", "iotdb-source"); + final Map processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); + final Map connectorAttributes = new HashMap<>(); + connectorAttributes.put("connector", "iotdb-thrift-sink"); + + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + final ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); + pipeTasks.put(DATA_NODE_ID, pipeTaskMeta); + final PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + pipeName, + System.currentTimeMillis(), + extractorAttributes, + processorAttributes, + connectorAttributes); + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); + pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo + .getPipeMetaByPipeName(pipeName) + .getRuntimeMeta() + .getStatus() + .set(PipeStatus.RUNNING); + } + } + private PipeHeartbeat emptyHeartbeat() { return new PipeHeartbeat(Collections.emptyList(), null, null, null); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 762c3bf045c5b..0947637812da9 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -88,6 +88,34 @@ public void testRecordDataNodePushPipeMetaExceptionsKeepsUserStoppedPipeOutOfAut Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); } + @Test + public void testHandleSuccessfulRestartClearsRuntimeExceptionMessages() { + final String pipeName = "restartPipe"; + createPipe(pipeName, PipeStatus.RUNNING); + + Assert.assertTrue( + pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName))); + + final PipeRuntimeMeta runtimeMeta = + pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta(); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + + Assert.assertTrue(pipeTaskInfo.autoRestart()); + final long exceptionsClearTime = runtimeMeta.getExceptionsClearTime(); + Assert.assertTrue( + runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values().stream() + .allMatch(exception -> exception.getTimeStamp() <= exceptionsClearTime)); + + pipeTaskInfo.handleSuccessfulRestart(); + + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + Assert.assertEquals(exceptionsClearTime, runtimeMeta.getExceptionsClearTime()); + } + private Map createErrorRespMap(final String pipeName) { final TPushPipeMetaRespExceptionMessage exceptionMessage = new TPushPipeMetaRespExceptionMessage( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index fcff317b3f337..66ed052ea6fef 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -302,6 +302,8 @@ private void executeSinglePipeRuntimeMetaChanges( } } + syncRuntimeExceptionClearTime(runtimeMetaFromCoordinator, runtimeMetaInAgent); + // 2. Handle pipe runtime meta status changes final PipeStatus statusFromCoordinator = runtimeMetaFromCoordinator.getStatus().get(); final PipeStatus statusInAgent = runtimeMetaInAgent.getStatus().get(); @@ -342,6 +344,12 @@ private void executeSinglePipeRuntimeMetaChanges( } } + private void syncRuntimeExceptionClearTime( + final PipeRuntimeMeta runtimeMetaFromCoordinator, final PipeRuntimeMeta runtimeMetaInAgent) { + runtimeMetaInAgent.setExceptionsClearTime(runtimeMetaFromCoordinator.getExceptionsClearTime()); + runtimeMetaInAgent.clearExceptionMessagesBefore(runtimeMetaInAgent.getExceptionsClearTime()); + } + protected abstract void thawRate(final String pipeName, final long creationTime); protected abstract void freezeRate(final String pipeName, final long creationTime); @@ -544,6 +552,11 @@ private boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Illega pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromCoordinator); + pipeMetaFromCoordinator + .getRuntimeMeta() + .clearExceptionMessagesBefore( + pipeMetaFromCoordinator.getRuntimeMeta().getExceptionsClearTime()); + // If the pipe status from coordinator is RUNNING, we will start the pipe later. return needToStartPipe; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 4724c619f9683..0e52297edffa5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -128,6 +128,15 @@ public void setExceptionsClearTime(long exceptionsClearTime) { } } + public void clearExceptionMessagesBefore(final long exceptionsClearTime) { + nodeId2PipeRuntimeExceptionMap + .entrySet() + .removeIf(entry -> entry.getValue().getTimeStamp() <= exceptionsClearTime); + consensusGroupId2TaskMetaMap + .values() + .forEach(pipeTaskMeta -> pipeTaskMeta.clearExceptionMessagesBefore(exceptionsClearTime)); + } + public boolean getIsStoppedByRuntimeException() { return isStoppedByRuntimeException.get(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 9584ca8cbabdf..e9939d7b2c6f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -134,6 +134,10 @@ public synchronized void clearExceptionMessages() { exceptionMessages.clear(); } + public synchronized void clearExceptionMessagesBefore(final long exceptionsClearTime) { + exceptionMessages.removeIf(exception -> exception.getTimeStamp() <= exceptionsClearTime); + } + public synchronized void serialize(final OutputStream outputStream) throws IOException { progressIndex.get().serialize(outputStream); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java index 1e29c96e0905e..fa0ac2c1a37eb 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java @@ -151,4 +151,30 @@ public void test() throws IOException { final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer); Assert.assertEquals(pipeMeta, pipeMeta1); } + + @Test + public void testClearExceptionMessagesBeforeClearTime() { + final PipeTaskMeta staleTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + staleTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale", 100L)); + final PipeTaskMeta freshTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + freshTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh", 300L)); + + final ConcurrentHashMap taskMetaMap = new ConcurrentHashMap<>(); + taskMetaMap.put(1, staleTaskMeta); + taskMetaMap.put(2, freshTaskMeta); + final PipeRuntimeMeta runtimeMeta = new PipeRuntimeMeta(taskMetaMap); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(1, new PipeRuntimeCriticalException("stale node", 100L)); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(2, new PipeRuntimeCriticalException("fresh node", 300L)); + + runtimeMeta.clearExceptionMessagesBefore(200L); + + Assert.assertFalse(staleTaskMeta.hasExceptionMessages()); + Assert.assertTrue(freshTaskMeta.hasExceptionMessages()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(1)); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(2)); + } }