Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public TShowPipeResp convertToTShowPipeResp() {
runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) {
final Integer nodeId = entry.getKey();
final PipeRuntimeException e = entry.getValue();
if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
continue;
}
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();

Expand All @@ -134,6 +137,9 @@ public TShowPipeResp convertToTShowPipeResp() {
runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
final Integer regionId = entry.getKey();
for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) {
if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
continue;
}
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();
pipeExceptionMessage2RegionIdsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,24 +241,21 @@ private void parseHeartbeatAndSaveMetaChangeLocally(

// Update runtime exception
final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue();
final PipeRuntimeMeta pipeRuntimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
pipeTaskMetaFromCoordinator.clearExceptionMessages();
for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) {

// Do not judge the exception's clear time to avoid the restart process
// being ended after the failure of some pipe
if (exception.getTimeStamp() <= pipeRuntimeMeta.getExceptionsClearTime()) {
needPushPipeMetaToDataNodes.set(true);
continue;
}

pipeTaskMetaFromCoordinator.trackExceptionMessage(exception);

if (exception instanceof PipeRuntimeCriticalException) {
final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
if (!pipeMetaFromCoordinator
.getRuntimeMeta()
.getStatus()
.get()
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);
if (!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) {
pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED);
pipeRuntimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,24 +722,42 @@ private boolean isStoppedByRuntimeExceptionInternal(final String pipeName) {
/**
 * Clears the exceptions of the given pipe and resets its isStoppedByRuntimeException flag by
 * delegating to the internal helper between {@code acquireWriteLock()} and
 * {@code releaseWriteLock()}; the release happens in a {@code finally} block.
 *
 * @param pipeName name of the pipe to clear
 */
public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) {
acquireWriteLock();
try {
// NOTE(review): the one-argument call below and the two-argument call that follows look like the
// removed and added sides of the same changed statement — confirm which one the real file keeps.
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
// Uses the current wall-clock time as the exceptions clear time.
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeName, System.currentTimeMillis());
} finally {
releaseWriteLock();
}
}

/**
 * Clears the exceptions of the given pipe and resets its isStoppedByRuntimeException flag, using
 * a caller-supplied exceptions clear time instead of the current time.
 *
 * <p>The work is delegated to the internal helper between {@code acquireWriteLock()} and
 * {@code releaseWriteLock()}; the release is performed in a {@code finally} block so it also runs
 * when the helper throws.
 *
 * @param pipeName name of the pipe to clear
 * @param exceptionsClearTime timestamp forwarded to the internal helper as the clear time
 */
public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
    final String pipeName, final long exceptionsClearTime) {
  acquireWriteLock();
  try {
    clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName, exceptionsClearTime);
  } finally {
    releaseWriteLock();
  }
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final String pipeName) {
if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
final String pipeName, final long exceptionsClearTime) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMetaKeeper.getPipeMeta(pipeName), exceptionsClearTime);
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final PipeMeta pipeMeta, final long exceptionsClearTime) {
if (pipeMeta == null) {
return;
}

final PipeRuntimeMeta runtimeMeta = pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta();
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();

// To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false
runtimeMeta.setIsStoppedByRuntimeException(false);

runtimeMeta.setExceptionsClearTime(System.currentTimeMillis());
runtimeMeta.setExceptionsClearTime(exceptionsClearTime);

final Map<Integer, PipeRuntimeException> exceptionMap =
runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
Expand Down Expand Up @@ -863,14 +881,17 @@ public boolean autoRestart() {
*/
private boolean autoRestartInternal() {
final AtomicBoolean needRestart = new AtomicBoolean(false);
final long exceptionsClearTime = System.currentTimeMillis();
final List<String> pipeToRestart = new LinkedList<>();

pipeMetaKeeper
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) {
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
if (runtimeMeta.getIsStoppedByRuntimeException()) {
runtimeMeta.setExceptionsClearTime(exceptionsClearTime);
runtimeMeta.getStatus().set(PipeStatus.RUNNING);

needRestart.set(true);
pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName());
Expand Down Expand Up @@ -901,9 +922,11 @@ private void handleSuccessfulRestartInternal() {
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
pipeMeta.getStaticMeta().getPipeName());
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING)
&& runtimeMeta.getIsStoppedByRuntimeException()) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMeta, runtimeMeta.getExceptionsClearTime());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
Expand All @@ -37,6 +39,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
Expand Down Expand Up @@ -99,6 +103,9 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", pipeName);

final long exceptionsClearTime = System.currentTimeMillis();
final boolean isStoppedByRuntimeException =
pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
final String exceptionMessage =
parsePushPipeMetaExceptionForPipe(pipeName, pushSinglePipeMetaToDataNodes(pipeName, env));
if (!exceptionMessage.isEmpty()) {
Expand All @@ -111,7 +118,35 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOE

// Clear exceptions and set isStoppedByRuntimeException to false if the pipe is
// started successfully on all data nodes
pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName);
pipeTaskInfo
.get()
.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, exceptionsClearTime);

