Skip to content

Commit 42e94f6

Browse files
committed
Add degraded status to show pipes
1 parent fcd52d7 commit 42e94f6

30 files changed

Lines changed: 234 additions & 24 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,8 @@ public void testInformationSchema() throws SQLException {
551551
"pipe_sink,STRING,ATTRIBUTE,",
552552
"exception_message,STRING,ATTRIBUTE,",
553553
"remaining_event_count,INT64,ATTRIBUTE,",
554-
"estimated_remaining_seconds,DOUBLE,ATTRIBUTE,")));
554+
"estimated_remaining_seconds,DOUBLE,ATTRIBUTE,",
555+
"is_degraded,BOOLEAN,ATTRIBUTE,")));
555556
TestUtils.assertResultSetEqual(
556557
statement.executeQuery("desc pipe_plugins"),
557558
"ColumnName,DataType,Category,",
@@ -673,7 +674,7 @@ public void testInformationSchema() throws SQLException {
673674
// Filter out not self-created pipes
674675
TestUtils.assertResultSetEqual(
675676
statement.executeQuery("select * from pipes"),
676-
"id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,",
677+
"id,creation_time,state,pipe_source,pipe_processor,pipe_sink,exception_message,remaining_event_count,estimated_remaining_seconds,is_degraded,",
677678
Collections.emptySet());
678679

679680
// No auth needed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) {
184184
heartbeatResp.getPipeMetaList(),
185185
heartbeatResp.getPipeCompletedList(),
186186
heartbeatResp.getPipeRemainingEventCountList(),
187-
heartbeatResp.getPipeRemainingTimeList());
187+
heartbeatResp.getPipeRemainingTimeList(),
188+
heartbeatResp.getPipeDegradedList());
188189
}
189190
}
190191

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ public TShowPipeResp convertToTShowPipeResp() {
270270
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents());
271271
showPipeInfo.setEstimatedRemainingTime(
272272
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime());
273+
showPipeInfo.setIsDegraded(temporaryMeta.isGlobalDegraded());
273274
showPipeInfoList.add(showPipeInfo);
274275
}
275276

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3178,7 +3178,8 @@ public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp res
31783178
resp.getPipeMetaList(),
31793179
resp.getPipeCompletedList(),
31803180
resp.getPipeRemainingEventCountList(),
3181-
resp.getPipeRemainingTimeList());
3181+
resp.getPipeRemainingTimeList(),
3182+
resp.getPipeDegradedList());
31823183
return StatusUtils.OK;
31833184
}
31843185

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
2929
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
3030
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
31+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
3132
import org.apache.iotdb.commons.pipe.config.PipeConfig;
3233
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
3334
import org.apache.iotdb.confignode.i18n.ManagerMessages;
@@ -216,6 +217,7 @@ protected void collectPipeMetaListInternal(
216217
final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
217218
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
218219
final List<Double> pipeRemainingTimeList = new ArrayList<>();
220+
final List<Boolean> pipeDegradedList = new ArrayList<>();
219221
try {
220222
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
221223
pipeMetaBinaryList.add(pipeMeta.serialize());
@@ -230,6 +232,7 @@ protected void collectPipeMetaListInternal(
230232

231233
pipeRemainingEventCountList.add(remainingEventCount);
232234
pipeRemainingTimeList.add(estimatedRemainingTime);
235+
pipeDegradedList.add(((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).isDegraded());
233236

234237
logger.ifPresent(
235238
l ->
@@ -246,6 +249,7 @@ protected void collectPipeMetaListInternal(
246249
resp.setPipeMetaList(pipeMetaBinaryList);
247250
resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
248251
resp.setPipeRemainingTimeList(pipeRemainingTimeList);
252+
resp.setPipeDegradedList(pipeDegradedList);
249253
}
250254

251255
@Override

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,15 @@ public void parseHeartbeat(
9595
final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
9696
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
9797
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
98-
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
98+
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent,
99+
/* @Nullable */ final List<Boolean> pipeDegradedListFromAgent) {
99100
pipeHeartbeatScheduler.parseHeartbeat(
100101
dataNodeId,
101102
new PipeHeartbeat(
102103
pipeMetaByteBufferListFromDataNode,
103104
pipeCompletedListFromAgent,
104105
pipeRemainingEventCountListFromAgent,
105-
pipeRemainingTimeListFromAgent));
106+
pipeRemainingTimeListFromAgent,
107+
pipeDegradedListFromAgent));
106108
}
107109
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@ public class PipeHeartbeat {
3333
private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
3434
private final Map<PipeStaticMeta, Long> remainingEventCountMap = new HashMap<>();
3535
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
36+
private final Map<PipeStaticMeta, Boolean> isDegradedMap = new HashMap<>();
3637

3738
public PipeHeartbeat(
3839
final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
3940
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
4041
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
41-
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
42+
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent,
43+
/* @Nullable */ final List<Boolean> pipeDegradedListFromAgent) {
4244
// Shall not reach here, just in case
4345
if (Objects.isNull(pipeMetaByteBufferListFromAgent)) {
4446
return;
@@ -49,20 +51,29 @@ public PipeHeartbeat(
4951
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
5052
isCompletedMap.put(
5153
pipeMeta.getStaticMeta(),
52-
Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i));
54+
Objects.nonNull(pipeCompletedListFromAgent)
55+
&& i < pipeCompletedListFromAgent.size()
56+
&& pipeCompletedListFromAgent.get(i));
5357
// If remaining event count & remaining time can not be got, it implies that the heartbeat is
5458
// from an ancient version of DataNode. Here we guarantee that "0" will not affect both of
5559
// the final results and namely these dataNodes are omitted in calculation.
5660
remainingEventCountMap.put(
5761
pipeMeta.getStaticMeta(),
5862
Objects.nonNull(pipeRemainingEventCountListFromAgent)
63+
&& i < pipeRemainingEventCountListFromAgent.size()
5964
? pipeRemainingEventCountListFromAgent.get(i)
6065
: 0L);
6166
remainingTimeMap.put(
6267
pipeMeta.getStaticMeta(),
6368
Objects.nonNull(pipeRemainingTimeListFromAgent)
69+
&& i < pipeRemainingTimeListFromAgent.size()
6470
? pipeRemainingTimeListFromAgent.get(i)
6571
: 0d);
72+
isDegradedMap.put(
73+
pipeMeta.getStaticMeta(),
74+
Objects.nonNull(pipeDegradedListFromAgent)
75+
&& i < pipeDegradedListFromAgent.size()
76+
&& pipeDegradedListFromAgent.get(i));
6677
}
6778
}
6879

@@ -86,6 +97,10 @@ public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) {
8697
return remainingTimeMap.get(pipeStaticMeta);
8798
}
8899

100+
public boolean isDegraded(final PipeStaticMeta pipeStaticMeta) {
101+
return Boolean.TRUE.equals(isDegradedMap.get(pipeStaticMeta));
102+
}
103+
89104
public boolean isEmpty() {
90105
return pipeMetaMap.isEmpty();
91106
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
192192
// Record statistics
193193
temporaryMeta.setRemainingEvent(nodeId, pipeHeartbeat.getRemainingEventCount(staticMeta));
194194
temporaryMeta.setRemainingTime(nodeId, pipeHeartbeat.getRemainingTime(staticMeta));
195+
temporaryMeta.setDegraded(nodeId, pipeHeartbeat.isDegraded(staticMeta));
195196

196197
final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
197198
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ private synchronized void heartbeat() {
115115
resp.getPipeMetaList(),
116116
resp.getPipeCompletedList(),
117117
resp.getPipeRemainingEventCountList(),
118-
resp.getPipeRemainingTimeList())));
118+
resp.getPipeRemainingTimeList(),
119+
resp.getPipeDegradedList())));
119120

