Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ public final class DataNodePipeMessages {
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and "
+ "callbackExecutor {}.";
public static final String PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
"Pipe sink subtask {} is delayed for {} ms before polling events to avoid frequent "
+ "handshakes after client borrow failures.";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
"Pipe skipping temporary TsFile which shouldn't be transferred: {}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ public final class DataNodePipeMessages {
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} and "
+ "callbackExecutor {}.";
public static final String PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
"Pipe sink 子任务 {} 在拉取事件前延迟 {} ms,以避免客户端借用失败后频繁握手。";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
"Pipe 跳过不应传输的临时 TsFile:{}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iotdb.pipe.api.exception.PipeException;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,6 +105,7 @@ public PipeProcessorSubtask(
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
final ListeningScheduledExecutorService ignoredScheduledExecutor,
final ExecutorService ignored,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
Expand Down Expand Up @@ -162,6 +163,37 @@ protected boolean executeOnce() {
return true;
}

/**
 * Looks up the scheduling delay currently requested by the output sink.
 *
 * @return the result of {@code peekSchedulingDelayMs()} when the output sink implements {@link
 *     PipeSinkWithSchedulingDelay}; {@code 0} for any other sink
 */
@Override
protected long peekSchedulingDelayInMs() {
  return outputPipeSink instanceof PipeSinkWithSchedulingDelay
      ? ((PipeSinkWithSchedulingDelay) outputPipeSink).peekSchedulingDelayMs()
      : 0;
}

/**
 * Takes the scheduling delay requested by the output sink and, when it is positive, reports it
 * at debug level.
 *
 * @return the positive delay obtained from {@code consumeSchedulingDelayMs()} of a sink that
 *     implements {@link PipeSinkWithSchedulingDelay}; {@code 0} when the sink does not implement
 *     that interface or the obtained delay is not positive
 */
@Override
protected long consumeSchedulingDelayInMs() {
  // Sinks without scheduling-delay support never contribute a delay.
  final long consumedDelayMs =
      outputPipeSink instanceof PipeSinkWithSchedulingDelay
          ? ((PipeSinkWithSchedulingDelay) outputPipeSink).consumeSchedulingDelayMs()
          : 0;

  if (consumedDelayMs > 0) {
    // Keep the guard so the task id is only resolved when debug logging is on.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          DataNodePipeMessages.PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES,
          getDisplayTaskID(),
          consumedDelayMs);
    }
    return consumedDelayMs;
  }

  return 0;
}

