diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java index 66778f258935f..6f52cd33ab0db 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java @@ -19,7 +19,6 @@ package org.apache.iotdb.confignode.manager.pipe.agent.task; -import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; @@ -41,7 +40,6 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,30 +190,8 @@ protected boolean executeOnce() throws Exception { } decreaseReferenceCountAndReleaseLastEvent(event, true); sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); - } catch (final PipeNonReportException e) { - sleep4NonReportException(); - } catch (final PipeException e) { - setLastExceptionEvent(event); - if (!isClosed.get()) { - throw e; - } else { - LOGGER.info( - "{} in pipe transfer, ignored because pipe is dropped.", - e.getClass().getSimpleName(), - e); - clearReferenceCountAndReleaseLastEvent(event); - } } catch (final Exception e) { - setLastExceptionEvent(event); - if (!isClosed.get()) { - throw new PipeException( - String.format( - "Exception in pipe transfer, subtask: %s, last event: %s", taskID, lastEvent), - e); - } else { - LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e); - clearReferenceCountAndReleaseLastEvent(event); - } + handleException(event, e); } return true; @@ -260,6 +236,7 @@ protected String getRootCause(final Throwable throwable) { @Override protected void report(final EnrichedEvent event, final PipeRuntimeException exception) { + lastExceptionTime = Long.MAX_VALUE; PipeConfigNodeAgent.runtime().report(event, exception); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index cabf17b7f4f8a..64d88d22683b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -781,7 +781,8 @@ private long calculateTsFileParserMemory( // If the source is not history, we do not need to allocate memory boolean isExtractorHistory = sourceParameters.getBooleanOrDefault( - SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE) + SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, + SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE) || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); @@ -865,7 +866,8 @@ private long calculateSendTsFileReadBufferMemory( // If the source is history enable, we need to transfer tsfile boolean needTransferTsFile = sourceParameters.getBooleanOrDefault( - SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE) + SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, + SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE) || sourceParameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index ac68fe6ca93ca..ca5a8d0f4dba7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; @@ -36,7 +37,6 @@ import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics; import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor; import org.apache.iotdb.db.storageengine.StorageEngine; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -210,7 +210,7 @@ protected boolean executeOnce() throws Exception { lastEvent instanceof EnrichedEvent ? ((EnrichedEvent) lastEvent).coreReportMessage() : lastEvent, - ErrorHandlingUtils.getRootCause(e).getMessage()), + ErrorHandlingCommonUtils.getRootCause(e).getMessage()), e); } else { LOGGER.info( @@ -247,7 +247,7 @@ public void close() { LOGGER.info( "Exception occurred when closing pipe processor subtask {}, root cause: {}", taskID, - ErrorHandlingUtils.getRootCause(e).getMessage(), + ErrorHandlingCommonUtils.getRootCause(e).getMessage(), e); } finally { // should be called after pipeProcessor.close() @@ -291,7 +291,7 @@ public int getRegionId() { @Override protected String getRootCause(final Throwable throwable) { - return ErrorHandlingUtils.getRootCause(throwable).getMessage(); + return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 73681d6f54925..4b4794891f6cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink; -import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; @@ -35,14 +35,12 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.metrics.type.Histogram; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; -import org.apache.iotdb.pipe.api.exception.PipeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,37 +132,8 @@ protected boolean executeOnce() { decreaseReferenceCountAndReleaseLastEvent(event, true); sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); - } catch (final PipeNonReportException e) { - sleep4NonReportException(); - } catch (final PipeException e) { - if (!isClosed.get()) { - setLastExceptionEvent(event); - throw e; - } else { - LOGGER.info( - "{} in pipe transfer, ignored because the connector subtask is dropped.{}", - e.getClass().getSimpleName(), - e.getMessage() != null ? " Message: " + e.getMessage() : ""); - clearReferenceCountAndReleaseLastEvent(event); - } } catch (final Exception e) { - if (!isClosed.get()) { - setLastExceptionEvent(event); - throw new PipeException( - String.format( - "Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s", - taskID, - event instanceof EnrichedEvent - ? ((EnrichedEvent) event).coreReportMessage() - : event, - ErrorHandlingUtils.getRootCause(e).getMessage()), - e); - } else { - LOGGER.info( - "Exception in pipe transfer, ignored because the sink subtask is dropped.{}", - e.getMessage() != null ? " Message: " + e.getMessage() : ""); - clearReferenceCountAndReleaseLastEvent(event); - } + handleException(event, e); } return true; @@ -216,7 +185,7 @@ public void close() { LOGGER.info( "Exception occurred when closing pipe connector subtask {}, root cause: {}", taskID, - ErrorHandlingUtils.getRootCause(e).getMessage(), + ErrorHandlingCommonUtils.getRootCause(e).getMessage(), e); } finally { inputPendingQueue.discardAllEvents(); @@ -377,11 +346,12 @@ public void setEventSizeHistogram(Histogram eventSizeHistogram) { @Override protected String getRootCause(final Throwable throwable) { - return ErrorHandlingUtils.getRootCause(throwable).getMessage(); + return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage(); } @Override protected void report(final EnrichedEvent event, final PipeRuntimeException exception) { + lastExceptionTime = Long.MAX_VALUE; PipeDataNodeAgent.runtime().report(event, exception); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java index 91f5be62f1153..282be05c25985 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java @@ -212,7 +212,8 @@ public void validate(final PipeParameterValidator validator) { // enabling the historical data extraction, which may affect the realtime data extraction. isHistoricalSourceEnabled = parameters.getBooleanOrDefault( - SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE) + SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, + SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE) || parameters.getBooleanOrDefault( Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java index c7c16f20baf42..146359bc215ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java @@ -33,6 +33,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; import static org.apache.iotdb.db.queryengine.execution.QueryState.ABORTED; import static org.apache.iotdb.db.queryengine.execution.QueryState.CANCELED; import static org.apache.iotdb.db.queryengine.execution.QueryState.DISPATCHING; @@ -42,7 +43,6 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED; import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED; import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause; /** * State machine for a {@link QueryExecution}. It stores the states for the {@link QueryExecution}. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index cc00f094cfc62..7772c4ceaeeba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; @@ -33,7 +34,6 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.rpc.TSStatusCode; @@ -116,7 +116,7 @@ public RegionExecutionResult execute( RegionExecutionResult resp = RegionExecutionResult.create( false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null); - Throwable t = ErrorHandlingUtils.getRootCause(e); + Throwable t = ErrorHandlingCommonUtils.getRootCause(e); if (t instanceof ReadException || t instanceof ReadIndexException || t instanceof NotLeaderException diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java index 381c9b9378a99..ce30549c78d6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java @@ -19,10 +19,10 @@ package org.apache.iotdb.db.queryengine.execution.schedule; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; import org.apache.iotdb.db.utils.SetThreadName; import org.slf4j.Logger; @@ -117,7 +117,7 @@ public void close() throws IOException { } private String getAbortCause(final Exception e) { - Throwable rootCause = ErrorHandlingUtils.getRootCause(e); + Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); if (rootCause instanceof MemoryNotEnoughException) { return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java index 8225d2f60202b..b300da7b26e95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.statement.crud; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation; @@ -163,9 +164,12 @@ protected long calculateBytesUsed() { @Override public String toString() { + final int size = CommonDescriptor.getInstance().getConfig().getPathLogMaxSize(); return "InsertMultiTabletsStatement{" + "insertTabletStatementList=" - + insertTabletStatementList + + (Objects.nonNull(insertTabletStatementList) && insertTabletStatementList.size() > size + ? "(Partial) " + insertTabletStatementList.subList(0, size) + : insertTabletStatementList) + '}'; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 987eaaf10198f..08e8325c78702 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.QueryInBatchStatementException; import org.apache.iotdb.db.exception.StorageGroupNotReadyException; @@ -69,7 +70,7 @@ public static TSStatus onNpeOrUnexpectedException( LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e); } if (e instanceof SemanticException) { - Throwable rootCause = getRootCause(e); + Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); if (e.getCause() instanceof IoTDBException) { return RpcUtils.getStatus( ((IoTDBException) e.getCause()).getErrorCode(), rootCause.getMessage()); @@ -86,13 +87,6 @@ public static TSStatus onNpeOrUnexpectedException( return onNpeOrUnexpectedException(e, operation.getName(), statusCode); } - public static Throwable getRootCause(Throwable e) { - while (e.getCause() != null) { - e = e.getCause(); - } - return e; - } - public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) { TSStatus status = tryCatchQueryException(e); if (status != null) { @@ -135,7 +129,7 @@ public static TSStatus onQueryException(Exception e, OperationType operation) { } private static TSStatus tryCatchQueryException(Exception e) { - Throwable rootCause = getRootCause(e); + Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); // ignore logging sg not ready exception if (rootCause instanceof StorageGroupNotReadyException) { return RpcUtils.getStatus(TSStatusCode.STORAGE_ENGINE_NOT_READY, rootCause.getMessage()); @@ -210,7 +204,7 @@ private static TSStatus tryCatchNonQueryException(Exception e) { LOGGER.warn(message, e); return RpcUtils.getStatus(Arrays.asList(batchException.getFailingStatus())); } else if (e instanceof IoTDBException) { - Throwable rootCause = getRootCause(e); + Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e); // ignore logging sg not ready exception if (!(rootCause instanceof StorageGroupNotReadyException)) { LOGGER.warn(message, e); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java new file mode 100644 index 0000000000000..eafa4f45a7f0d --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkNonReportTimeConfigurableException.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.exception.pipe; + +import java.util.Objects; + +public class PipeRuntimeSinkNonReportTimeConfigurableException + extends PipeRuntimeSinkCriticalException { + + private final long interval; + + public PipeRuntimeSinkNonReportTimeConfigurableException( + final String message, final long interval) { + super(message); + this.interval = interval; + } + + public long getInterval() { + return interval; + } + + // We do not record the timestamp here for logger reduction detection + @Override + public String toString() { + return "PipeRuntimeSinkNonReportTimeConfigurableException{" + + "message='" + + "', interval='" + + interval + + "'}"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PipeRuntimeSinkNonReportTimeConfigurableException that = + (PipeRuntimeSinkNonReportTimeConfigurableException) o; + return super.equals(that) && interval == that.interval; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), interval); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java index 62b54334a9361..aa64e533528fd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeRuntimeSinkRetryTimesConfigurableException.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.exception.pipe; +import java.util.Objects; + public class PipeRuntimeSinkRetryTimesConfigurableException extends PipeRuntimeSinkCriticalException { @@ -37,6 +39,29 @@ public int getRetryTimes() { // We do not record the timestamp here for logger reduction detection @Override public String toString() { - return "PipeRuntimeSinkRetryTimesConfigurableException{" + "message='" + getMessage() + "}"; + return "PipeRuntimeSinkRetryTimesConfigurableException{" + + "message='" + + getMessage() + + "', retryTimes='" + + retryTimes + + "'}"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PipeRuntimeSinkRetryTimesConfigurableException that = + (PipeRuntimeSinkRetryTimesConfigurableException) o; + return super.equals(that) && retryTimes == that.retryTimes; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), retryTimes); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 87bb0c9594988..56e142d6efd04 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -20,12 +20,15 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -54,6 +57,7 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask { protected volatile Event lastExceptionEvent; protected long sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs(); + protected long lastExceptionTime = Long.MAX_VALUE; protected PipeAbstractSinkSubtask( final String taskID, final long creationTime, final PipeConnector outputPipeConnector) { @@ -261,4 +265,52 @@ public void sleep4NonReportException() { Thread.currentThread().interrupt(); } } + + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + protected void handleException(final Event event, final Exception e) { + if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) { + if (lastExceptionTime == Long.MAX_VALUE) { + lastExceptionTime = System.currentTimeMillis(); + } + if (System.currentTimeMillis() - lastExceptionTime + < ((PipeRuntimeSinkNonReportTimeConfigurableException) e).getInterval()) { + sleep4NonReportException(); + return; + } + handlePipeException(event, (PipeException) e); + } else if (e instanceof PipeException) { + handlePipeException(event, (PipeException) e); + } else { + if (!isClosed.get()) { + setLastExceptionEvent(event); + throw new PipeException( + String.format( + "Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s", + taskID, + event instanceof EnrichedEvent + ? ((EnrichedEvent) event).coreReportMessage() + : event, + ErrorHandlingCommonUtils.getRootCause(e).getMessage()), + e); + } else { + LOGGER.info( + "Exception in pipe transfer, ignored because the sink subtask is dropped.{}", + e.getMessage() != null ? " Message: " + e.getMessage() : ""); + clearReferenceCountAndReleaseLastEvent(event); + } + } + } + + protected void handlePipeException(final Event event, final PipeException e) { + if (!isClosed.get()) { + setLastExceptionEvent(event); + throw e; + } else { + LOGGER.info( + "{} in pipe transfer, ignored because the connector subtask is dropped.{}", + e.getClass().getSimpleName(), + e.getMessage() != null ? " Message: " + e.getMessage() : ""); + clearReferenceCountAndReleaseLastEvent(event); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java index 9c1bc1b521c6e..a4119979e768c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java @@ -23,7 +23,7 @@ public class SystemConstant { // This can be arbitrarily changed since it's only a memory key and not stored public static final String RESTART_OR_NEWLY_ADDED_KEY = "__system.restart_or_newly_added"; - public static final boolean RESTART_DEFAULT_VALUE = false; + public static final boolean RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE = false; private SystemConstant() { throw new IllegalStateException("Utility class"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 21597702ae55a..d48a9fdbcb2d5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java @@ -20,14 +20,11 @@ package org.apache.iotdb.commons.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.exception.pipe.PipeNonReportException; -import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException; -import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.pipe.api.event.Event; -import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -50,8 +47,6 @@ public class PipeReceiverStatusHandler { private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception"; private static final String NO_PERMISSION_STR = "No permissions for this operation"; - private static final int CONFLICT_RETRY_MAX_TIMES = 100; - private final boolean isRetryAllowedWhenConflictOccurs; private final long retryMaxMillisWhenConflictOccurs; private final boolean shouldRecordIgnoredDataWhenConflictOccurs; @@ -97,7 +92,7 @@ public void handle( * exception if retry the {@link Event}. Upper class must ensure that the method is invoked only * by a single thread. * - * @throws PipeException to retry the current {@link Event} + * @throws PipeRuntimeSinkNonReportTimeConfigurableException to retry the current {@link Event} * @param status the {@link TSStatus} to judge * @param exceptionMessage The exception message to throw * @param recordMessage The message to record an ignored {@link Event}, the caller should assure @@ -128,7 +123,8 @@ public void handle( LOGGER::info, "Temporary unavailable exception: will retry forever. status: %s", status); - throw new PipeNonReportException(exceptionMessage); + throw new PipeRuntimeSinkNonReportTimeConfigurableException( + exceptionMessage, Long.MAX_VALUE); } case 1810: // PIPE_RECEIVER_USER_CONFLICT_EXCEPTION @@ -167,16 +163,12 @@ public void handle( + " seconds", status); exceptionEventHasBeenRetried.set(true); - throw status.getCode() == 1815 - && PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict() - ? new PipeNonReportException(exceptionMessage) - : new PipeRuntimeSinkRetryTimesConfigurableException( - exceptionMessage, - (int) - Math.max( - PipeSubtask.MAX_RETRY_TIMES, - Math.min( - CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenConflictOccurs * 1.1))); + throw new PipeRuntimeSinkNonReportTimeConfigurableException( + exceptionMessage, + status.getCode() == 1815 + && PipeConfig.getInstance().isPipeRetryLocallyForParallelOrUserConflict() + ? Long.MAX_VALUE + : retryMaxMillisWhenConflictOccurs); } case 803: // NO_PERMISSION @@ -252,12 +244,8 @@ private synchronized void handleOtherExceptions( } exceptionEventHasBeenRetried.set(true); - throw new PipeRuntimeSinkRetryTimesConfigurableException( - exceptionMessage, - (int) - Math.max( - PipeSubtask.MAX_RETRY_TIMES, - Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1))); + throw new PipeRuntimeSinkNonReportTimeConfigurableException( + exceptionMessage, retryMaxMillisWhenOtherExceptionsOccur); } private static String getNoPermission(final boolean noPermission) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java similarity index 70% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java index 572a7764518a5..01b8b64442bb0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/pipe/PipeNonReportException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/ErrorHandlingCommonUtils.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -17,11 +17,17 @@ * under the License. */ -package org.apache.iotdb.commons.exception.pipe; +package org.apache.iotdb.commons.utils; -public class PipeNonReportException extends PipeRuntimeNonCriticalException { +public class ErrorHandlingCommonUtils { + public static Throwable getRootCause(Throwable e) { + while (e.getCause() != null) { + e = e.getCause(); + } + return e; + } - public PipeNonReportException(final String message) { - super(message); + private ErrorHandlingCommonUtils() { + // Utility class } }