if (isStoppedByRuntimeException) {
writePipeMetaChangesToConfigNodeConsensus(env);
}
}

/**
 * Writes the current pipe metas to the config node consensus layer.
 *
 * <p>Every pipe meta exposed by the pipe task info is copied into a list, wrapped in a
 * {@link PipeHandleMetaChangePlan} and handed to the consensus manager.
 *
 * @param env procedure environment used to reach the consensus manager
 * @throws PipeException if the consensus write throws or returns a non-success status
 */
private void writePipeMetaChangesToConfigNodeConsensus(final ConfigNodeProcedureEnv env) {
  final List<PipeMeta> metasToPersist = new ArrayList<>();
  for (final PipeMeta meta : pipeTaskInfo.get().getPipeMetaList()) {
    metasToPersist.add(meta);
  }

  TSStatus status;
  try {
    status =
        env.getConfigManager()
            .getConsensusManager()
            .write(new PipeHandleMetaChangePlan(metasToPersist));
  } catch (ConsensusException e) {
    // Convert the consensus failure into an error status so that both failure paths
    // (exception and non-success response) are reported by the single check below.
    LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e);
    status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
    status.setMessage(e.getMessage());
  }

  if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
    return;
  }
  throw new PipeException(status.getMessage());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
Expand All @@ -29,13 +37,18 @@
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -49,6 +62,8 @@

public class PipeHeartbeatParserTest {

private static final int DATA_NODE_ID = 1;

private boolean originalSeparatedPipeHeartbeatEnabled;

@Before
Expand Down Expand Up @@ -117,7 +132,88 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th
verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false);
}

/**
 * A heartbeat that only reports an exception timestamped before the pipe's exceptions clear time
 * must not be tracked on the coordinator side and must leave the pipe RUNNING.
 */
@Test
public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws Exception {
  CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);

  final String pipeName = "staleExceptionPipe";
  final long staleTimestamp = 100L;
  final long clearTime = 200L;

  // Coordinator side: a RUNNING pipe that tracked the stale exception and was then cleared.
  final PipeTaskInfo taskInfo = new PipeTaskInfo();
  createPipe(taskInfo, pipeName, PipeStatus.RUNNING);
  final PipeMeta coordinatorPipeMeta = taskInfo.getPipeMetaByPipeName(pipeName);
  final PipeRuntimeMeta coordinatorRuntimeMeta = coordinatorPipeMeta.getRuntimeMeta();
  final PipeTaskMeta coordinatorTaskMeta =
      coordinatorRuntimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
  coordinatorTaskMeta.trackExceptionMessage(
      new PipeRuntimeCriticalException("stale failure", staleTimestamp));
  taskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, clearTime);

  // Agent side: the data node still reports the same (now stale) exception.
  final PipeTaskMeta reportedTaskMeta =
      new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
  reportedTaskMeta.trackExceptionMessage(
      new PipeRuntimeCriticalException("stale failure", staleTimestamp));
  final ConcurrentMap<Integer, PipeTaskMeta> reportedTasks = new ConcurrentHashMap<>();
  reportedTasks.put(DATA_NODE_ID, reportedTaskMeta);
  final PipeMeta reportedPipeMeta =
      new PipeMeta(coordinatorPipeMeta.getStaticMeta(), new PipeRuntimeMeta(reportedTasks));
  final PipeHeartbeat heartbeat =
      new PipeHeartbeat(
          Collections.singletonList(reportedPipeMeta.serialize()),
          Collections.singletonList(false),
          Collections.singletonList(0L),
          Collections.singletonList(0D));

  final ParserTestContext context = createParserTestContext(1, taskInfo);
  context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);

  Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages());
  Assert.assertEquals(PipeStatus.RUNNING, coordinatorRuntimeMeta.getStatus().get());
  // NOTE(review): the arguments presumably mean (needWriteConsensusOnConfigNodes,
  // needPushPipeMetaToDataNodes) — confirm against ProcedureManager.
  verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, true);
}

/**
 * A heartbeat that reports a critical exception timestamped after the pipe's exceptions clear
 * time must be tracked on the coordinator side and must stop the pipe, flagging it as stopped by
 * a runtime exception.
 */
