Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.confignode.manager.pipe.agent.task;

import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
Expand All @@ -41,7 +40,6 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -192,30 +190,8 @@ protected boolean executeOnce() throws Exception {
}
decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
} catch (final PipeNonReportException e) {
sleep4NonReportException();
} catch (final PipeException e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
throw e;
} else {
LOGGER.info(
"{} in pipe transfer, ignored because pipe is dropped.",
e.getClass().getSimpleName(),
e);
clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
throw new PipeException(
String.format(
"Exception in pipe transfer, subtask: %s, last event: %s", taskID, lastEvent),
e);
} else {
LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e);
clearReferenceCountAndReleaseLastEvent(event);
}
handleException(event, e);
}

return true;
Expand Down Expand Up @@ -260,6 +236,7 @@ protected String getRootCause(final Throwable throwable) {

@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
lastExceptionTime = Long.MAX_VALUE;
PipeConfigNodeAgent.runtime().report(event, exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ private long calculateTsFileParserMemory(
// If the source is not history, we do not need to allocate memory
boolean isExtractorHistory =
sourceParameters.getBooleanOrDefault(
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
Expand Down Expand Up @@ -865,7 +866,8 @@ private long calculateSendTsFileReadBufferMemory(
// If the source is history enable, we need to transfer tsfile
boolean needTransferTsFile =
sourceParameters.getBooleanOrDefault(
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| sourceParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
Expand All @@ -36,7 +37,6 @@
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -210,7 +210,7 @@ protected boolean executeOnce() throws Exception {
lastEvent instanceof EnrichedEvent
? ((EnrichedEvent) lastEvent).coreReportMessage()
: lastEvent,
ErrorHandlingUtils.getRootCause(e).getMessage()),
ErrorHandlingCommonUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info(
Expand Down Expand Up @@ -247,7 +247,7 @@ public void close() {
LOGGER.info(
"Exception occurred when closing pipe processor subtask {}, root cause: {}",
taskID,
ErrorHandlingUtils.getRootCause(e).getMessage(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
// should be called after pipeProcessor.close()
Expand Down Expand Up @@ -291,7 +291,7 @@ public int getRegionId() {

@Override
protected String getRootCause(final Throwable throwable) {
return ErrorHandlingUtils.getRootCause(throwable).getMessage();
return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

package org.apache.iotdb.db.pipe.agent.task.subtask.sink;

import org.apache.iotdb.commons.exception.pipe.PipeNonReportException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
Expand All @@ -35,14 +35,12 @@
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -134,37 +132,8 @@ protected boolean executeOnce() {

decreaseReferenceCountAndReleaseLastEvent(event, true);
sleepInterval = PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs();
} catch (final PipeNonReportException e) {
sleep4NonReportException();
} catch (final PipeException e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
throw e;
} else {
LOGGER.info(
"{} in pipe transfer, ignored because the connector subtask is dropped.{}",
e.getClass().getSimpleName(),
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
throw new PipeException(
String.format(
"Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s",
taskID,
event instanceof EnrichedEvent
? ((EnrichedEvent) event).coreReportMessage()
: event,
ErrorHandlingUtils.getRootCause(e).getMessage()),
e);
} else {
LOGGER.info(
"Exception in pipe transfer, ignored because the sink subtask is dropped.{}",
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
handleException(event, e);
}

return true;
Expand Down Expand Up @@ -216,7 +185,7 @@ public void close() {
LOGGER.info(
"Exception occurred when closing pipe connector subtask {}, root cause: {}",
taskID,
ErrorHandlingUtils.getRootCause(e).getMessage(),
ErrorHandlingCommonUtils.getRootCause(e).getMessage(),
e);
} finally {
inputPendingQueue.discardAllEvents();
Expand Down Expand Up @@ -377,11 +346,12 @@ public void setEventSizeHistogram(Histogram eventSizeHistogram) {

@Override
protected String getRootCause(final Throwable throwable) {
return ErrorHandlingUtils.getRootCause(throwable).getMessage();
return ErrorHandlingCommonUtils.getRootCause(throwable).getMessage();
}

@Override
protected void report(final EnrichedEvent event, final PipeRuntimeException exception) {
lastExceptionTime = Long.MAX_VALUE;
PipeDataNodeAgent.runtime().report(event, exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public void validate(final PipeParameterValidator validator) {
// enabling the historical data extraction, which may affect the realtime data extraction.
isHistoricalSourceEnabled =
parameters.getBooleanOrDefault(
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, SystemConstant.RESTART_DEFAULT_VALUE)
SystemConstant.RESTART_OR_NEWLY_ADDED_KEY,
SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE)
|| parameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY),
EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
import static org.apache.iotdb.db.queryengine.execution.QueryState.ABORTED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.CANCELED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.DISPATCHING;
Expand All @@ -42,7 +43,6 @@
import static org.apache.iotdb.db.queryengine.execution.QueryState.PLANNED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.QUEUED;
import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;

/**
* State machine for a {@link QueryExecution}. It stores the states for the {@link QueryExecution}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
Expand All @@ -33,7 +34,6 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -116,7 +116,7 @@ public RegionExecutionResult execute(
RegionExecutionResult resp =
RegionExecutionResult.create(
false, String.format(ERROR_MSG_FORMAT, e.getMessage()), null);
Throwable t = ErrorHandlingUtils.getRootCause(e);
Throwable t = ErrorHandlingCommonUtils.getRootCause(e);
if (t instanceof ReadException
|| t instanceof ReadIndexException
|| t instanceof NotLeaderException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.db.utils.SetThreadName;

import org.slf4j.Logger;
Expand Down Expand Up @@ -117,7 +117,7 @@ public void close() throws IOException {
}

private String getAbortCause(final Exception e) {
Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
if (rootCause instanceof MemoryNotEnoughException) {
return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.plan.statement.crud;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
Expand Down Expand Up @@ -163,9 +164,12 @@ protected long calculateBytesUsed() {

@Override
public String toString() {
final int size = CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
return "InsertMultiTabletsStatement{"
+ "insertTabletStatementList="
+ insertTabletStatementList
+ (Objects.nonNull(insertTabletStatementList) && insertTabletStatementList.size() > size
? "(Partial) " + insertTabletStatementList.subList(0, size)
: insertTabletStatementList)
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
Expand Down Expand Up @@ -69,7 +70,7 @@ public static TSStatus onNpeOrUnexpectedException(
LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
}
if (e instanceof SemanticException) {
Throwable rootCause = getRootCause(e);
Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
if (e.getCause() instanceof IoTDBException) {
return RpcUtils.getStatus(
((IoTDBException) e.getCause()).getErrorCode(), rootCause.getMessage());
Expand All @@ -86,13 +87,6 @@ public static TSStatus onNpeOrUnexpectedException(
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}

public static Throwable getRootCause(Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
return e;
}

public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) {
TSStatus status = tryCatchQueryException(e);
if (status != null) {
Expand Down Expand Up @@ -135,7 +129,7 @@ public static TSStatus onQueryException(Exception e, OperationType operation) {
}

private static TSStatus tryCatchQueryException(Exception e) {
Throwable rootCause = getRootCause(e);
Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (rootCause instanceof StorageGroupNotReadyException) {
return RpcUtils.getStatus(TSStatusCode.STORAGE_ENGINE_NOT_READY, rootCause.getMessage());
Expand Down Expand Up @@ -210,7 +204,7 @@ private static TSStatus tryCatchNonQueryException(Exception e) {
LOGGER.warn(message, e);
return RpcUtils.getStatus(Arrays.asList(batchException.getFailingStatus()));
} else if (e instanceof IoTDBException) {
Throwable rootCause = getRootCause(e);
Throwable rootCause = ErrorHandlingCommonUtils.getRootCause(e);
// ignore logging sg not ready exception
if (!(rootCause instanceof StorageGroupNotReadyException)) {
LOGGER.warn(message, e);
Expand Down
Loading