diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 050de467f2687..13369fc5ee3f3 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -222,6 +222,9 @@ public final class DataNodePipeMessages { public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED = "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and " + "callbackExecutor {}."; + public static final String PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES = + "Pipe sink subtask {} is delayed for {} ms before polling events to avoid frequent " + + "handshakes after client borrow failures."; public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T = "Pipe skipping temporary TsFile which shouldn't be transferred: {}"; public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 147d977d57269..49ee222c3e9fa 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -213,6 +213,8 @@ public final class DataNodePipeMessages { public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED = "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and " + "callbackExecutor {}."; + public static final String PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES = + "Pipe sink 子任务 {} 在拉取事件前延迟 {} ms,以避免客户端借用失败后频繁握手。"; public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T = "Pipe 跳过不应传输的临时 TsFile:{}"; public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index c37aab5af2c8d..295f30da3f202 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -49,6 +49,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +105,7 @@ public PipeProcessorSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, + final ListeningScheduledExecutorService ignoredScheduledExecutor, final ExecutorService ignored, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 10b746778e35e..d9730d58b4990 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; @@ -162,6 +163,37 @@ protected boolean executeOnce() { return true; } + @Override + protected long peekSchedulingDelayInMs() { + if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) { + return 0; + } + + return ((PipeSinkWithSchedulingDelay) outputPipeSink).peekSchedulingDelayMs(); + } + + @Override + protected long consumeSchedulingDelayInMs() { + if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) { + return 0; + } + + final long remainingSchedulingDelayMs = + ((PipeSinkWithSchedulingDelay) outputPipeSink).consumeSchedulingDelayMs(); + if (remainingSchedulingDelayMs <= 0) { + return 0; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + DataNodePipeMessages.PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES, + getDisplayTaskID(), + remainingSchedulingDelayMs); + } + + return remainingSchedulingDelayMs; + } + private void transferHeartbeatEvent(final PipeHeartbeatEvent event) { // DO NOT call heartbeat or transfer after closed, or will cause connection leak if (isClosed.get()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 7c85268e6d3a9..f02f1be367149 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -102,7 +103,7 @@ @TreeModel @TableModel -public class IoTDBDataRegionAsyncSink extends IoTDBSink { +public class IoTDBDataRegionAsyncSink extends IoTDBSink implements PipeSinkWithSchedulingDelay { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAsyncSink.class); @@ -130,6 +131,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { // use these variables to prevent reference count leaks under some corner cases when closing private final AtomicBoolean isClosed = new AtomicBoolean(false); + private int consecutiveHandshakeFailureCount = 0; + private final AtomicLong schedulingDelayMs = new AtomicLong(0); private final Map pendingHandlers = new ConcurrentHashMap<>(); @@ -352,8 +355,10 @@ private void transfer( AsyncPipeDataTransferServiceClient client = null; try { client = clientManager.borrowClient(endPoint); + markHandshakeSucceeded(); pipeTransferTabletBatchEventHandler.transfer(client); } catch (final Exception ex) { + markSchedulingDelayIfHandshakeFailed(client); logOnClientException(client, ex); pipeTransferTabletBatchEventHandler.onError(ex); } @@ -365,8 +370,10 @@ private void transfer( AsyncPipeDataTransferServiceClient client = null; try { client = clientManager.borrowClient(deviceId); + markHandshakeSucceeded(); pipeTransferInsertNodeReqHandler.transfer(client); } catch (final Exception ex) { + markSchedulingDelayIfHandshakeFailed(client); logOnClientException(client, ex); pipeTransferInsertNodeReqHandler.onError(ex); } @@ -377,8 +384,10 @@ private void transfer( AsyncPipeDataTransferServiceClient client = null; try { client = clientManager.borrowClient(deviceId); + markHandshakeSucceeded(); pipeTransferTabletReqHandler.transfer(client); } catch (final Exception ex) { + markSchedulingDelayIfHandshakeFailed(client); logOnClientException(client, ex); pipeTransferTabletReqHandler.onError(ex); } @@ -454,8 +463,10 @@ private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler) AsyncPipeDataTransferServiceClient client = null; try { client = transferTsFileClientManager.borrowClient(); + markHandshakeSucceeded(); pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client); } catch (final Exception ex) { + markSchedulingDelayIfHandshakeFailed(client); logOnClientException(client, ex); pipeTransferTsFileHandler.onError(ex); } finally { @@ -555,6 +566,38 @@ private void logOnClientException( } } + private void markHandshakeSucceeded() { + consecutiveHandshakeFailureCount = 0; + } + + private void markSchedulingDelayIfHandshakeFailed( + final AsyncPipeDataTransferServiceClient client) { + if (client != null) { + return; + } + + if (++consecutiveHandshakeFailureCount < getSchedulingDelayFailureThreshold()) { + return; + } + + schedulingDelayMs.accumulateAndGet( + PipeConfig.getInstance().getPipeSinkRetryIntervalMs(), Math::max); + } + + private int getSchedulingDelayFailureThreshold() { + return Math.max(1, nodeUrls.size() << 1); + } + + @Override + public long peekSchedulingDelayMs() { + return schedulingDelayMs.get(); + } + + @Override + public long consumeSchedulingDelayMs() { + return schedulingDelayMs.getAndSet(0); + } + /** * Transfer queued {@link Event}s which are waiting for retry. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index 4782988868a75..7fe169f7a38fe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor { protected final WrappedThreadPoolExecutor underlyingThreadPool; protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor; + protected final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor; private final Map registeredIdSubtaskMapper; @@ -90,6 +93,9 @@ protected PipeSubtaskExecutor( underlyingThreadPool.disableErrorLog(); } subtaskWorkerThreadPoolExecutor = MoreExecutors.listeningDecorator(underlyingThreadPool); + final ScheduledExecutorService underlyingScheduledExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(workingThreadName + "-Scheduler"); + subtaskWorkerScheduledExecutor = MoreExecutors.listeningDecorator(underlyingScheduledExecutor); subtaskCallbackListeningExecutor = Objects.nonNull(callbackThreadName) ? IoTDBThreadPoolFactory.newSingleThreadExecutor( @@ -112,7 +118,10 @@ public final synchronized void register(final PipeSubtask subtask) { registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask); subtask.bindExecutors( - subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, schedulerSupplier(this)); + subtaskWorkerThreadPoolExecutor, + subtaskWorkerScheduledExecutor, + subtaskCallbackListeningExecutor, + schedulerSupplier(this)); } private static String getSafeSubtaskStr(final String subtaskID) { @@ -191,6 +200,7 @@ public final synchronized void shutdown() { } subtaskWorkerThreadPoolExecutor.shutdown(); + subtaskWorkerScheduledExecutor.shutdown(); if (subtaskCallbackListeningExecutor != globalSubtaskCallbackListeningExecutor) { subtaskCallbackListeningExecutor.shutdown(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 9dbf5af2d0ce2..5078d14f5a516 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -36,11 +36,13 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { @@ -72,9 +74,11 @@ protected PipeAbstractSinkSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, + final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor, final ExecutorService subtaskCallbackListeningExecutor, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; + this.subtaskWorkerScheduledExecutor = subtaskWorkerScheduledExecutor; this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; this.subtaskScheduler = subtaskScheduler; } @@ -230,9 +234,47 @@ public synchronized void submitSelf() { return; } + final long schedulingDelayInMs = getNextSchedulingDelayInMs(); + if (schedulingDelayInMs > 0) { + isSubmitted = true; + subtaskWorkerScheduledExecutor.schedule( + // Keep the isSubmitted placeholder set before the delayed submission to avoid duplicate + // schedules, so the delayed task should not mark it again. + () -> submitSelfToWorker(false), schedulingDelayInMs, TimeUnit.MILLISECONDS); + return; + } + + submitSelfToWorker(true); + } + + @Override + protected boolean shouldStopSubmittingSelfInCurrentCall() { + return peekSchedulingDelayInMs() > 0; + } + + private synchronized void submitSelfToWorker(final boolean shouldMarkSubmitted) { + if (shouldStopSubmittingSelf.get()) { + isSubmitted = false; + return; + } + final ListenableFuture nextFuture = subtaskWorkerThreadPoolExecutor.submit(this); registerCallbackHookAfterSubmit(nextFuture); - isSubmitted = true; + if (shouldMarkSubmitted) { + isSubmitted = true; + } + } + + private long getNextSchedulingDelayInMs() { + return consumeSchedulingDelayInMs(); + } + + protected long peekSchedulingDelayInMs() { + return 0; + } + + protected long consumeSchedulingDelayInMs() { + return 0; } protected void registerCallbackHookAfterSubmit(final ListenableFuture future) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java index 29f651a1ac943..a8c78490a790e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ public abstract class PipeSubtask // For thread pool to execute subtasks protected ListeningExecutorService subtaskWorkerThreadPoolExecutor; + protected ListeningScheduledExecutorService subtaskWorkerScheduledExecutor; // For controlling the subtask execution protected final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true); @@ -65,6 +67,7 @@ protected PipeSubtask(final String taskID, final long creationTime) { public abstract void bindExecutors( ListeningExecutorService subtaskWorkerThreadPoolExecutor, + ListeningScheduledExecutorService subtaskWorkerScheduledExecutor, ExecutorService subtaskCallbackListeningExecutor, PipeSubtaskScheduler subtaskScheduler); @@ -81,6 +84,10 @@ public Boolean call() throws Exception { break; } hasAtLeastOneEventProcessed = true; + // Stop the current call early if the subtask asks to delay its next submission. + if (shouldStopSubmittingSelfInCurrentCall()) { + break; + } } } finally { // Reset the scheduler to make sure that the scheduler can schedule again @@ -105,6 +112,10 @@ protected synchronized void setLastEvent(final Event event) { @SuppressWarnings("squid:S112") // Allow to throw Exception protected abstract boolean executeOnce() throws Exception; + protected boolean shouldStopSubmittingSelfInCurrentCall() { + return false; + } + @Override public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) { final int totalRetryCount = retryCount.getAndSet(0); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java new file mode 100644 index 0000000000000..943922798283d --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.protocol; + +public interface PipeSinkWithSchedulingDelay { + + long peekSchedulingDelayMs(); + + long consumeSchedulingDelayMs(); +}