diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 0a090f99b8df6..abd28ffdb925e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -273,8 +273,11 @@ private void openClientSession() throws TException { if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { final String errorMsg = String.format( - "Failed to login to receiver %s:%s for legacy pipe transfer because %s", - ipAddress, port, openSessionResp.getStatus().getMessage()); + "Failed to login to receiver %s:%s for legacy pipe transfer because code: %s, message: %s", + ipAddress, + port, + openSessionResp.getStatus().getCode(), + openSessionResp.getStatus().getMessage()); LOGGER.warn(errorMsg); throw new PipeRuntimeCriticalException(errorMsg); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index fc02d25328044..071b5897920ca 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -186,7 +186,7 @@ private boolean onPipeConnectionException(final Throwable throwable) { MAX_RETRY_TIMES, e); try { - sleepIfNoHighPriorityTask(retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); + sleepIfNoHighPriorityTask(getHandshakeRetrySleepInterval(e, retry)); } catch (final InterruptedException interruptedException) { LOGGER.info( "Interrupted while sleeping, will retry to handshake with the target system.", @@ -231,6 +231,13 @@ private boolean onPipeConnectionException(final Throwable throwable) { return false; } + private long getHandshakeRetrySleepInterval(final Throwable throwable, final int retry) { + final long defaultInterval = retry * PipeConfig.getInstance().getPipeSinkRetryIntervalMs(); + return isAuthenticationFailure(throwable) + ? Math.max(defaultInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS) + : defaultInterval; + } + /** * Submit a {@link PipeSubtask} to the executor to keep it running. Note that the function will be * called when connector starts or the subTask finishes the last round, Thus the {@link diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index 251acfb7b1289..ce0bfa9a245ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -26,15 +26,35 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; public abstract class PipeReportableSubtask extends PipeSubtask { private static final Logger LOGGER = LoggerFactory.getLogger(PipeReportableSubtask.class); + private static final long DEFAULT_LOGIN_LOCK_WINDOW_MS = TimeUnit.MINUTES.toMillis(10); + private static final int DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS = 5; + private static final int AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS = 2; + // Keep the literal for compatibility because dev/1.3 does not define this status enum. + private static final int USER_LOGIN_LOCKED_STATUS_CODE = 822; + protected static final long AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS = + DEFAULT_LOGIN_LOCK_WINDOW_MS + / (DEFAULT_LOGIN_LOCK_FAILED_ATTEMPTS - AUTHENTICATION_FAILURE_IMMEDIATE_ATTEMPTS) + + TimeUnit.SECONDS.toMillis(1); + private static final Pattern AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN = + Pattern.compile( + String.format( + "(?i)(?:\\b(?:code|status code)\\s*[:=]\\s*(?:%d|%d)\\b|\\b(?:%d|%d):|\\b(?:WRONG_LOGIN_PASSWORD|USER_LOGIN_LOCKED)\\b)", + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + USER_LOGIN_LOCKED_STATUS_CODE, + TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode(), + USER_LOGIN_LOCKED_STATUS_CODE)); // To ensure that high-priority tasks can obtain object locks first, a counter is now used to save // the number of high-priority tasks. protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0); @@ -64,6 +84,26 @@ public synchronized void onFailure(final Throwable throwable) { // is dropped or the process is running normally. } + protected long getSleepIntervalBasedOnThrowable(final Throwable throwable) { + long sleepInterval = retryCount.get() * PipeConfig.getInstance().getPipeSinkRetryIntervalMs(); + if (isAuthenticationFailure(throwable)) { + sleepInterval = Math.max(sleepInterval, AUTHENTICATION_FAILURE_RETRY_INTERVAL_MS); + } + return sleepInterval; + } + + protected static boolean isAuthenticationFailure(final Throwable throwable) { + Throwable current = throwable; + while (current != null) { + final String message = current.getMessage(); + if (message != null && AUTHENTICATION_FAILURE_STATUS_CODE_PATTERN.matcher(message).find()) { + return true; + } + current = current.getCause(); + } + return false; + } + private void onReportEventFailure(final Throwable throwable) { final int maxRetryTimes = throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException @@ -94,8 +134,7 @@ private void onReportEventFailure(final Throwable throwable) { maxRetryTimes, throwable.getMessage()); try { - sleepIfNoHighPriorityTask( - retryCount.get() * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); + sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable)); } catch (final InterruptedException e) { LOGGER.warn( "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", @@ -163,8 +202,7 @@ private void onNonReportEventFailure(final Throwable throwable) { throwable.getMessage(), throwable); try { - sleepIfNoHighPriorityTask( - retryCount.get() * PipeConfig.getInstance().getPipeSinkRetryIntervalMs()); + sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable)); } catch (final InterruptedException e) { LOGGER.warn( "Interrupted when retrying to execute subtask {} (creation time: {}, simple class: {})", @@ -191,10 +229,13 @@ protected void preScheduleLowPriorityTask(int maxRetries) { } protected void sleepIfNoHighPriorityTask(long sleepMillis) throws InterruptedException { + if (sleepMillis <= 0) { + return; + } synchronized (highPriorityLockTaskCount) { // The wait operation will release the highPriorityLockTaskCount lock, so there will be // no deadlock. - if (highPriorityLockTaskCount.get() > 0) { + if (highPriorityLockTaskCount.get() == 0) { highPriorityLockTaskCount.wait(sleepMillis); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index eb25c7553db16..9ce1c3657a72b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -259,7 +259,11 @@ public void sendHandshakeReq(final Pair clientAndStatu client.getIpAddress(), client.getPort(), resp.getStatus()); - endPoint2HandshakeErrorMessage.put(client.getEndPoint(), resp.getStatus().getMessage()); + endPoint2HandshakeErrorMessage.put( + client.getEndPoint(), + String.format( + "code: %d, message: %s", + resp.getStatus().getCode(), resp.getStatus().getMessage())); } else { clientAndStatus.setRight(true); client.setTimeout(CONNECTION_TIMEOUT_MS.get()); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java index 07ae50e992e6b..a9b72391b2b75 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeSleepIntervalTest.java @@ -25,13 +25,48 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.TimeUnit; + public class PipeSleepIntervalTest { + private static class TestSinkSubtask extends PipeAbstractSinkSubtask { + + TestSinkSubtask() { + super(null, 0, null); + } + + @Override + protected String getRootCause(Throwable throwable) { + return null; + } + + @Override + protected void report(EnrichedEvent event, PipeRuntimeException exception) {} + + @Override + protected boolean executeOnce() { + return false; + } + + long getSleepInterval(final Throwable throwable) { + return getSleepIntervalBasedOnThrowable(throwable); + } + + boolean isAuthenticationFailureException(final Throwable throwable) { + return isAuthenticationFailure(throwable); + } + + void sleepWithoutHighPriorityTask(final long sleepMillis) throws InterruptedException { + sleepIfNoHighPriorityTask(sleepMillis); + } + } + private long oldPipeSinkSubtaskSleepIntervalInitMs; private long oldPipeSinkSubtaskSleepIntervalMaxMs; @@ -53,21 +88,7 @@ public void tearDown() throws Exception { @Test public void test() { - try (final PipeAbstractSinkSubtask subtask = - new PipeAbstractSinkSubtask(null, 0, null) { - @Override - protected String getRootCause(Throwable throwable) { - return null; - } - - @Override - protected void report(EnrichedEvent event, PipeRuntimeException exception) {} - - @Override - protected boolean executeOnce() { - return false; - } - }) { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { long startTime = System.currentTimeMillis(); subtask.sleep4NonReportException(); Assert.assertTrue( @@ -80,4 +101,39 @@ protected boolean executeOnce() { >= PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()); } } + + @Test + public void testAuthenticationFailureRetryInterval() { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException( + "Handshake error with receiver 127.0.0.1:6667, code: 801, message: Authentication failed."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("801: Failed to check password for pipe a2b."))); + Assert.assertTrue( + subtask.isAuthenticationFailureException( + new PipeConnectionException("status code: 822, message: Account is blocked."))); + Assert.assertFalse( + subtask.isAuthenticationFailureException( + new PipeConnectionException("Network error 801 bytes sent."))); + + final long authenticationFailureRetryInterval = + subtask.getSleepInterval(new PipeConnectionException("code: 801")); + Assert.assertTrue(authenticationFailureRetryInterval > TimeUnit.MINUTES.toMillis(3)); + Assert.assertTrue(authenticationFailureRetryInterval * 3 > TimeUnit.MINUTES.toMillis(10)); + Assert.assertTrue( + subtask.getSleepInterval(new PipeConnectionException("network error")) <= 10000); + } + } + + @Test + public void testSleepIfNoHighPriorityTaskWaits() throws Exception { + try (final TestSinkSubtask subtask = new TestSinkSubtask()) { + final long startTime = System.currentTimeMillis(); + subtask.sleepWithoutHighPriorityTask(20L); + Assert.assertTrue(System.currentTimeMillis() - startTime >= 15L); + } + } }