diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index 31cd0241f69a3..22483ab43a68d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -551,7 +551,8 @@ public void testInformationSchema() throws SQLException { "pipe_sink,STRING,ATTRIBUTE,", "exception_message,STRING,ATTRIBUTE,", "remaining_event_count,INT64,ATTRIBUTE,", - "estimated_remaining_seconds,DOUBLE,ATTRIBUTE,"))); + "estimated_remaining_seconds,DOUBLE,ATTRIBUTE,", + "is_degraded,BOOLEAN,ATTRIBUTE,"))); TestUtils.assertResultSetEqual( statement.executeQuery("desc pipe_plugins"), "ColumnName,DataType,Category,", @@ -673,7 +674,7 @@ public void testInformationSchema() throws SQLException { // Filter out not self-created pipes TestUtils.assertResultSetEqual( statement.executeQuery("select * from pipes"), - "id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,", + "id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,is_degraded,", Collections.emptySet()); // No auth needed diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 52053cdab4564..3114d60ca3a59 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -184,7 +184,8 @@ private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) { heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList(), heartbeatResp.getPipeRemainingEventCountList(), - heartbeatResp.getPipeRemainingTimeList()); + heartbeatResp.getPipeRemainingTimeList(), + heartbeatResp.getPipeDegradedStatusList()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index 6a63ca506903f..b33bb7d4d6d9e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -283,6 +283,10 @@ public TShowPipeResp convertToTShowPipeResp() { canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents()); showPipeInfo.setEstimatedRemainingTime( canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime()); + final Boolean isDegraded = temporaryMeta.getGlobalDegraded(); + if (Objects.nonNull(isDegraded)) { + showPipeInfo.setIsDegraded(isDegraded); + } showPipeInfoList.add(showPipeInfo); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index da10bb2228378..e9647551a3051 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -3178,7 +3178,8 @@ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp res resp.getPipeMetaList(), resp.getPipeCompletedList(), resp.getPipeRemainingEventCountList(), - resp.getPipeRemainingTimeList()); + resp.getPipeRemainingTimeList(), + resp.getPipeDegradedStatusList()); return StatusUtils.OK; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java index 8703c90670932..f23cde99d014c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.i18n.ManagerMessages; @@ -219,6 +220,7 @@ protected void collectPipeMetaListInternal( final List pipeMetaBinaryList = new ArrayList<>(); final List pipeRemainingEventCountList = new ArrayList<>(); final List pipeRemainingTimeList = new ArrayList<>(); + final List pipeDegradedStatusList = new ArrayList<>(); try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -233,6 +235,7 @@ protected void collectPipeMetaListInternal( pipeRemainingEventCountList.add(remainingEventCount); pipeRemainingTimeList.add(estimatedRemainingTime); + pipeDegradedStatusList.add(PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN); logger.ifPresent( l -> @@ -249,6 +252,7 @@ protected void collectPipeMetaListInternal( resp.setPipeMetaList(pipeMetaBinaryList); resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); resp.setPipeRemainingTimeList(pipeRemainingTimeList); + resp.setPipeDegradedStatusList(pipeDegradedStatusList); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java index cfbc35a446c64..d9a578c379ec0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java @@ -95,13 +95,15 @@ public void parseHeartbeat( final List pipeMetaByteBufferListFromDataNode, /* @Nullable */ final List pipeCompletedListFromAgent, /* @Nullable */ final List pipeRemainingEventCountListFromAgent, - /* @Nullable */ final List pipeRemainingTimeListFromAgent) { + /* @Nullable */ final List pipeRemainingTimeListFromAgent, + /* @Nullable */ final List pipeDegradedStatusListFromAgent) { pipeHeartbeatScheduler.parseHeartbeat( dataNodeId, new PipeHeartbeat( pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent, pipeRemainingEventCountListFromAgent, - pipeRemainingTimeListFromAgent)); + pipeRemainingTimeListFromAgent, + pipeDegradedStatusListFromAgent)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java index e7e9d2cd97dec..31e5020bf8913 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; import java.nio.ByteBuffer; import java.util.HashMap; @@ -33,12 +34,14 @@ public class PipeHeartbeat { private final Map isCompletedMap = new HashMap<>(); private final Map remainingEventCountMap = new HashMap<>(); private final Map remainingTimeMap = new HashMap<>(); + private final Map isDegradedMap = new HashMap<>(); public PipeHeartbeat( final List pipeMetaByteBufferListFromAgent, /* @Nullable */ final List pipeCompletedListFromAgent, /* @Nullable */ final List pipeRemainingEventCountListFromAgent, - /* @Nullable */ final List pipeRemainingTimeListFromAgent) { + /* @Nullable */ final List pipeRemainingTimeListFromAgent, + /* @Nullable */ final List pipeDegradedStatusListFromAgent) { // Shall not reach here, just in case if (Objects.isNull(pipeMetaByteBufferListFromAgent)) { return; @@ -49,20 +52,31 @@ public PipeHeartbeat( pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta); isCompletedMap.put( pipeMeta.getStaticMeta(), - Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i)); + Objects.nonNull(pipeCompletedListFromAgent) + && i < pipeCompletedListFromAgent.size() + && pipeCompletedListFromAgent.get(i)); // If remaining event count & remaining time can not be got, it implies that the heartbeat is // from an ancient version of DataNode. Here we guarantee that "0" will not affect both of // the final results and namely these dataNodes are omitted in calculation. remainingEventCountMap.put( pipeMeta.getStaticMeta(), Objects.nonNull(pipeRemainingEventCountListFromAgent) + && i < pipeRemainingEventCountListFromAgent.size() ? pipeRemainingEventCountListFromAgent.get(i) : 0L); remainingTimeMap.put( pipeMeta.getStaticMeta(), Objects.nonNull(pipeRemainingTimeListFromAgent) + && i < pipeRemainingTimeListFromAgent.size() ? pipeRemainingTimeListFromAgent.get(i) : 0d); + isDegradedMap.put( + pipeMeta.getStaticMeta(), + PipeTemporaryMeta.decodeTsFileEpochDegradedStatus( + Objects.nonNull(pipeDegradedStatusListFromAgent) + && i < pipeDegradedStatusListFromAgent.size() + ? pipeDegradedStatusListFromAgent.get(i) + : null)); } } @@ -86,6 +100,10 @@ public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) { return remainingTimeMap.get(pipeStaticMeta); } + public Boolean getDegraded(final PipeStaticMeta pipeStaticMeta) { + return isDegradedMap.get(pipeStaticMeta); + } + public boolean isEmpty() { return pipeMetaMap.isEmpty(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 31f4119caaaef..2a28567149352 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -191,6 +191,7 @@ private void parseHeartbeatAndSaveMetaChangeLocally( // Record statistics temporaryMeta.setRemainingEvent(nodeId, pipeHeartbeat.getRemainingEventCount(staticMeta)); temporaryMeta.setRemainingTime(nodeId, pipeHeartbeat.getRemainingTime(staticMeta)); + temporaryMeta.setDegraded(nodeId, pipeHeartbeat.getDegraded(staticMeta)); final Map pipeTaskMetaMapFromCoordinator = pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index b9276880cea56..00368c0472a4a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -115,7 +115,8 @@ private synchronized void heartbeat() { resp.getPipeMetaList(), resp.getPipeCompletedList(), resp.getPipeRemainingEventCountList(), - resp.getPipeRemainingTimeList()))); + resp.getPipeRemainingTimeList(), + resp.getPipeDegradedStatusList()))); // config node heartbeat try { @@ -127,7 +128,8 @@ private synchronized void heartbeat() { configNodeResp.getPipeMetaList(), null, configNodeResp.getPipeRemainingEventCountList(), - configNodeResp.getPipeRemainingTimeList())); + configNodeResp.getPipeRemainingTimeList(), + configNodeResp.getPipeDegradedStatusList())); } catch (final Exception e) { PipeLogger.log( LOGGER::warn, e, ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java index f6a75d26ee4a8..093e5989bb0b9 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java @@ -24,12 +24,20 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.service.ConfigNode; import org.apache.iotdb.rpc.TSStatusCode; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.HashMap; @@ -40,6 +48,26 @@ public class PipeTableRespTest { + private ConfigNode oldInstance; + + @Before + public void setUp() { + oldInstance = ConfigNode.getInstance(); + final ConfigNode configNode = Mockito.mock(ConfigNode.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final NodeManager nodeManager = Mockito.mock(NodeManager.class); + + Mockito.when(configNode.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getRegisteredDataNodeCount()).thenReturn(2); + ConfigNode.setInstance(configNode); + } + + @After + public void tearDown() { + ConfigNode.setInstance(oldInstance); + } + public PipeTableResp constructPipeTableResp() { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); List pipeMetaList = new ArrayList<>(); @@ -120,6 +148,26 @@ public void testFilter() { Assert.assertEquals(3, allPipeTableResp.getAllPipeMeta().size()); } + @Test + public void testConvertToTShowPipeRespIncludesDegradedStatus() { + final PipeTableResp pipeTableResp = constructPipeTableResp(); + ((PipeTemporaryMetaInCoordinator) pipeTableResp.getAllPipeMeta().get(0).getTemporaryMeta()) + .setDegraded(1, true); + ((PipeTemporaryMetaInCoordinator) pipeTableResp.getAllPipeMeta().get(1).getTemporaryMeta()) + .setDegraded(1, false); + + final List showPipeResult = + pipeTableResp.convertToTShowPipeResp().getPipeInfoList(); + + Assert.assertEquals(3, showPipeResult.size()); + Assert.assertEquals("testPipe", showPipeResult.get(0).getId()); + Assert.assertTrue(showPipeResult.get(0).isSetIsDegraded()); + Assert.assertTrue(showPipeResult.get(0).isIsDegraded()); + Assert.assertTrue(showPipeResult.get(1).isSetIsDegraded()); + Assert.assertFalse(showPipeResult.get(1).isIsDegraded()); + Assert.assertFalse(showPipeResult.get(2).isSetIsDegraded()); + } + @Test public void testFilterByModelBeforeWhereClause() { TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java index d5a46d42c84a4..1021d60768b5e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java @@ -20,6 +20,14 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.node.NodeManager; @@ -35,11 +43,14 @@ import java.lang.reflect.Field; import java.util.Collections; +import java.util.HashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.never; @@ -117,7 +128,79 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false); } + @Test + public void testParseHeartbeatRecordsPipeDegradedStatus() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + final PipeMeta pipeMeta = createPipeMeta(); + pipeTaskInfo.createPipe( + new CreatePipePlanV2(pipeMeta.getStaticMeta(), pipeMeta.getRuntimeMeta())); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat( + 1, + new PipeHeartbeat( + Collections.singletonList(pipeMeta.serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0d), + Collections.singletonList(PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_TRUE))); + + assertEquals(Boolean.TRUE, getTemporaryMeta(pipeTaskInfo).getGlobalDegraded()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + } + + @Test + public void testParseHeartbeatAggregatesPipeDegradedStatusFromAllDataNodes() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + final PipeMeta pipeMeta = createPipeMeta(); + pipeTaskInfo.createPipe( + new CreatePipePlanV2(pipeMeta.getStaticMeta(), pipeMeta.getRuntimeMeta())); + + final ParserTestContext context = createParserTestContext(2, pipeTaskInfo); + context.parser.parseHeartbeat(1, createPipeHeartbeat(pipeMeta, true)); + assertEquals(Boolean.TRUE, getTemporaryMeta(pipeTaskInfo).getGlobalDegraded()); + + context.parser.parseHeartbeat(2, createPipeHeartbeat(pipeMeta, false)); + assertEquals(Boolean.TRUE, getTemporaryMeta(pipeTaskInfo).getGlobalDegraded()); + + context.parser.parseHeartbeat(1, createPipeHeartbeat(pipeMeta, false)); + assertEquals(Boolean.FALSE, getTemporaryMeta(pipeTaskInfo).getGlobalDegraded()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + } + + @Test + public void testParseHeartbeatTreatsMissingPipeDegradedStatusAsUnknown() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + final PipeMeta pipeMeta = createPipeMeta(); + pipeTaskInfo.createPipe( + new CreatePipePlanV2(pipeMeta.getStaticMeta(), pipeMeta.getRuntimeMeta())); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat( + 1, + new PipeHeartbeat( + Collections.singletonList(pipeMeta.serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0d), + null)); + + assertNull(getTemporaryMeta(pipeTaskInfo).getGlobalDegraded()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + } + private ParserTestContext createParserTestContext(final int registeredDataNodeCount) { + return createParserTestContext(registeredDataNodeCount, new PipeTaskInfo()); + } + + private ParserTestContext createParserTestContext( + final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) { final ConfigManager configManager = Mockito.mock(ConfigManager.class); final NodeManager nodeManager = Mockito.mock(NodeManager.class); final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class); @@ -134,7 +217,7 @@ private ParserTestContext createParserTestContext(final int registeredDataNodeCo when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator); when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator); when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter); - when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo())); + when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(pipeTaskInfo)); when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true); Mockito.doAnswer( invocation -> { @@ -147,6 +230,21 @@ private ParserTestContext createParserTestContext(final int registeredDataNodeCo return new ParserTestContext(new PipeHeartbeatParser(configManager), procedureManager); } + private PipeHeartbeat createPipeHeartbeat(final PipeMeta pipeMeta, final boolean isDegraded) + throws Exception { + return new PipeHeartbeat( + Collections.singletonList(pipeMeta.serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0d), + Collections.singletonList(PipeTemporaryMeta.encodeTsFileEpochDegradedStatus(isDegraded))); + } + + private PipeTemporaryMetaInCoordinator getTemporaryMeta(final PipeTaskInfo pipeTaskInfo) { + return (PipeTemporaryMetaInCoordinator) + pipeTaskInfo.getPipeMetaByPipeName("test_pipe").getTemporaryMeta(); + } + private void setMetaChangeFlags( final PipeHeartbeatParser parser, final boolean needWriteConsensusOnConfigNodes, @@ -165,8 +263,18 @@ private void setAtomicBooleanField( ((AtomicBoolean) field.get(parser)).set(value); } + private PipeMeta createPipeMeta() { + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(); + pipeRuntimeMeta + .getConsensusGroupId2TaskMetaMap() + .put(1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)); + return new PipeMeta( + new PipeStaticMeta("test_pipe", 1L, new HashMap<>(), new HashMap<>(), new HashMap<>()), + pipeRuntimeMeta); + } + private PipeHeartbeat emptyHeartbeat() { - return new PipeHeartbeat(Collections.emptyList(), null, null, null); + return new PipeHeartbeat(Collections.emptyList(), null, null, null, null); } private static class ParserTestContext { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 8a66830fe1c35..dfd3ccf4f0391 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -39,6 +39,8 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; @@ -413,6 +415,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro final List pipeCompletedList = new ArrayList<>(); final List pipeRemainingEventCountList = new ArrayList<>(); final List pipeRemainingTimeList = new ArrayList<>(); + final List pipeDegradedStatusList = new ArrayList<>(); try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -448,6 +451,10 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); pipeRemainingTimeList.add(remainingEventAndTime.getRight()); + pipeDegradedStatusList.add( + PipeTemporaryMeta.encodeTsFileEpochDegradedStatus( + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .getGlobalTsFileEpochDegraded())); logger.ifPresent( l -> @@ -467,6 +474,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro resp.setPipeCompletedList(pipeCompletedList); resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); resp.setPipeRemainingTimeList(pipeRemainingTimeList); + resp.setPipeDegradedStatusList(pipeDegradedStatusList); PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); } @@ -497,6 +505,7 @@ protected void collectPipeMetaListInternal( final List pipeCompletedList = new ArrayList<>(); final List pipeRemainingEventCountList = new ArrayList<>(); final List pipeRemainingTimeList = new ArrayList<>(); + final List pipeDegradedStatusList = new ArrayList<>(); try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -523,6 +532,10 @@ protected void collectPipeMetaListInternal( pipeCompletedList.add(isCompleted); pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); pipeRemainingTimeList.add(remainingEventAndTime.getRight()); + pipeDegradedStatusList.add( + PipeTemporaryMeta.encodeTsFileEpochDegradedStatus( + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .getGlobalTsFileEpochDegraded())); logger.ifPresent( l -> @@ -542,6 +555,7 @@ protected void collectPipeMetaListInternal( resp.setPipeCompletedList(pipeCompletedList); resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); resp.setPipeRemainingTimeList(pipeRemainingTimeList); + resp.setPipeDegradedStatusList(pipeDegradedStatusList); PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java index e3c0a032d888e..13263c1a5fa3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java @@ -40,13 +40,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSource { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridSource.class); + private final Set activeTsFileEpochs = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set degradedTsFileEpochs = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + @Override protected void doExtract(final PipeRealtimeEvent event) { final Event eventToExtract = event.getEvent(); @@ -79,6 +87,8 @@ public boolean isNeedListenToInsertNode() { } private void extractTabletInsertion(final PipeRealtimeEvent event) { + markTsFileEpochActive(event.getTsFileEpoch()); + TsFileEpoch.State state; if (canNotUseTabletAnymore(event)) { @@ -105,6 +115,9 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) { } state = event.getTsFileEpoch().getState(this); + if (state == TsFileEpoch.State.USING_TSFILE || state == TsFileEpoch.State.USING_BOTH) { + markTsFileEpochDegraded(event.getTsFileEpoch()); + } switch (state) { case USING_TSFILE: // Ignore the tablet event. @@ -129,6 +142,8 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) { } private void extractTsFileInsertion(final PipeRealtimeEvent event) { + markTsFileEpochActive(event.getTsFileEpoch()); + // Notice that, if the tsFile is partially extracted because the pipe is not opened before, the // former data won't be extracted event @@ -154,6 +169,9 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) { }); final TsFileEpoch.State state = event.getTsFileEpoch().getState(this); + if (state == TsFileEpoch.State.USING_BOTH) { + markTsFileEpochDegraded(event.getTsFileEpoch()); + } switch (state) { case USING_TABLET: // If the state is USING_TABLET, discard the event @@ -161,6 +179,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) { .eliminateProgressIndex( dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath()); event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false); + clearTsFileEpoch(event.getTsFileEpoch()); return; case EMPTY: case USING_TSFILE: @@ -175,6 +194,44 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) { } } + private void markTsFileEpochActive(final TsFileEpoch tsFileEpoch) { + activeTsFileEpochs.add(tsFileEpoch); + reportTsFileEpochDegradedStatus(); + } + + private void markTsFileEpochDegraded(final TsFileEpoch tsFileEpoch) { + activeTsFileEpochs.add(tsFileEpoch); + degradedTsFileEpochs.add(tsFileEpoch); + reportTsFileEpochDegradedStatus(); + } + + private void clearTsFileEpoch(final TsFileEpoch tsFileEpoch) { + activeTsFileEpochs.remove(tsFileEpoch); + degradedTsFileEpochs.remove(tsFileEpoch); + reportTsFileEpochDegradedStatus(); + } + + private void reportTsFileEpochDegradedStatus() { + if (activeTsFileEpochs.isEmpty()) { + PipeDataNodeAgent.task().clearPipeTsFileEpochDegraded(pipeName, creationTime, dataRegionId); + } else { + PipeDataNodeAgent.task() + .setPipeTsFileEpochDegraded( + pipeName, creationTime, dataRegionId, !degradedTsFileEpochs.isEmpty()); + } + } + + @Override + public void close() throws Exception { + try { + super.close(); + } finally { + activeTsFileEpochs.clear(); + degradedTsFileEpochs.clear(); + PipeDataNodeAgent.task().clearPipeTsFileEpochDegraded(pipeName, creationTime, dataRegionId); + } + } + // If the insertNode's memory has reached the dangerous threshold, we should not extract any // tablets. private boolean canNotUseTabletAnymore(final PipeRealtimeEvent event) { @@ -262,6 +319,7 @@ private Event supplyTabletInsertion(final PipeRealtimeEvent event) { // this event is not reliable anymore. but the data represented by this event // has been carried by the following tsfile event, so we can just discard this event. event.getTsFileEpoch().migrateState(this, s -> TsFileEpoch.State.USING_BOTH); + markTsFileEpochDegraded(event.getTsFileEpoch()); LOGGER.warn( "Discard tablet event {} because it is not reliable anymore. " + "Change the state of TsFileEpoch to USING_BOTH.", @@ -271,23 +329,27 @@ private Event supplyTabletInsertion(final PipeRealtimeEvent event) { } private Event supplyTsFileInsertion(final PipeRealtimeEvent event) { - if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName())) { - return event.getEvent(); - } else { - // If the event's reference count can not be increased, it means the data represented by - // this event is not reliable anymore. the data has been lost. we simply discard this - // event and report the exception to PipeRuntimeAgent. - final String errorMessage = - String.format( - DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, - event.getEvent()); - LOGGER.error(errorMessage); - PipeDataNodeAgent.runtime() - .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex( - dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath()); - return null; + try { + if (event.increaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName())) { + return event.getEvent(); + } else { + // If the event's reference count can not be increased, it means the data represented by + // this event is not reliable anymore. the data has been lost. we simply discard this + // event and report the exception to PipeRuntimeAgent. + final String errorMessage = + String.format( + DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST, + event.getEvent()); + LOGGER.error(errorMessage); + PipeDataNodeAgent.runtime() + .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); + PipeTsFileEpochProgressIndexKeeper.getInstance() + .eliminateProgressIndex( + dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath()); + return null; + } + } finally { + clearTsFileEpoch(event.getTsFileEpoch()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index a501824850d13..7ca14b38284ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -704,6 +704,11 @@ protected void constructLine() { columnBuilders[7].writeLong(tPipeInfo.isSetRemainingEventCount() ? remainingEventCount : -1); columnBuilders[8].writeDouble(tPipeInfo.isSetEstimatedRemainingTime() ? remainingTime : -1); + if (tPipeInfo.isSetIsDegraded()) { + columnBuilders[9].writeBoolean(tPipeInfo.isIsDegraded()); + } else { + columnBuilders[9].appendNull(); + } resultBuilder.declarePosition(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java index d204cd048022b..6698ae88fdc0c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java @@ -130,6 +130,11 @@ public static void buildTSBlock( ? String.format("%.2f", remainingTime) : "Unknown", TSFileConfig.STRING_CHARSET)); + if (tPipeInfo.isSetIsDegraded()) { + builder.getColumnBuilder(9).writeBoolean(tPipeInfo.isIsDegraded()); + } else { + builder.getColumnBuilder(9).appendNull(); + } builder.declarePosition(); } final DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java index b793e8ca6991b..1b714a800097e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java @@ -22,14 +22,23 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMetaKeeper; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; @@ -55,6 +64,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Files; import java.util.Arrays; import java.util.HashMap; @@ -72,6 +82,9 @@ public class PipeRealtimeExtractTest { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class); + private static final String TEST_PIPE_NAME = "test_degraded_status_pipe"; + private static final long TEST_PIPE_CREATION_TIME = 1L; + private static final String TEST_REFERENCE_HOLDER = PipeRealtimeExtractTest.class.getName(); private final int dataRegion1 = 1; private final int dataRegion2 = 2; @@ -87,9 +100,10 @@ public class PipeRealtimeExtractTest { private int dataNodeId; @Before - public void setUp() throws IOException { + public void setUp() throws Exception { dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + removeTestPipeMeta(); writeService = Executors.newFixedThreadPool(2); listenerService = Executors.newFixedThreadPool(4); tmpDir = new File(Files.createTempDirectory("pipeRealtimeExtractor").toString()); @@ -103,11 +117,12 @@ public void setUp() throws IOException { } @After - public void tearDown() { + public void tearDown() throws Exception { IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId); writeService.shutdownNow(); listenerService.shutdownNow(); FileUtils.deleteFileOrDirectory(tmpDir); + removeTestPipeMeta(); } @Test @@ -310,6 +325,59 @@ public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exceptio } } + @Test + public void testHybridSourceReportsTsFileEpochDegradedStatus() throws Exception { + registerTestPipeMeta(); + + try (final PipeRealtimeDataRegionHybridSource extractor = + new PipeRealtimeDataRegionHybridSource()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1); + } + }); + final PipeTaskRuntimeConfiguration configuration = + new PipeTaskRuntimeConfiguration( + new PipeTaskSourceRuntimeEnvironment( + TEST_PIPE_NAME, + TEST_PIPE_CREATION_TIME, + dataRegion1, + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + + extractor.validate(new PipeParameterValidator(parameters)); + extractor.customize(parameters, configuration); + + final TsFileResource resource = createTsFileResource(dataRegion1, "100-100-0-0.tsfile"); + final PipeRealtimeEvent tabletEvent = + PipeRealtimeEventFactory.createRealtimeEvent( + false, "root.sg", createInsertRowNode("degraded-tablet", "a"), resource); + + Assert.assertTrue(tabletEvent.increaseReferenceCount(TEST_REFERENCE_HOLDER)); + extractor.extract(tabletEvent); + Assert.assertEquals(Boolean.FALSE, getGlobalTsFileEpochDegraded()); + + tabletEvent.clearReferenceCount(TEST_REFERENCE_HOLDER); + Assert.assertNull(extractor.supply()); + Assert.assertEquals(Boolean.TRUE, getGlobalTsFileEpochDegraded()); + + final PipeRealtimeEvent tsFileEvent = + PipeRealtimeEventFactory.createRealtimeEvent(false, "root.sg", resource, false); + + Assert.assertTrue(tsFileEvent.increaseReferenceCount(TEST_REFERENCE_HOLDER)); + extractor.extract(tsFileEvent); + Assert.assertEquals(Boolean.TRUE, getGlobalTsFileEpochDegraded()); + + final Event suppliedEvent = extractor.supply(); + Assert.assertTrue(suppliedEvent instanceof TsFileInsertionEvent); + releaseSuppliedEvent(suppliedEvent); + Assert.assertNull(getGlobalTsFileEpochDegraded()); + } + + Assert.assertNull(getGlobalTsFileEpochDegraded()); + } + private Future write2DataRegion( final int writeNum, final int dataRegionId, final int startNum) { final File dataRegionDir = @@ -398,6 +466,85 @@ private Future listen( }); } + private TsFileResource createTsFileResource(final int dataRegionId, final String fileName) + throws IOException { + final File dataRegionDir = + new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0"); + Assert.assertTrue(dataRegionDir.mkdirs() || dataRegionDir.isDirectory()); + + final File tsFile = new File(dataRegionDir, fileName); + Assert.assertTrue(tsFile.createNewFile()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.updateStartTime( + IDeviceID.Factory.DEFAULT_FACTORY.create( + String.join(TsFileConstant.PATH_SEPARATOR, device)), + 0); + resource.close(); + return resource; + } + + private InsertRowNode createInsertRowNode(final String planNodeId, final String measurement) + throws Exception { + return new InsertRowNode( + new PlanNodeId(planNodeId), + new PartialPath(device), + false, + new String[] {measurement}, + new TSDataType[] {TSDataType.INT32}, + 0, + new Integer[] {1}, + false); + } + + private void registerTestPipeMeta() throws Exception { + final PipeMetaKeeper pipeMetaKeeper = getPipeMetaKeeper(); + pipeMetaKeeper.acquireWriteLock(); + try { + pipeMetaKeeper.removePipeMeta(TEST_PIPE_NAME); + pipeMetaKeeper.addPipeMeta( + new PipeMeta( + new PipeStaticMeta( + TEST_PIPE_NAME, + TEST_PIPE_CREATION_TIME, + new HashMap<>(), + new HashMap<>(), + new HashMap<>()), + new PipeRuntimeMeta()) + .deepCopy4TaskAgent()); + } finally { + pipeMetaKeeper.releaseWriteLock(); + } + } + + private void removeTestPipeMeta() throws Exception { + final PipeMetaKeeper pipeMetaKeeper = getPipeMetaKeeper(); + pipeMetaKeeper.acquireWriteLock(); + try { + pipeMetaKeeper.removePipeMeta(TEST_PIPE_NAME); + } finally { + pipeMetaKeeper.releaseWriteLock(); + } + } + + private Boolean getGlobalTsFileEpochDegraded() throws Exception { + final PipeMeta pipeMeta = getPipeMetaKeeper().getPipeMeta(TEST_PIPE_NAME); + Assert.assertNotNull(pipeMeta); + return ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getGlobalTsFileEpochDegraded(); + } + + private PipeMetaKeeper getPipeMetaKeeper() throws Exception { + final Field pipeMetaKeeperField = PipeTaskAgent.class.getDeclaredField("pipeMetaKeeper"); + pipeMetaKeeperField.setAccessible(true); + return (PipeMetaKeeper) pipeMetaKeeperField.get(PipeDataNodeAgent.task()); + } + + private void releaseSuppliedEvent(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(TEST_REFERENCE_HOLDER); + } + } + private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource { private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTaskTest.java new file mode 100644 index 0000000000000..3f89e8ee9cce7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTaskTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe; + +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.SettableFuture; +import org.apache.tsfile.read.common.block.TsBlock; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ShowPipeTaskTest { + + @Test + public void testBuildTSBlockWritesDegradedColumn() throws Exception { + final TShowPipeInfo degradedPipe = createPipeInfo("degraded_pipe"); + degradedPipe.setIsDegraded(true); + final TShowPipeInfo normalPipe = createPipeInfo("normal_pipe"); + normalPipe.setIsDegraded(false); + final TShowPipeInfo unknownPipe = createPipeInfo("unknown_pipe"); + + final SettableFuture future = SettableFuture.create(); + ShowPipeTask.buildTSBlock(Arrays.asList(degradedPipe, normalPipe, unknownPipe), future); + + final ConfigTaskResult result = future.get(); + final TsBlock resultSet = result.getResultSet(); + + assertEquals(TSStatusCode.SUCCESS_STATUS, result.getStatusCode()); + assertEquals( + ColumnHeaderConstant.IS_DEGRADED, result.getResultSetHeader().getRespColumns().get(9)); + assertEquals(3, resultSet.getPositionCount()); + assertTrue(resultSet.getColumn(9).getBoolean(0)); + assertFalse(resultSet.getColumn(9).getBoolean(1)); + assertTrue(resultSet.getColumn(9).isNull(2)); + } + + private TShowPipeInfo createPipeInfo(final String pipeName) { + return new TShowPipeInfo( + pipeName, + 1L, + "RUNNING", + "{source=iotdb-source}", + "{processor=do-nothing-processor}", + "{sink=iotdb-thrift-sink}", + ""); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 94904092cc472..b404a2ce00be1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -1176,6 +1176,28 @@ public void decreaseFloatingMemoryUsageInByte( } } + public void setPipeTsFileEpochDegraded( + final String pipeName, + final long creationTime, + final int regionId, + final boolean isDegraded) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + // To avoid stale pipe before alter + if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) + .setTsFileEpochDegraded(regionId, isDegraded); + } + } + + public void clearPipeTsFileEpochDegraded( + final String pipeName, final long creationTime, final int regionId) { + final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + // To avoid stale pipe before alter + if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { + ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).clearTsFileEpochDegraded(regionId); + } + } + public int getPipeCount() { return pipeMetaKeeper.getPipeMetaCount(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java index 5363f47190b48..d839924aacb58 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java @@ -19,4 +19,31 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; -public interface PipeTemporaryMeta {} +public interface PipeTemporaryMeta { + + int TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN = -1; + int TS_FILE_EPOCH_DEGRADED_STATUS_FALSE = 0; + int TS_FILE_EPOCH_DEGRADED_STATUS_TRUE = 1; + + static int encodeTsFileEpochDegradedStatus(final Boolean isDegraded) { + if (isDegraded == null) { + return TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN; + } + return isDegraded ? TS_FILE_EPOCH_DEGRADED_STATUS_TRUE : TS_FILE_EPOCH_DEGRADED_STATUS_FALSE; + } + + static Boolean decodeTsFileEpochDegradedStatus(final Integer status) { + if (status == null) { + return null; + } + switch (status) { + case TS_FILE_EPOCH_DEGRADED_STATUS_FALSE: + return false; + case TS_FILE_EPOCH_DEGRADED_STATUS_TRUE: + return true; + case TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN: + default: + return null; + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java index 23914fa8d8444..c28bb51236d8c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java @@ -24,12 +24,15 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta { // Statistics private final AtomicLong floatingMemoryUsageInByte = new AtomicLong(0L); + private final ConcurrentMap regionId2TsFileEpochDegradedMap = + new ConcurrentHashMap<>(); // Object pool private final String pipeNameWithCreationTime; @@ -53,6 +56,21 @@ public long getFloatingMemoryUsageInByte() { return floatingMemoryUsageInByte.get(); } + public void setTsFileEpochDegraded(final int regionId, final boolean isDegraded) { + regionId2TsFileEpochDegradedMap.put(regionId, isDegraded); + } + + public void clearTsFileEpochDegraded(final int regionId) { + regionId2TsFileEpochDegradedMap.remove(regionId); + } + + public Boolean getGlobalTsFileEpochDegraded() { + if (regionId2TsFileEpochDegradedMap.values().stream().anyMatch(Boolean.TRUE::equals)) { + return true; + } + return regionId2TsFileEpochDegradedMap.isEmpty() ? null : false; + } + public String getPipeNameWithCreationTime() { return pipeNameWithCreationTime; } @@ -87,12 +105,15 @@ public boolean equals(final Object o) { final PipeTemporaryMetaInAgent that = (PipeTemporaryMetaInAgent) o; return Objects.equals( this.floatingMemoryUsageInByte.get(), that.floatingMemoryUsageInByte.get()) + && Objects.equals( + this.regionId2TsFileEpochDegradedMap, that.regionId2TsFileEpochDegradedMap) && Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap); } @Override public int hashCode() { - return Objects.hash(floatingMemoryUsageInByte, regionId2CommitterKeyMap); + return Objects.hash( + floatingMemoryUsageInByte.get(), regionId2TsFileEpochDegradedMap, regionId2CommitterKeyMap); } @Override @@ -100,6 +121,8 @@ public String toString() { return "PipeTemporaryMeta{" + "floatingMemoryUsage=" + floatingMemoryUsageInByte + + ", regionId2TsFileEpochDegradedMap=" + + regionId2TsFileEpochDegradedMap + ", regionId2CommitterKeyMap=" + regionId2CommitterKeyMap + '}'; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java index ee127bbae44a6..b44649ff30cea 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java @@ -32,6 +32,7 @@ public class PipeTemporaryMetaInCoordinator implements PipeTemporaryMeta { Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ConcurrentMap nodeId2RemainingEventMap = new ConcurrentHashMap<>(); private final ConcurrentMap nodeId2RemainingTimeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap nodeId2IsDegradedMap = new ConcurrentHashMap<>(); public void markDataNodeCompleted(final int dataNodeId) { completedDataNodeIds.add(dataNodeId); @@ -49,6 +50,14 @@ public void setRemainingTime(final int dataNodeId, final double remainingTime) { nodeId2RemainingTimeMap.put(dataNodeId, remainingTime); } + public void setDegraded(final int dataNodeId, final Boolean isDegraded) { + if (Objects.isNull(isDegraded)) { + nodeId2IsDegradedMap.remove(dataNodeId); + } else { + nodeId2IsDegradedMap.put(dataNodeId, isDegraded); + } + } + public Set getCompletedDataNodeIds() { return completedDataNodeIds; } @@ -61,6 +70,13 @@ public double getGlobalRemainingTime() { return nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d); } + public Boolean getGlobalDegraded() { + if (nodeId2IsDegradedMap.values().stream().anyMatch(Boolean.TRUE::equals)) { + return true; + } + return nodeId2IsDegradedMap.isEmpty() ? null : false; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -72,12 +88,17 @@ public boolean equals(final Object o) { final PipeTemporaryMetaInCoordinator that = (PipeTemporaryMetaInCoordinator) o; return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds) && Objects.equals(this.nodeId2RemainingEventMap, that.nodeId2RemainingEventMap) - && Objects.equals(this.nodeId2RemainingTimeMap, that.nodeId2RemainingTimeMap); + && Objects.equals(this.nodeId2RemainingTimeMap, that.nodeId2RemainingTimeMap) + && Objects.equals(this.nodeId2IsDegradedMap, that.nodeId2IsDegradedMap); } @Override public int hashCode() { - return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap, nodeId2RemainingTimeMap); + return Objects.hash( + completedDataNodeIds, + nodeId2RemainingEventMap, + nodeId2RemainingTimeMap, + nodeId2IsDegradedMap); } @Override @@ -87,8 +108,10 @@ public String toString() { + completedDataNodeIds + ", nodeId2RemainingEventMap=" + nodeId2RemainingEventMap - + ", nodeId2RemainingTimeMap" + + ", nodeId2RemainingTimeMap=" + nodeId2RemainingTimeMap + + ", nodeId2IsDegradedMap=" + + nodeId2IsDegradedMap + '}'; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index d9d6518ac0beb..a1c8b6ddd2086 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -196,6 +196,7 @@ private ColumnHeaderConstant() { public static final String EXCEPTION_MESSAGE = "ExceptionMessage"; public static final String REMAINING_EVENT_COUNT = "RemainingEventCount"; public static final String ESTIMATED_REMAINING_SECONDS = "EstimatedRemainingSeconds"; + public static final String IS_DEGRADED = "IsDegraded"; // column names for select into public static final String SOURCE_DEVICE = "SourceDevice"; @@ -272,6 +273,7 @@ private ColumnHeaderConstant() { public static final String REMAINING_EVENT_COUNT_TABLE_MODEL = "remaining_event_count"; public static final String ESTIMATED_REMAINING_SECONDS_TABLE_MODEL = "estimated_remaining_seconds"; + public static final String IS_DEGRADED_TABLE_MODEL = "is_degraded"; public static final String PLUGIN_NAME_TABLE_MODEL = "plugin_name"; public static final String PLUGIN_TYPE_TABLE_MODEL = "plugin_type"; @@ -608,7 +610,8 @@ private ColumnHeaderConstant() { new ColumnHeader(PIPE_CONNECTOR, TSDataType.TEXT), new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT), new ColumnHeader(REMAINING_EVENT_COUNT, TSDataType.TEXT), - new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT)); + new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT), + new ColumnHeader(IS_DEGRADED, TSDataType.BOOLEAN)); public static final List showTopicColumnHeaders = ImmutableList.of( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 27a1dbdd9d2f7..32f51a173bec4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -225,6 +225,9 @@ public class InformationSchema { pipeTable.addColumnSchema( new AttributeColumnSchema( ColumnHeaderConstant.ESTIMATED_REMAINING_SECONDS_TABLE_MODEL, TSDataType.DOUBLE)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.IS_DEGRADED_TABLE_MODEL, TSDataType.BOOLEAN)); schemaTables.put(PIPES, pipeTable); final TsTable pipePluginTable = new TsTable(PIPE_PLUGINS); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaTest.java new file mode 100644 index 0000000000000..b6091ab76f7e6 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.task.meta; + +import org.junit.Assert; +import org.junit.Test; + +public class PipeTemporaryMetaTest { + + @Test + public void testTsFileEpochDegradedStatusCodec() { + Assert.assertEquals( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN, + PipeTemporaryMeta.encodeTsFileEpochDegradedStatus(null)); + Assert.assertEquals( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_FALSE, + PipeTemporaryMeta.encodeTsFileEpochDegradedStatus(false)); + Assert.assertEquals( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_TRUE, + PipeTemporaryMeta.encodeTsFileEpochDegradedStatus(true)); + + Assert.assertNull(PipeTemporaryMeta.decodeTsFileEpochDegradedStatus(null)); + Assert.assertNull( + PipeTemporaryMeta.decodeTsFileEpochDegradedStatus( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_UNKNOWN)); + Assert.assertEquals( + Boolean.FALSE, + PipeTemporaryMeta.decodeTsFileEpochDegradedStatus( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_FALSE)); + Assert.assertEquals( + Boolean.TRUE, + PipeTemporaryMeta.decodeTsFileEpochDegradedStatus( + PipeTemporaryMeta.TS_FILE_EPOCH_DEGRADED_STATUS_TRUE)); + } + + @Test + public void testAgentAggregatesTsFileEpochDegradedStatus() { + final PipeTemporaryMetaInAgent temporaryMeta = new PipeTemporaryMetaInAgent("test_pipe", 1L); + + Assert.assertNull(temporaryMeta.getGlobalTsFileEpochDegraded()); + + temporaryMeta.setTsFileEpochDegraded(1, false); + temporaryMeta.setTsFileEpochDegraded(2, false); + Assert.assertEquals(Boolean.FALSE, temporaryMeta.getGlobalTsFileEpochDegraded()); + + temporaryMeta.setTsFileEpochDegraded(2, true); + Assert.assertEquals(Boolean.TRUE, temporaryMeta.getGlobalTsFileEpochDegraded()); + + temporaryMeta.clearTsFileEpochDegraded(2); + Assert.assertEquals(Boolean.FALSE, temporaryMeta.getGlobalTsFileEpochDegraded()); + + temporaryMeta.clearTsFileEpochDegraded(1); + Assert.assertNull(temporaryMeta.getGlobalTsFileEpochDegraded()); + } + + @Test + public void testCoordinatorAggregatesNullableDegradedStatus() { + final PipeTemporaryMetaInCoordinator temporaryMeta = new PipeTemporaryMetaInCoordinator(); + + Assert.assertNull(temporaryMeta.getGlobalDegraded()); + + temporaryMeta.setDegraded(1, false); + temporaryMeta.setDegraded(2, false); + Assert.assertEquals(Boolean.FALSE, temporaryMeta.getGlobalDegraded()); + + temporaryMeta.setDegraded(2, true); + Assert.assertEquals(Boolean.TRUE, temporaryMeta.getGlobalDegraded()); + + temporaryMeta.setDegraded(2, null); + Assert.assertEquals(Boolean.FALSE, temporaryMeta.getGlobalDegraded()); + + temporaryMeta.setDegraded(1, null); + Assert.assertNull(temporaryMeta.getGlobalDegraded()); + } +} diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index a107b0259ff09..d52dc08813fbd 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -202,6 +202,7 @@ struct TPipeHeartbeatResp { 2: optional list pipeCompletedList 3: optional list pipeRemainingEventCountList 4: optional list pipeRemainingTimeList + 5: optional list pipeDegradedStatusList } struct TLicense { diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 852460fb60e8b..955aa6fe39ad7 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -869,6 +869,7 @@ struct TShowPipeInfo { 7: required string exceptionMessage 8: optional i64 remainingEventCount 9: optional double EstimatedRemainingTime + 10: optional bool isDegraded } struct TGetAllPipeInfoResp { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 8316aa0ec5bc6..b74895260fa3b 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -316,6 +316,7 @@ struct TDataNodeHeartbeatResp { 15: optional list pipeRemainingEventCountList 16: optional list pipeRemainingTimeList 17: optional map dataRegionRawDataSize + 18: optional list pipeDegradedStatusList } struct TPipeHeartbeatReq {