Skip to content

Commit 41eb7e6

Browse files
committed
[Pipe] Improve pipe control-flow lock responsiveness (#18076)
* Improve pipe control flow lock responsiveness
* Fix pipe meta helper test initialization
1 parent eb42ffb commit 41eb7e6

5 files changed

Lines changed: 528 additions & 48 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import java.util.Collections;
4747
import java.util.List;
4848
import java.util.Map;
49-
import java.util.concurrent.TimeUnit;
5049
import java.util.concurrent.atomic.AtomicBoolean;
5150
import java.util.concurrent.atomic.AtomicReference;
5251

@@ -93,6 +92,10 @@ public abstract class AbstractOperatePipeProcedureV2
9392
// putting it here is just for convenience
9493
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
9594

95+
// Only used to release global locks before retrying the same state. Do not serialize it because a
96+
// recovered procedure is already re-scheduled by the procedure framework.
97+
private transient boolean shouldYieldAfterExecution;
98+
9699
private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
97100
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
98101

@@ -162,15 +165,17 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
162165
LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", getProcId());
163166
} else {
164167
LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.", getProcId());
165-
if (this instanceof PipeMetaSyncProcedure) {
168+
if (isSuccess() && this instanceof PipeMetaSyncProcedure) {
166169
configNodeProcedureEnv
167170
.getConfigManager()
168171
.getPipeManager()
169172
.getPipeTaskCoordinator()
170173
.updateLastSyncedVersion();
171174
}
172-
PipeProcedureMetrics.getInstance()
173-
.updateTimer(this.getOperation().getName(), this.elapsedTime());
175+
if (isFinished()) {
176+
PipeProcedureMetrics.getInstance()
177+
.updateTimer(this.getOperation().getName(), this.elapsedTime());
178+
}
174179
releasePipeTaskCoordinatorLock(configNodeProcedureEnv);
175180
}
176181
}
@@ -196,7 +201,7 @@ private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv configNodePro
196201
public abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env);
197202