120121
// config node heartbeat
121122
try {
@@ -127,7 +128,8 @@ private synchronized void heartbeat() {
127128
configNodeResp.getPipeMetaList(),
128129
null,
129130
configNodeResp.getPipeRemainingEventCountList(),
130-
configNodeResp.getPipeRemainingTimeList()));
131+
configNodeResp.getPipeRemainingTimeList(),
132+
configNodeResp.getPipeDegradedList()));
131133
} catch (final Exception e) {
132134
PipeLogger.log(
133135
LOGGER::warn, e, ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK);

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@
2020
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
23+
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
24+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
25+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
26+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
27+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
28+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
29+
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
2330
import org.apache.iotdb.confignode.manager.ConfigManager;
2431
import org.apache.iotdb.confignode.manager.ProcedureManager;
2532
import org.apache.iotdb.confignode.manager.node.NodeManager;
@@ -35,11 +42,13 @@
3542

3643
import java.lang.reflect.Field;
3744
import java.util.Collections;
45+
import java.util.HashMap;
3846
import java.util.concurrent.CompletableFuture;
3947
import java.util.concurrent.ExecutorService;
4048
import java.util.concurrent.atomic.AtomicBoolean;
4149
import java.util.concurrent.atomic.AtomicReference;
4250

51+
import static org.junit.Assert.assertTrue;
4352
import static org.mockito.ArgumentMatchers.any;
4453
import static org.mockito.ArgumentMatchers.anyBoolean;
4554
import static org.mockito.Mockito.never;
@@ -117,7 +126,38 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th
117126
verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false);
118127
}
119128