private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
// DO NOT call heartbeat or transfer after closed, or will cause connection leak
if (isClosed.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
Expand Down Expand Up @@ -102,7 +103,7 @@

@TreeModel
@TableModel
public class IoTDBDataRegionAsyncSink extends IoTDBSink {
public class IoTDBDataRegionAsyncSink extends IoTDBSink implements PipeSinkWithSchedulingDelay {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAsyncSink.class);

Expand Down Expand Up @@ -130,6 +131,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {

// use these variables to prevent reference count leaks under some corner cases when closing
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private int consecutiveHandshakeFailureCount = 0;
private final AtomicLong schedulingDelayMs = new AtomicLong(0);
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -352,8 +355,10 @@ private void transfer(
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(endPoint);
markHandshakeSucceeded();
pipeTransferTabletBatchEventHandler.transfer(client);
} catch (final Exception ex) {
markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTabletBatchEventHandler.onError(ex);
}
Expand All @@ -365,8 +370,10 @@ private void transfer(
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(deviceId);
markHandshakeSucceeded();
pipeTransferInsertNodeReqHandler.transfer(client);
} catch (final Exception ex) {
markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferInsertNodeReqHandler.onError(ex);
}
Expand All @@ -377,8 +384,10 @@ private void transfer(
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(deviceId);
markHandshakeSucceeded();
pipeTransferTabletReqHandler.transfer(client);
} catch (final Exception ex) {
markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTabletReqHandler.onError(ex);
}
Expand Down Expand Up @@ -454,8 +463,10 @@ private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
AsyncPipeDataTransferServiceClient client = null;
try {
client = transferTsFileClientManager.borrowClient();
markHandshakeSucceeded();
pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
} catch (final Exception ex) {
markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTsFileHandler.onError(ex);
} finally {
Expand Down Expand Up @@ -555,6 +566,38 @@ private void logOnClientException(
}
}

/** Clears the consecutive handshake failure counter after a client was borrowed successfully. */
private void markHandshakeSucceeded() {
  this.consecutiveHandshakeFailureCount = 0;
}

/**
 * Records a client borrow failure and, once enough consecutive failures have accumulated,
 * requests a scheduling delay of at least the configured sink retry interval.
 *
 * @param client the client obtained before the exception was thrown, or {@code null} when
 *     borrowing the client itself failed; a non-null client makes this method a no-op
 */
private void markSchedulingDelayIfHandshakeFailed(
    final AsyncPipeDataTransferServiceClient client) {
  final boolean borrowFailed = client == null;
  if (!borrowFailed) {
    // The client was borrowed, so the exception came from a later step of the transfer.
    return;
  }

  consecutiveHandshakeFailureCount++;
  if (consecutiveHandshakeFailureCount >= getSchedulingDelayFailureThreshold()) {
    // Never shrink a delay that is already pending: keep the larger of the two values.
    final long retryIntervalMs = PipeConfig.getInstance().getPipeSinkRetryIntervalMs();
    schedulingDelayMs.accumulateAndGet(retryIntervalMs, Math::max);
  }
}

/**
 * Computes how many consecutive borrow failures are tolerated before a scheduling delay is
 * requested.
 *
 * @return twice the number of entries in {@code nodeUrls}, but never less than {@code 1}
 */
private int getSchedulingDelayFailureThreshold() {
  final int doubledNodeCount = nodeUrls.size() * 2;
  return doubledNodeCount < 1 ? 1 : doubledNodeCount;
}

/**
 * Reads the pending scheduling delay without modifying it.
 *
 * @return the current value of {@code schedulingDelayMs}, in milliseconds
 */
@Override
public long peekSchedulingDelayMs() {
  final long pendingDelayMs = schedulingDelayMs.get();
  return pendingDelayMs;
}

/**
 * Atomically reads the pending scheduling delay and resets it to {@code 0}.
 *
 * @return the value of {@code schedulingDelayMs} before the reset, in milliseconds
 */
@Override
public long consumeSchedulingDelayMs() {
  final long consumedDelayMs = schedulingDelayMs.getAndSet(0);
  return consumedDelayMs;
}

/**
* Transfer queued {@link Event}s which are waiting for retry.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.utils.TestOnly;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,6 +38,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {

protected final WrappedThreadPoolExecutor underlyingThreadPool;
protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
protected final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor;

private final Map<String, PipeSubtask> registeredIdSubtaskMapper;

Expand Down Expand Up @@ -90,6 +93,9 @@ protected PipeSubtaskExecutor(
underlyingThreadPool.disableErrorLog();
}
subtaskWorkerThreadPoolExecutor = MoreExecutors.listeningDecorator(underlyingThreadPool);
final ScheduledExecutorService underlyingScheduledExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(workingThreadName + "-Scheduler");
subtaskWorkerScheduledExecutor = MoreExecutors.listeningDecorator(underlyingScheduledExecutor);
subtaskCallbackListeningExecutor =
Objects.nonNull(callbackThreadName)
? IoTDBThreadPoolFactory.newSingleThreadExecutor(
Expand All @@ -112,7 +118,10 @@ public final synchronized void register(final PipeSubtask subtask) {

registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
subtask.bindExecutors(
subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, schedulerSupplier(this));
subtaskWorkerThreadPoolExecutor,
subtaskWorkerScheduledExecutor,
subtaskCallbackListeningExecutor,
schedulerSupplier(this));
}

private static String getSafeSubtaskStr(final String subtaskID) {
Expand Down Expand Up @@ -191,6 +200,7 @@ public final synchronized void shutdown() {
}

subtaskWorkerThreadPoolExecutor.shutdown();
subtaskWorkerScheduledExecutor.shutdown();
if (subtaskCallbackListeningExecutor != globalSubtaskCallbackListeningExecutor) {
subtaskCallbackListeningExecutor.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {

Expand Down Expand Up @@ -72,9 +74,11 @@ protected PipeAbstractSinkSubtask(
/**
 * Stores the executors and the scheduler that this sink subtask uses for submitting itself.
 *
 * @param subtaskWorkerThreadPoolExecutor worker pool the subtask is submitted to
 * @param subtaskWorkerScheduledExecutor scheduled executor used for delayed submissions
 * @param subtaskCallbackListeningExecutor executor kept for the subtask's callbacks
 * @param subtaskScheduler scheduler bound to this subtask
 */
@Override
public void bindExecutors(
    final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
    final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
    final ExecutorService subtaskCallbackListeningExecutor,
    final PipeSubtaskScheduler subtaskScheduler) {
  // The assignments are independent of each other; executors first, then the scheduler.
  this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
  this.subtaskWorkerScheduledExecutor = subtaskWorkerScheduledExecutor;
  this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
  this.subtaskScheduler = subtaskScheduler;
}
Expand Down Expand Up @@ -230,9 +234,47 @@ public synchronized void submitSelf() {
return;
}

final long schedulingDelayInMs = getNextSchedulingDelayInMs();
if (schedulingDelayInMs > 0) {
isSubmitted = true;
subtaskWorkerScheduledExecutor.schedule(
// Keep the isSubmitted placeholder set before the delayed submission to avoid duplicate
// schedules, so the delayed task should not mark it again.
() -> submitSelfToWorker(false), schedulingDelayInMs, TimeUnit.MILLISECONDS);
return;
}

submitSelfToWorker(true);
}

/**
 * Tells the execution loop to end the current call when a scheduling delay is pending.
 *
 * @return {@code true} if {@code peekSchedulingDelayInMs()} reports a positive delay
 */
@Override
protected boolean shouldStopSubmittingSelfInCurrentCall() {
  final long pendingDelayInMs = peekSchedulingDelayInMs();
  return pendingDelayInMs > 0;
}

/**
 * Submits this subtask to the worker thread pool and registers the callback hook on the returned
 * future.
 *
 * <p>When {@code shouldStopSubmittingSelf} is set, nothing is submitted and {@code isSubmitted}
 * is reset to {@code false}.
 *
 * @param shouldMarkSubmitted {@code true} when this method must set {@code isSubmitted} itself
 *     (direct submission); {@code false} when the caller already set the flag as a placeholder
 *     before scheduling the delayed submission
 */
private synchronized void submitSelfToWorker(final boolean shouldMarkSubmitted) {
  if (shouldStopSubmittingSelf.get()) {
    isSubmitted = false;
    return;
  }

  final ListenableFuture<Boolean> nextFuture = subtaskWorkerThreadPoolExecutor.submit(this);
  registerCallbackHookAfterSubmit(nextFuture);
  // Only the direct path marks the flag here. The delayed path set it before scheduling, and an
  // unconditional assignment would make the parameter meaningless.
  if (shouldMarkSubmitted) {
    isSubmitted = true;
  }
}

/**
 * Obtains the delay to apply before the next submission by consuming the pending delay.
 *
 * @return the value returned by {@code consumeSchedulingDelayInMs()}, in milliseconds
 */
private long getNextSchedulingDelayInMs() {
  final long nextDelayInMs = consumeSchedulingDelayInMs();
  return nextDelayInMs;
}

/**
 * Hook for subclasses to report a pending scheduling delay. This default reports none.
 *
 * @return {@code 0}
 */
protected long peekSchedulingDelayInMs() {
  return 0L;
}

/**
 * Hook for subclasses to hand over a pending scheduling delay. This default hands over none.
 *
 * @return {@code 0}
 */
protected long consumeSchedulingDelayInMs() {
  return 0L;
}

protected void registerCallbackHookAfterSubmit(final ListenableFuture<Boolean> future) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,6 +47,7 @@ public abstract class PipeSubtask

// For thread pool to execute subtasks
protected ListeningExecutorService subtaskWorkerThreadPoolExecutor;
protected ListeningScheduledExecutorService subtaskWorkerScheduledExecutor;

// For controlling the subtask execution
protected final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true);
Expand All @@ -65,6 +67,7 @@ protected PipeSubtask(final String taskID, final long creationTime) {

/**
 * Binds the executors and the scheduler this subtask runs with.
 *
 * @param subtaskWorkerThreadPoolExecutor thread pool that executes the subtask
 * @param subtaskWorkerScheduledExecutor scheduled executor supplied alongside the worker pool;
 *     implementations may ignore it
 * @param subtaskCallbackListeningExecutor executor supplied for the subtask's callbacks
 * @param subtaskScheduler scheduler associated with this subtask
 */
public abstract void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
ExecutorService subtaskCallbackListeningExecutor,
PipeSubtaskScheduler subtaskScheduler);

Expand All @@ -81,6 +84,10 @@ public Boolean call() throws Exception {
break;
}
hasAtLeastOneEventProcessed = true;
// Stop the current call early if the subtask asks to delay its next submission.
if (shouldStopSubmittingSelfInCurrentCall()) {
break;
}
}
} finally {
// Reset the scheduler to make sure that the scheduler can schedule again
Expand All @@ -105,6 +112,10 @@ protected synchronized void setLastEvent(final Event event) {
@SuppressWarnings("squid:S112") // Allow to throw Exception
protected abstract boolean executeOnce() throws Exception;

/**
 * Hook checked by {@code call()} after an event has been processed; returning {@code true} ends
 * the current call early. This default never asks for an early stop.
 *
 * @return {@code false}
 */
protected boolean shouldStopSubmittingSelfInCurrentCall() {
return false;
}

@Override
public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
final int totalRetryCount = retryCount.getAndSet(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.commons.pipe.sink.protocol;

/**
 * Implemented by pipe sinks that can ask the subtask driving them to delay its next submission.
 *
 * <p>NOTE(review): the contract below is inferred from the method names and from the one
 * implementation visible in this change; confirm before relying on it for other sinks.
 */
public interface PipeSinkWithSchedulingDelay {

/**
 * Returns the scheduling delay currently requested by the sink, in milliseconds. Presumably
 * this leaves the pending delay untouched.
 *
 * @return the pending delay in milliseconds; callers treat non-positive values as "no delay"
 */
long peekSchedulingDelayMs();

/**
 * Returns the scheduling delay currently requested by the sink, in milliseconds. Presumably
 * this also clears the pending delay so it is applied only once.
 *
 * @return the pending delay in milliseconds; callers treat non-positive values as "no delay"
 */
long consumeSchedulingDelayMs();
}
Loading