diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index f9ef2a64a8346..7286d12099161 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -434,13 +435,14 @@ public static class ValueHider { } static String hide(final String key, final String value) { - if (Objects.isNull(key)) { - return value; - } - if (KEYS.contains(KeyReducer.reduce(key))) { + if (isHiddenKey(key)) { return PLACEHOLDER; } return value; } + + public static boolean isHiddenKey(final String key) { + return Objects.nonNull(key) && KEYS.contains(KeyReducer.reduce(key).toLowerCase(Locale.ROOT)); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index fe9737b0d7d35..c37aab5af2c8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -260,7 +260,7 @@ protected boolean executeOnce() throws Exception { throw new PipeException( String.format( "Exception in pipe process, subtask: %s, last event: %s, root cause: %s", - taskID, + getDisplayTaskID(), lastEvent instanceof EnrichedEvent ? ((EnrichedEvent) lastEvent).coreReportMessage() : lastEvent, @@ -300,7 +300,7 @@ public void close() { } catch (final Exception e) { LOGGER.info( DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_PROCESSOR_SUBTASK, - taskID, + getDisplayTaskID(), ErrorHandlingCommonUtils.getRootCause(e).getMessage(), e); } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 90d325f6d238e..10b746778e35e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -60,6 +60,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // Record these variables to provide corresponding value to tag key of monitoring metrics private final String attributeSortedString; + private final String attributeDisplayString; private final int sinkIndex; // Now parallel connectors run the same time, thus the heartbeat events are not sure @@ -75,8 +76,27 @@ public PipeSinkSubtask( final int sinkIndex, final UnboundedBlockingPendingQueue inputPendingQueue, final PipeConnector outputPipeConnector) { + this( + taskID, + creationTime, + attributeSortedString, + attributeSortedString, + sinkIndex, + inputPendingQueue, + outputPipeConnector); + } + + public PipeSinkSubtask( + final String taskID, + final long creationTime, + final String attributeSortedString, + final String attributeDisplayString, + final int sinkIndex, + final UnboundedBlockingPendingQueue inputPendingQueue, + final PipeConnector outputPipeConnector) { super(taskID, creationTime, outputPipeConnector); this.attributeSortedString = attributeSortedString; + this.attributeDisplayString = attributeDisplayString; this.sinkIndex = sinkIndex; this.inputPendingQueue = inputPendingQueue; @@ -156,7 +176,7 @@ private void transferHeartbeatEvent(final PipeHeartbeatEvent event) { DataNodePipeMessages.PIPECONNECTOR + outputPipeSink.getClass().getName() + "(id: " - + taskID + + getDisplayTaskID() + ")" + " heartbeat failed, or encountered failure when transferring generic event. Failure: " + e.getMessage(), @@ -181,13 +201,13 @@ public void close() { outputPipeSink.close(); LOGGER.info( DataNodePipeMessages.PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS, - taskID, + getDisplayTaskID(), outputPipeSink, System.currentTimeMillis() - startTime); } catch (final Exception e) { LOGGER.info( DataNodePipeMessages.EXCEPTION_OCCURRED_WHEN_CLOSING_PIPE_CONNECTOR_SUBTASK, - taskID, + getDisplayTaskID(), ErrorHandlingCommonUtils.getRootCause(e).getMessage(), e); } finally { @@ -366,4 +386,14 @@ protected void report(final EnrichedEvent event, final PipeRuntimeException exce lastExceptionTime = Long.MAX_VALUE; PipeDataNodeAgent.runtime().report(event, exception); } + + @Override + public String getDisplayTaskID() { + return generateDisplayTaskID(attributeDisplayString, creationTime, sinkIndex); + } + + static String generateDisplayTaskID( + final String attributeDisplayString, final long creationTime, final int sinkIndex) { + return String.format("%s_%s_%s", attributeDisplayString, creationTime, sinkIndex); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 42b1ae9136676..b49332346591d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -75,7 +75,7 @@ public synchronized void register() { registeredTaskCount++; LOGGER.info( DataNodePipeMessages.REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT, - subtask, + subtask.getDisplayTaskID(), runningTaskCount, registeredTaskCount); } @@ -112,7 +112,7 @@ public synchronized boolean deregister(final CommitterKey committerKey) { registeredTaskCount--; LOGGER.info( DataNodePipeMessages.DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT, - subtask, + subtask.getDisplayTaskID(), runningTaskCount, registeredTaskCount); } @@ -135,7 +135,7 @@ public synchronized void start() { runningTaskCount++; LOGGER.info( DataNodePipeMessages.START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT, - subtask, + subtask.getDisplayTaskID(), runningTaskCount, registeredTaskCount); } @@ -152,7 +152,7 @@ public synchronized void stop() { runningTaskCount--; LOGGER.info( DataNodePipeMessages.STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT, - subtask, + subtask.getDisplayTaskID(), runningTaskCount, registeredTaskCount); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 01552eec5ae3a..977afa4a7f316 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -145,8 +145,7 @@ public synchronized String register( for (int sinkIndex = 0; sinkIndex < sinkNum; sinkIndex++) { final String taskID = String.format( - "%s_%s_%s", - attributeDisplayStringWithPrefix, environment.getCreationTime(), sinkIndex); + "%s_%s_%s", attributeSortedString, environment.getCreationTime(), sinkIndex); environment.setSinkTaskId(taskID); final PipeConnector pipeSink = @@ -181,6 +180,7 @@ public synchronized String register( new PipeSinkSubtask( taskID, environment.getCreationTime(), + attributeSortedString, attributeDisplayStringWithPrefix, sinkIndex, pendingQueue, @@ -290,13 +290,15 @@ private static String generateAttributeSortedString( return sortedStringSourceMap.toString(); } - /** Masked attribute string for logs, metrics and exception messages. */ - private static String generateAttributeDisplayString( - final PipeParameters pipeConnectorParameters) { + /** + * Attribute string for logs, metrics and exception messages with sensitive attributes removed. + */ + static String generateAttributeDisplayString(final PipeParameters pipeConnectorParameters) { final TreeMap filteredAttributes = new TreeMap<>(pipeConnectorParameters.getAttribute()); filteredAttributes.remove(SystemConstant.RESTART_OR_NEWLY_ADDED_KEY); - return new PipeParameters(filteredAttributes).toString(); + filteredAttributes.keySet().removeIf(PipeParameters.ValueHider::isHiddenKey); + return filteredAttributes.toString(); } private void throwNoSuchSubtaskException(final String attributeSortedString) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java index d9b79fdda8e82..17ff545251c05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/schema/PipeSchemaRegionSinkMetrics.java @@ -110,7 +110,9 @@ public void register(final PipeSinkSubtask pipeSinkSubtask) { public void deregister(final String taskID) { if (!connectorMap.containsKey(taskID)) { - LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR, taskID); + LOGGER.warn( + DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_SCHEMA_REGION_CONNECTOR, + getDisplayTaskID(taskID)); return; } if (Objects.nonNull(metricService)) { @@ -125,12 +127,18 @@ public void markSchemaEvent(final String taskID) { } final Rate rate = schemaRateMap.get(taskID); if (rate == null) { - LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, taskID); + LOGGER.info( + DataNodePipeMessages.FAILED_TO_MARK_PIPE_SCHEMA_REGION_WRITE, getDisplayTaskID(taskID)); return; } rate.mark(); } + private String getDisplayTaskID(final String taskID) { + final PipeSinkSubtask connector = connectorMap.get(taskID); + return Objects.nonNull(connector) ? connector.getDisplayTaskID() : "unknown"; + } + //////////////////////////// singleton //////////////////////////// private static class PipeSchemaRegionSinkMetricsHolder { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java index 9b2f876ff2d20..41f08f055c7c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/sink/PipeDataRegionSinkMetrics.java @@ -447,7 +447,9 @@ public void register(final PipeSinkSubtask pipeSinkSubtask) { public void deregister(final String taskID) { if (!sinkMap.containsKey(taskID)) { - LOGGER.warn(DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK, taskID); + LOGGER.warn( + DataNodePipeMessages.FAILED_TO_DEREGISTER_PIPE_DATA_REGION_SINK, + getDisplayTaskID(taskID)); return; } if (Objects.nonNull(metricService)) { @@ -462,7 +464,8 @@ public void markTabletEvent(final String taskID) { } final Rate rate = tabletRateMap.get(taskID); if (rate == null) { - LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, taskID); + LOGGER.info( + DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK, getDisplayTaskID(taskID)); return; } rate.mark(); @@ -474,12 +477,18 @@ public void markTsFileEvent(final String taskID) { } final Rate rate = tsFileRateMap.get(taskID); if (rate == null) { - LOGGER.info(DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, taskID); + LOGGER.info( + DataNodePipeMessages.FAILED_TO_MARK_PIPE_DATA_REGION_SINK_1, getDisplayTaskID(taskID)); return; } rate.mark(); } + private String getDisplayTaskID(final String taskID) { + final PipeSinkSubtask sink = sinkMap.get(taskID); + return Objects.nonNull(sink) ? sink.getDisplayTaskID() : "unknown"; + } + public void markPipeHeartbeatEvent(final String taskID) { if (Objects.isNull(metricService)) { return; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java index a237008ab4e70..9cd54b425cf74 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeSinkSubtaskExecutorTest.java @@ -41,6 +41,7 @@ public void setUp() throws Exception { "PipeConnectorSubtaskExecutorTest", System.currentTimeMillis(), "TestAttributeSortedString", + "TestAttributeSortedString", 0, mock(UnboundedBlockingPendingQueue.class), mock(PipeConnector.class))); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java index 2a15fb9ea181b..e9d76a4fc7ebf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -23,12 +23,18 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; public class PipeSinkSubtaskTest { @@ -47,6 +53,7 @@ public void testDiscardEventsOfPipeDelegatesToConnector() { "PipeSinkSubtaskTest", System.currentTimeMillis(), "data_test", + "data_test", 0, (UnboundedBlockingPendingQueue) pendingQueue, connector)); @@ -60,4 +67,41 @@ public void testDiscardEventsOfPipeDelegatesToConnector() { subtask.close(); } } + + @Test + public void testTransferExceptionUsesDisplayTaskID() throws Exception { + final PipeConnector connector = mock(PipeConnector.class); + final UnboundedBlockingPendingQueue pendingQueue = + mock(UnboundedBlockingPendingQueue.class); + final Event event = mock(Event.class); + + when(pendingQueue.waitedPoll()).thenReturn(event); + doThrow(new RuntimeException("No more authentication methods available")) + .when(connector) + .transfer(any(Event.class)); + + final PipeSinkSubtask subtask = + new PipeSinkSubtask( + "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}_1701687309493_0", + 1701687309493L, + "data_{sink=TSFILE_REMOTE_SINK, sink.scp.password=Iotdb@2026}", + "data_{sink=TSFILE_REMOTE_SINK, sink.scp.host=172.20.70.119}", + 0, + pendingQueue, + connector); + + try { + subtask.executeOnce(); + Assert.fail(); + } catch (final PipeException e) { + Assert.assertTrue(e.getMessage().contains("Exception in pipe transfer, subtask: data_{")); + Assert.assertTrue(e.getMessage().contains("sink=TSFILE_REMOTE_SINK")); + Assert.assertTrue(e.getMessage().contains("sink.scp.host=172.20.70.119")); + Assert.assertTrue(e.getMessage().contains("No more authentication methods available")); + Assert.assertFalse(e.getMessage().contains("sink.scp.password")); + Assert.assertFalse(e.getMessage().contains("Iotdb@2026")); + } finally { + subtask.close(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index c28cfb2569245..f14bb483e53ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -99,7 +99,7 @@ protected PipeSubtaskExecutor( public final synchronized void register(final PipeSubtask subtask) { if (registeredIdSubtaskMapper.containsKey(subtask.getTaskID())) { - LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, getSafeSubtaskStr(subtask.getTaskID())); + LOGGER.warn(PipeMessages.SUBTASK_ALREADY_REGISTERED, subtask.getDisplayTaskID()); return; } @@ -125,13 +125,13 @@ public final synchronized void start(final String subTaskID) { final PipeSubtask subtask = registeredIdSubtaskMapper.get(subTaskID); if (subtask.isSubmittingSelf()) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, getSafeSubtaskStr(subTaskID)); + LOGGER.debug(PipeMessages.SUBTASK_ALREADY_RUNNING, subtask.getDisplayTaskID()); } } else { subtask.allowSubmittingSelf(); subtask.submitSelf(); ++runningSubtaskNumber; - LOGGER.info(PipeMessages.SUBTASK_STARTED, getSafeSubtaskStr(subTaskID)); + LOGGER.info(PipeMessages.SUBTASK_STARTED, subtask.getDisplayTaskID()); } } @@ -154,9 +154,9 @@ public final synchronized void deregister(final String subTaskID) { if (subtask != null) { try { subtask.close(); - LOGGER.info(PipeMessages.SUBTASK_CLOSED, getSafeSubtaskStr(subTaskID)); + LOGGER.info(PipeMessages.SUBTASK_CLOSED, subtask.getDisplayTaskID()); } catch (final Exception e) { - LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, getSafeSubtaskStr(subTaskID), e); + LOGGER.error(PipeMessages.SUBTASK_CLOSE_FAILED, subtask.getDisplayTaskID(), e); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 790e5fe4c6d2e..9dbf5af2d0ce2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -199,7 +199,7 @@ private boolean onPipeConnectionException(final Throwable throwable) { PipeMessages.HANDSHAKE_FAILED_STOPPING, outputPipeSink.getClass().getName(), MAX_RETRY_TIMES, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), throwable); @@ -287,7 +287,7 @@ protected void handleException(final Event event, final Exception e) { throw new PipeException( String.format( PipeMessages.EXCEPTION_IN_PIPE_TRANSFER_FORMAT, - taskID, + getDisplayTaskID(), event instanceof EnrichedEvent ? ((EnrichedEvent) event).coreReportMessage() : event, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java index 1f43403157b9a..52bc3bd61f811 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java @@ -88,7 +88,7 @@ private void onReportEventFailure(final Throwable throwable) { if (retryCount.get() == 0) { LOGGER.warn( PipeMessages.FAILED_TO_EXECUTE_SUBTASK, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), throwable.getMessage(), @@ -102,7 +102,7 @@ private void onReportEventFailure(final Throwable throwable) { LOGGER::warn, throwable, PipeMessages.RETRY_EXECUTING_SUBTASK, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), retryCount.get(), @@ -113,7 +113,7 @@ private void onReportEventFailure(final Throwable throwable) { } catch (final InterruptedException e) { LOGGER.warn( PipeMessages.INTERRUPTED_RETRYING_SUBTASK, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), e); @@ -125,7 +125,7 @@ private void onReportEventFailure(final Throwable throwable) { final String errorMessage = String.format( PipeMessages.SUBTASK_RETRY_EXCEEDED_FORMAT, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), retryCount.get() - 1, @@ -139,7 +139,7 @@ private void onReportEventFailure(final Throwable throwable) { : new PipeRuntimeCriticalException(errorMessage)); LOGGER.warn( PipeMessages.SUBTASK_EXCEPTION_REPORTED, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), throwable); @@ -154,7 +154,7 @@ private void onNonReportEventFailure(final Throwable throwable) { if (retryCount.get() == 0) { LOGGER.warn( PipeMessages.FAILED_TO_EXECUTE_SUBTASK_RETRY_FOREVER, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), throwable.getMessage(), @@ -165,7 +165,7 @@ private void onNonReportEventFailure(final Throwable throwable) { PipeLogger.log( LOGGER::warn, PipeMessages.RETRY_EXECUTING_SUBTASK_FOREVER, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName(), retryCount.get(), @@ -176,7 +176,7 @@ private void onNonReportEventFailure(final Throwable throwable) { } catch (final InterruptedException e) { LOGGER.warn( PipeMessages.INTERRUPTED_RETRYING_SUBTASK, - taskID, + getDisplayTaskID(), creationTime, this.getClass().getSimpleName()); Thread.currentThread().interrupt(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java index 2da797c2b3b5d..29f651a1ac943 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java @@ -114,7 +114,7 @@ public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) { if (totalRetryCount != 0) { LOGGER.warn( "Successfully executed subtask {}({}) after {} retries.", - taskID, + getDisplayTaskID(), this.getClass().getSimpleName(), totalRetryCount); } @@ -196,6 +196,10 @@ public String getTaskID() { return taskID; } + public String getDisplayTaskID() { + return taskID; + } + public long getCreationTime() { return creationTime; }