129+
@Test
130+
public void testParseHeartbeatRecordsPipeDegradedStatus() throws Exception {
131+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
132+
133+
final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
134+
final PipeMeta pipeMeta = createPipeMeta();
135+
pipeTaskInfo.createPipe(
136+
new CreatePipePlanV2(pipeMeta.getStaticMeta(), pipeMeta.getRuntimeMeta()));
137+
138+
final ParserTestContext context = createParserTestContext(1, pipeTaskInfo);
139+
context.parser.parseHeartbeat(
140+
1,
141+
new PipeHeartbeat(
142+
Collections.singletonList(pipeMeta.serialize()),
143+
Collections.singletonList(false),
144+
Collections.singletonList(0L),
145+
Collections.singletonList(0d),
146+
Collections.singletonList(true)));
147+
148+
assertTrue(
149+
((PipeTemporaryMetaInCoordinator)
150+
pipeTaskInfo.getPipeMetaByPipeName("test_pipe").getTemporaryMeta())
151+
.isGlobalDegraded());
152+
verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
153+
}
154+
120155
private ParserTestContext createParserTestContext(final int registeredDataNodeCount) {
156+
return createParserTestContext(registeredDataNodeCount, new PipeTaskInfo());
157+
}
158+
159+
private ParserTestContext createParserTestContext(
160+
final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) {
121161
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
122162
final NodeManager nodeManager = Mockito.mock(NodeManager.class);
123163
final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class);
@@ -134,7 +174,7 @@ private ParserTestContext createParserTestContext(final int registeredDataNodeCo
134174
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
135175
when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
136176
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
137-
when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo()));
177+
when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(pipeTaskInfo));
138178
when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true);
139179
Mockito.doAnswer(
140180
invocation -> {
@@ -165,8 +205,18 @@ private void setAtomicBooleanField(
165205
((AtomicBoolean) field.get(parser)).set(value);
166206
}
167207

208+
private PipeMeta createPipeMeta() {
209+
final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
210+
pipeRuntimeMeta
211+
.getConsensusGroupId2TaskMetaMap()
212+
.put(1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1));
213+
return new PipeMeta(
214+
new PipeStaticMeta("test_pipe", 1L, new HashMap<>(), new HashMap<>(), new HashMap<>()),
215+
pipeRuntimeMeta);
216+
}
217+
168218
private PipeHeartbeat emptyHeartbeat() {
169-
return new PipeHeartbeat(Collections.emptyList(), null, null, null);
219+
return new PipeHeartbeat(Collections.emptyList(), null, null, null, null);
170220
}
171221

172222
private static class ParserTestContext {

0 commit comments

Comments
 (0)