198203
/**
199-
* Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}.
204+
* Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}.
200205
*
201206
* @throws PipeException if configNode consensus write failed
202207
*/
@@ -215,6 +220,7 @@ public abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
215220
@Override
216221
protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state)
217222
throws InterruptedException {
223+
shouldYieldAfterExecution = false;
218224
if (pipeTaskInfo == null) {
219225
LOGGER.warn(
220226
"ProcedureId {}: Pipe lock is not acquired, executeFromState's execution will be skipped.",
@@ -262,8 +268,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState
262268
RETRY_THRESHOLD,
263269
e);
264270
setNextState(getCurrentState());
265-
// Wait 3s for next retry
266-
TimeUnit.MILLISECONDS.sleep(3000L);
271+
shouldYieldAfterExecution = true;
267272
} else {
268273
LOGGER.warn(
269274
"ProcedureId {}: All {} retries failed when trying to {} at state [{}], will rollback...",
@@ -283,6 +288,11 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState
283288
return Flow.HAS_MORE_STATE;
284289
}
285290

291+
/**
 * Reports the yield flag computed by the most recent {@code executeFromState} call: that method
 * clears the flag on entry and sets it only on the branch that re-schedules the current state for
 * a retry. According to the comment on the backing field, the flag exists to release global locks
 * before retrying the same state; how the procedure framework acts on the returned value is not
 * visible in this class.
 *
 * @param env the procedure environment; not consulted by this implementation
 * @return {@code true} if the last execution scheduled a retry of the same state
 */
@Override
292+
protected boolean isYieldAfterExecution(final ConfigNodeProcedureEnv env) {
293+
return shouldYieldAfterExecution;
294+
}
295+
286296
@Override
287297
protected boolean isRollbackSupported(OperatePipeTaskState state) {
288298
return true;
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.procedure.impl.pipe;
21+
22+
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
23+
import org.apache.iotdb.confignode.procedure.Procedure;
24+
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
25+
import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState;
26+
import org.apache.iotdb.pipe.api.exception.PipeException;
27+
28+
import org.junit.Assert;
29+
import org.junit.Test;
30+
31+
import java.io.IOException;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
34+
public class AbstractOperatePipeProcedureV2Test {
35+
36+
@Test
37+
public void testSuccessfulStateDoesNotYield() throws Exception {
38+
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();
39+
40+
Assert.assertEquals(
41+
StateMachineProcedure.Flow.HAS_MORE_STATE,
42+
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));
43+
44+
Assert.assertFalse(procedure.isYieldAfterExecution(null));
45+
Assert.assertEquals(1, procedure.validateExecutionCount);
46+
}
47+
48+
@Test
49+
public void testRetryStateYieldsAndResetsAfterNextExecution() throws Exception {
50+
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();
51+
procedure.failValidation = true;
52+
53+
Assert.assertEquals(
54+
StateMachineProcedure.Flow.HAS_MORE_STATE,
55+
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));
56+
57+
Assert.assertTrue(procedure.isYieldAfterExecution(null));
58+
Assert.assertEquals(1, procedure.validateExecutionCount);
59+
60+
procedure.failValidation = false;
61+
Assert.assertEquals(
62+
StateMachineProcedure.Flow.HAS_MORE_STATE,
63+
procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK));
64+
65+
Assert.assertFalse(procedure.isYieldAfterExecution(null));
66+
Assert.assertEquals(2, procedure.validateExecutionCount);
67+
}
68+
69+
@Test
70+
public void testRetryStateYieldsOnlyBeforeRetryThreshold() throws Exception {
71+
final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure();
72+
73+
final Procedure<?>[] validateSubProcedures = procedure.runOnce();
74+
Assert.assertEquals(1, validateSubProcedures.length);
75+
Assert.assertSame(procedure, validateSubProcedures[0]);
76+
Assert.assertFalse(procedure.isYieldAfterExecution(null));
77+
78+
procedure.failCalculation = true;
79+
final Procedure<?>[] calculateSubProcedures = procedure.runOnce();
80+
Assert.assertEquals(1, calculateSubProcedures.length);
81+
Assert.assertSame(procedure, calculateSubProcedures[0]);
82+
Assert.assertTrue(procedure.isYieldAfterExecution(null));
83+
Assert.assertEquals(1, procedure.calculateExecutionCount);
84+
85+
Assert.assertNull(procedure.runOnce());
86+
Assert.assertTrue(procedure.hasException());
87+
Assert.assertFalse(procedure.isYieldAfterExecution(null));
88+
Assert.assertEquals(2, procedure.calculateExecutionCount);
89+
}
90+
91+
private static class TestOperatePipeProcedure extends AbstractOperatePipeProcedureV2 {
92+
93+
private int validateExecutionCount;
94+
private int calculateExecutionCount;
95+
private boolean failValidation;
96+
private boolean failCalculation;
97+
98+
private TestOperatePipeProcedure() {
99+
pipeTaskInfo = new AtomicReference<>(new PipeTaskInfo());
100+
}
101+
102+
private Procedure<?>[] runOnce() throws InterruptedException {
103+
return execute(null);
104+
}
105+
106+
@Override
107+
protected PipeTaskOperation getOperation() {
108+
return PipeTaskOperation.START_PIPE;
109+
}
110+
111+
@Override
112+
public boolean executeFromValidateTask(
113+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env)
114+
throws PipeException {
115+
validateExecutionCount++;
116+
if (failValidation) {
117+
throw new PipeException("retry");
118+
}
119+
return true;
120+
}
121+
122+
@Override
123+
public void executeFromCalculateInfoForTask(
124+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
125+
calculateExecutionCount++;
126+
if (failCalculation) {
127+
throw new RuntimeException("retry");
128+
}
129+
}
130+
131+
@Override
132+
public void executeFromWriteConfigNodeConsensus(
133+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
134+
// Do nothing
135+
}
136+
137+
@Override
138+
public void executeFromOperateOnDataNodes(
139+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
140+
// Do nothing
141+
}
142+
143+
@Override
144+
public void rollbackFromValidateTask(
145+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
146+
// Do nothing
147+
}
148+
149+
@Override
150+
public void rollbackFromCalculateInfoForTask(
151+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
152+
// Do nothing
153+
}
154+
155+
@Override
156+
public void rollbackFromWriteConfigNodeConsensus(
157+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) {
158+
// Do nothing
159+
}
160+
161+
@Override
162+
public void rollbackFromOperateOnDataNodes(
163+
final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env)
164+
throws IOException {
165+
// Do nothing
166+
}
167+
}
168+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 12 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,49 +1155,20 @@ public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {
11551155

11561156
@Override
11571157
public TPushPipeMetaResp pushMultiPipeMeta(TPushMultiPipeMetaReq req) {
1158-
boolean hasException = false;
1159-
// If there is any exception, we use the size of exceptionMessages to record the fail index
1160-
List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
1161-
try {
1162-
if (req.isSetPipeNamesToDrop()) {
1163-
for (String pipeNameToDrop : req.getPipeNamesToDrop()) {
1164-
TPushPipeMetaRespExceptionMessage message =
1165-
PipeDataNodeAgent.task().handleDropPipe(pipeNameToDrop);
1166-
exceptionMessages.add(message);
1167-
if (message != null) {
1168-
// If there is any exception, skip the remaining pipes
1169-
hasException = true;
1170-
break;
1171-
}
1172-
}
1173-
} else if (req.isSetPipeMetas()) {
1174-
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
1175-
final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer);
1176-
TPushPipeMetaRespExceptionMessage message =
1177-
PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
1178-
exceptionMessages.add(message);
1179-
if (message != null) {
1180-
// If there is any exception, skip the remaining pipes
1181-
hasException = true;
1182-
break;
1158+
return PushMultiPipeMetaHelper.pushMultiPipeMeta(
1159+
req,
1160+
new PushMultiPipeMetaHelper.Handler() {
1161+
@Override
1162+
public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) {
1163+
return PipeDataNodeAgent.task().handleDropPipe(pipeName);
11831164
}
1184-
}
1185-
} else {
1186-
throw new Exception("Invalid TPushMultiPipeMetaReq");
1187-
}
11881165

1189-
return hasException
1190-
? new TPushPipeMetaResp()
1191-
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1192-
.setExceptionMessages(exceptionMessages)
1193-
: new TPushPipeMetaResp()
1194-
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
1195-
} catch (Exception e) {
1196-
LOGGER.warn("Error occurred when pushing multi pipe meta", e);
1197-
return new TPushPipeMetaResp()
1198-
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
1199-
.setExceptionMessages(exceptionMessages);
1200-
}
1166+
@Override
1167+
public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(final ByteBuffer pipeMeta) {
1168+
return PipeDataNodeAgent.task()
1169+
.handleSinglePipeMetaChanges(PipeMeta.deserialize4TaskAgent(pipeMeta));
1170+
}
1171+
});
12011172
}
12021173

