Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ public void testInformationSchema() throws SQLException {
"pipe_sink,STRING,ATTRIBUTE,",
"exception_message,STRING,ATTRIBUTE,",
"remaining_event_count,INT64,ATTRIBUTE,",
"estimated_remaining_seconds,DOUBLE,ATTRIBUTE,")));
"estimated_remaining_seconds,DOUBLE,ATTRIBUTE,",
"is_degraded,BOOLEAN,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("desc pipe_plugins"),
"ColumnName,DataType,Category,",
Expand Down Expand Up @@ -673,7 +674,7 @@ public void testInformationSchema() throws SQLException {
// Filter out not self-created pipes
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from pipes"),
"id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,",
"id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,is_degraded,",
Collections.emptySet());

// No auth needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) {
heartbeatResp.getPipeMetaList(),
heartbeatResp.getPipeCompletedList(),
heartbeatResp.getPipeRemainingEventCountList(),
heartbeatResp.getPipeRemainingTimeList());
heartbeatResp.getPipeRemainingTimeList(),
heartbeatResp.getPipeDegradedStatusList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ public TShowPipeResp convertToTShowPipeResp() {
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents());
showPipeInfo.setEstimatedRemainingTime(
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime());
final Boolean isDegraded = temporaryMeta.getGlobalDegraded();
if (Objects.nonNull(isDegraded)) {
showPipeInfo.setIsDegraded(isDegraded);
}
showPipeInfoList.add(showPipeInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3178,7 +3178,8 @@ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp res
resp.getPipeMetaList(),
resp.getPipeCompletedList(),
resp.getPipeRemainingEventCountList(),
resp.getPipeRemainingTimeList());
resp.getPipeRemainingTimeList(),
resp.getPipeDegradedStatusList());
return StatusUtils.OK;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
Expand Down Expand Up @@ -219,6 +220,7 @@ protected void collectPipeMetaListInternal(
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
final List<Integer> pipeDegradedStatusList = new ArrayList<>();
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
Expand All @@ -233,6 +235,7 @@ protected void collectPipeMetaListInternal(

pipeRemainingEventCountList.add(remainingEventCount);
pipeRemainingTimeList.add(estimatedRemainingTime);
pipeDegradedStatusList.add(PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN);

logger.ifPresent(
l ->
Expand All @@ -249,6 +252,7 @@ protected void collectPipeMetaListInternal(
resp.setPipeMetaList(pipeMetaBinaryList);
resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
resp.setPipeRemainingTimeList(pipeRemainingTimeList);
resp.setPipeDegradedStatusList(pipeDegradedStatusList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ public void parseHeartbeat(
final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent,
/* @Nullable */ final List<Integer> pipeDegradedStatusListFromAgent) {
pipeHeartbeatScheduler.parseHeartbeat(
dataNodeId,
new PipeHeartbeat(
pipeMetaByteBufferListFromDataNode,
pipeCompletedListFromAgent,
pipeRemainingEventCountListFromAgent,
pipeRemainingTimeListFromAgent));
pipeRemainingTimeListFromAgent,
pipeDegradedStatusListFromAgent));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;

import java.nio.ByteBuffer;
import java.util.HashMap;
Expand All @@ -33,12 +34,14 @@ public class PipeHeartbeat {
private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
private final Map<PipeStaticMeta, Long> remainingEventCountMap = new HashMap<>();
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
private final Map<PipeStaticMeta, Boolean> isDegradedMap = new HashMap<>();

public PipeHeartbeat(
final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent,
/* @Nullable */ final List<Integer> pipeDegradedStatusListFromAgent) {
// Shall not reach here, just in case
if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
return;
Expand All @@ -49,20 +52,31 @@ public PipeHeartbeat(
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i));
Objects.nonNull(pipeCompletedListFromAgent)
&& i < pipeCompletedListFromAgent.size()
&& pipeCompletedListFromAgent.get(i));
// If remaining event count & remaining time can not be got, it implies that the heartbeat is
// from an ancient version of DataNode. Here we guarantee that "0" will not affect both of
// the final results and namely these dataNodes are omitted in calculation.
remainingEventCountMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeRemainingEventCountListFromAgent)
&& i < pipeRemainingEventCountListFromAgent.size()
? pipeRemainingEventCountListFromAgent.get(i)
: 0L);
remainingTimeMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeRemainingTimeListFromAgent)
&& i < pipeRemainingTimeListFromAgent.size()
? pipeRemainingTimeListFromAgent.get(i)
: 0d);
isDegradedMap.put(
pipeMeta.getStaticMeta(),
PipeTemporaryMeta.decodeTsFileEpochDegradedStatus(
Objects.nonNull(pipeDegradedStatusListFromAgent)
&& i < pipeDegradedStatusListFromAgent.size()
? pipeDegradedStatusListFromAgent.get(i)
: null));
}
}