@Test
public void testParseHeartbeatTracksExceptionsAfterClearTime() throws Exception {
  CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);

  final String pipeName = "freshExceptionPipe";
  final long clearTime = 200L;
  final long freshTimestamp = 300L;

  // Coordinator side: a RUNNING pipe whose exceptions were cleared before the failure happened.
  final PipeTaskInfo taskInfo = new PipeTaskInfo();
  createPipe(taskInfo, pipeName, PipeStatus.RUNNING);
  final PipeMeta coordinatorPipeMeta = taskInfo.getPipeMetaByPipeName(pipeName);
  final PipeRuntimeMeta coordinatorRuntimeMeta = coordinatorPipeMeta.getRuntimeMeta();
  final PipeTaskMeta coordinatorTaskMeta =
      coordinatorRuntimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
  taskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, clearTime);

  // Agent side: the data node reports an exception newer than the clear time.
  final PipeTaskMeta reportedTaskMeta =
      new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
  reportedTaskMeta.trackExceptionMessage(
      new PipeRuntimeCriticalException("fresh failure", freshTimestamp));
  final ConcurrentMap<Integer, PipeTaskMeta> reportedTasks = new ConcurrentHashMap<>();
  reportedTasks.put(DATA_NODE_ID, reportedTaskMeta);
  final PipeMeta reportedPipeMeta =
      new PipeMeta(coordinatorPipeMeta.getStaticMeta(), new PipeRuntimeMeta(reportedTasks));
  final PipeHeartbeat heartbeat =
      new PipeHeartbeat(
          Collections.singletonList(reportedPipeMeta.serialize()),
          Collections.singletonList(false),
          Collections.singletonList(0L),
          Collections.singletonList(0D));

  final ParserTestContext context = createParserTestContext(1, taskInfo);
  context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);

  Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages());
  Assert.assertEquals(PipeStatus.STOPPED, coordinatorRuntimeMeta.getStatus().get());
  Assert.assertTrue(coordinatorRuntimeMeta.getIsStoppedByRuntimeException());
  // NOTE(review): the arguments presumably mean (needWriteConsensusOnConfigNodes,
  // needPushPipeMetaToDataNodes) — confirm against ProcedureManager.
  verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false);
}

/**
 * Builds a parser test context backed by a freshly constructed, empty {@link PipeTaskInfo}.
 *
 * @param registeredDataNodeCount forwarded unchanged to the two-argument overload
 * @return the context produced by the two-argument overload
 */
private ParserTestContext createParserTestContext(final int registeredDataNodeCount) {
  final PipeTaskInfo emptyPipeTaskInfo = new PipeTaskInfo();
  return createParserTestContext(registeredDataNodeCount, emptyPipeTaskInfo);
}

private ParserTestContext createParserTestContext(
final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) {
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
final NodeManager nodeManager = Mockito.mock(NodeManager.class);
final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class);
Expand All @@ -134,7 +230,7 @@ private ParserTestContext createParserTestContext(final int registeredDataNodeCo
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo()));
when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(pipeTaskInfo));
when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true);
Mockito.doAnswer(
invocation -> {
Expand Down Expand Up @@ -165,6 +261,37 @@ private void setAtomicBooleanField(
((AtomicBoolean) field.get(parser)).set(value);
}

/**
 * Registers a pipe in the given {@link PipeTaskInfo} through a {@link CreatePipePlanV2}.
 *
 * <p>The pipe is built with fixed extractor/processor/connector attributes and a single task meta
 * stored under {@code DATA_NODE_ID}. The status is only touched when {@code initialStatus} is
 * {@link PipeStatus#RUNNING}; any other value leaves the status as {@code createPipe} produced it.
 *
 * @param pipeTaskInfo task info that receives the new pipe
 * @param pipeName name of the pipe to create
 * @param initialStatus desired status; only {@code RUNNING} is applied explicitly
 */
private void createPipe(
    final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus initialStatus) {
  final Map<String, String> sourceAttributes = new HashMap<>();
  sourceAttributes.put("extractor", "iotdb-source");
  final Map<String, String> transformAttributes = new HashMap<>();
  transformAttributes.put("processor", "do-nothing-processor");
  final Map<String, String> sinkAttributes = new HashMap<>();
  sinkAttributes.put("connector", "iotdb-thrift-sink");

  final ConcurrentMap<Integer, PipeTaskMeta> tasks = new ConcurrentHashMap<>();
  tasks.put(DATA_NODE_ID, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID));

  final PipeStaticMeta staticMeta =
      new PipeStaticMeta(
          pipeName,
          System.currentTimeMillis(),
          sourceAttributes,
          transformAttributes,
          sinkAttributes);
  pipeTaskInfo.createPipe(new CreatePipePlanV2(staticMeta, new PipeRuntimeMeta(tasks)));

  if (!PipeStatus.RUNNING.equals(initialStatus)) {
    return;
  }

  // Re-read the runtime meta from the task info so the status change lands on the stored pipe.
  final PipeRuntimeMeta storedRuntimeMeta =
      pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
  storedRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
}

/**
 * Builds a {@link PipeHeartbeat} whose first constructor argument is an empty list and whose
 * remaining three arguments are {@code null}.
 *
 * @return a heartbeat carrying no entries in its first list
 */
private PipeHeartbeat emptyHeartbeat() {
return new PipeHeartbeat(Collections.emptyList(), null, null, null);
}
Expand Down
Loading
Loading