12031174
@Override
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.protocol.thrift.impl;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
24+
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
25+
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
26+
import org.apache.iotdb.rpc.TSStatusCode;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.nio.ByteBuffer;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
35+
final class PushMultiPipeMetaHelper {
36+
37+
private static final Logger LOGGER = LoggerFactory.getLogger(PushMultiPipeMetaHelper.class);
38+
39+
private PushMultiPipeMetaHelper() {
40+
// Utility class
41+
}
42+
43+
interface Handler {
44+
45+
TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) throws Exception;
46+
47+
TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(ByteBuffer pipeMeta) throws Exception;
48+
}
49+
50+
static TPushPipeMetaResp pushMultiPipeMeta(
51+
final TPushMultiPipeMetaReq req, final Handler handler) {
52+
final List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
53+
try {
54+
if (req.isSetPipeNamesToDrop()) {
55+
for (final String pipeNameToDrop : req.getPipeNamesToDrop()) {
56+
final TPushPipeMetaRespExceptionMessage message = handler.handleDropPipe(pipeNameToDrop);
57+
if (message != null) {
58+
exceptionMessages.add(message);
59+
}
60+
}
61+
} else if (req.isSetPipeMetas()) {
62+
for (final ByteBuffer pipeMeta : req.getPipeMetas()) {
63+
final TPushPipeMetaRespExceptionMessage message = handler.handleSinglePipeMeta(pipeMeta);
64+
if (message != null) {
65+
exceptionMessages.add(message);
66+
}
67+
}
68+
} else {
69+
throw new Exception("Invalid TPushMultiPipeMetaReq");
70+
}
71+
72+
return exceptionMessages.isEmpty()
73+
? new TPushPipeMetaResp()
74+
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
75+
: new TPushPipeMetaResp()
76+
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
77+
.setExceptionMessages(exceptionMessages);
78+
} catch (final Exception e) {
79+
LOGGER.warn("Error occurred when pushing multi pipe meta", e);
80+
return new TPushPipeMetaResp()
81+
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
82+
.setExceptionMessages(exceptionMessages);
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)