Expand All @@ -86,6 +100,10 @@ public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) {
return remainingTimeMap.get(pipeStaticMeta);
}

public Boolean getDegraded(final PipeStaticMeta pipeStaticMeta) {
return isDegradedMap.get(pipeStaticMeta);
}

public boolean isEmpty() {
return pipeMetaMap.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
// Record statistics
temporaryMeta.setRemainingEvent(nodeId, pipeHeartbeat.getRemainingEventCount(staticMeta));
temporaryMeta.setRemainingTime(nodeId, pipeHeartbeat.getRemainingTime(staticMeta));
temporaryMeta.setDegraded(nodeId, pipeHeartbeat.getDegraded(staticMeta));

final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ private synchronized void heartbeat() {
resp.getPipeMetaList(),
resp.getPipeCompletedList(),
resp.getPipeRemainingEventCountList(),
resp.getPipeRemainingTimeList())));
resp.getPipeRemainingTimeList(),
resp.getPipeDegradedStatusList())));

// config node heartbeat
try {
Expand All @@ -127,7 +128,8 @@ private synchronized void heartbeat() {
configNodeResp.getPipeMetaList(),
null,
configNodeResp.getPipeRemainingEventCountList(),
configNodeResp.getPipeRemainingTimeList()));
configNodeResp.getPipeRemainingTimeList(),
configNodeResp.getPipeDegradedStatusList()));
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn, e, ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -40,6 +48,26 @@

public class PipeTableRespTest {

private ConfigNode oldInstance;

@Before
public void setUp() {
oldInstance = ConfigNode.getInstance();
final ConfigNode configNode = Mockito.mock(ConfigNode.class);
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
final NodeManager nodeManager = Mockito.mock(NodeManager.class);

Mockito.when(configNode.getConfigManager()).thenReturn(configManager);
Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager);
Mockito.when(nodeManager.getRegisteredDataNodeCount()).thenReturn(2);
ConfigNode.setInstance(configNode);
}

@After
public void tearDown() {
ConfigNode.setInstance(oldInstance);
}

public PipeTableResp constructPipeTableResp() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<PipeMeta> pipeMetaList = new ArrayList<>();
Expand Down Expand Up @@ -120,6 +148,26 @@ public void testFilter() {
Assert.assertEquals(3, allPipeTableResp.getAllPipeMeta().size());
}

@Test
public void testConvertToTShowPipeRespIncludesDegradedStatus() {
final PipeTableResp pipeTableResp = constructPipeTableResp();
((PipeTemporaryMetaInCoordinator) pipeTableResp.getAllPipeMeta().get(0).getTemporaryMeta())
.setDegraded(1, true);
((PipeTemporaryMetaInCoordinator) pipeTableResp.getAllPipeMeta().get(1).getTemporaryMeta())
.setDegraded(1, false);

final List<TShowPipeInfo> showPipeResult =
pipeTableResp.convertToTShowPipeResp().getPipeInfoList();

Assert.assertEquals(3, showPipeResult.size());
Assert.assertEquals("testPipe", showPipeResult.get(0).getId());
Assert.assertTrue(showPipeResult.get(0).isSetIsDegraded());
Assert.assertTrue(showPipeResult.get(0).isIsDegraded());
Assert.assertTrue(showPipeResult.get(1).isSetIsDegraded());
Assert.assertFalse(showPipeResult.get(1).isIsDegraded());
Assert.assertFalse(showPipeResult.get(2).isSetIsDegraded());
}

@Test
public void testFilterByModelBeforeWhereClause() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
Expand Down
Loading
Loading