@@ -64,6 +64,8 @@ public class PipeSinkSubtaskManager {
6464 private final Map <String , List <PipeSinkSubtaskLifeCycle >>
6565 attributeSortedString2SubtaskLifeCycleMap = new HashMap <>();
6666
67+ private final Map <String , String > attributeSortedString2DisplayString = new HashMap <>();
68+
6769 public synchronized String register (
6870 final Supplier <? extends PipeSinkSubtaskExecutor > executorSupplier ,
6971 final PipeParameters pipeSinkParameters ,
@@ -92,19 +94,14 @@ public synchronized String register(
9294 final int sinkNum ;
9395 boolean realTimeFirst = false ;
9496 String attributeSortedString = generateAttributeSortedString (pipeSinkParameters );
97+ final String attributeDisplayString = generateAttributeDisplayString (pipeSinkParameters );
9598 if (isDataRegionSink ) {
9699 sinkNum =
97100 pipeSinkParameters .getIntOrDefault (
98101 Arrays .asList (
99102 PipeSinkConstant .CONNECTOR_IOTDB_PARALLEL_TASKS_KEY ,
100103 PipeSinkConstant .SINK_IOTDB_PARALLEL_TASKS_KEY ),
101- PipeSinkConstant .SINGLE_THREAD_DEFAULT_SINK .contains (
102- pipeSinkParameters
103- .getStringOrDefault (
104- Arrays .asList (
105- PipeSinkConstant .CONNECTOR_KEY , PipeSinkConstant .SINK_KEY ),
106- BuiltinPipePlugin .IOTDB_THRIFT_SINK .getPipePluginName ())
107- .toLowerCase ())
104+ PipeSinkConstant .SINGLE_THREAD_DEFAULT_SINK .contains (connectorKey )
108105 ? 1
109106 : PipeSinkConstant .CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE );
110107 realTimeFirst =
@@ -120,7 +117,9 @@ public synchronized String register(
120117 sinkNum = 1 ;
121118 attributeSortedString = "schema_" + attributeSortedString ;
122119 }
123- environment .setAttributeSortedString (attributeSortedString );
120+ final String attributeDisplayStringWithPrefix =
121+ isDataRegionSink ? "data_" + attributeDisplayString : "schema_" + attributeDisplayString ;
122+ environment .setAttributeSortedString (attributeDisplayStringWithPrefix );
124123
125124 if (!attributeSortedString2SubtaskLifeCycleMap .containsKey (attributeSortedString )) {
126125 final PipeSinkSubtaskExecutor executor = executorSupplier .get ();
@@ -138,6 +137,11 @@ public synchronized String register(
138137 }
139138
140139 for (int sinkIndex = 0 ; sinkIndex < sinkNum ; sinkIndex ++) {
140+ final String taskID =
141+ String .format (
142+ "%s_%s_%s" ,
143+ attributeDisplayStringWithPrefix , environment .getCreationTime (), sinkIndex );
144+
141145 final PipeConnector pipeSink =
142146 isDataRegionSink
143147 ? PipeDataNodeAgent .plugin ().dataRegion ().reflectSink (pipeSinkParameters )
@@ -168,10 +172,10 @@ public synchronized String register(
168172 // 2. Construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
169173 final PipeSinkSubtask pipeSinkSubtask =
170174 new PipeSinkSubtask (
171- String .format (
172- "%s_%s_%s" , attributeSortedString , environment .getCreationTime (), sinkIndex ),
175+ taskID ,
173176 environment .getCreationTime (),
174177 attributeSortedString ,
178+ attributeDisplayStringWithPrefix ,
175179 sinkIndex ,
176180 pendingQueue ,
177181 pipeSink );
@@ -182,11 +186,13 @@ public synchronized String register(
182186
183187 LOGGER .info (
184188 DataNodePipeMessages .PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED ,
185- attributeSortedString ,
189+ attributeDisplayStringWithPrefix ,
186190 executor .getWorkingThreadName (),
187191 executor .getCallbackThreadName ());
188192 attributeSortedString2SubtaskLifeCycleMap .put (
189193 attributeSortedString , pipeSinkSubtaskLifeCycleList );
194+ attributeSortedString2DisplayString .put (
195+ attributeSortedString , attributeDisplayStringWithPrefix );
190196 }
191197
192198 for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -203,7 +209,7 @@ public synchronized void deregister(
203209 final int regionId ,
204210 final String attributeSortedString ) {
205211 if (!attributeSortedString2SubtaskLifeCycleMap .containsKey (attributeSortedString )) {
206- throw new PipeException ( FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString );
212+ throwNoSuchSubtaskException ( attributeSortedString );
207213 }
208214
209215 final List <PipeSinkSubtaskLifeCycle > lifeCycles =
@@ -219,6 +225,7 @@ public synchronized void deregister(
219225
220226 if (lifeCycles .isEmpty ()) {
221227 attributeSortedString2SubtaskLifeCycleMap .remove (attributeSortedString );
228+ attributeSortedString2DisplayString .remove (attributeSortedString );
222229 executor .shutdown ();
223230 LOGGER .info (
224231 DataNodePipeMessages .THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN ,
@@ -234,7 +241,7 @@ public synchronized void deregister(
234241
235242 public synchronized void start (final String attributeSortedString ) {
236243 if (!attributeSortedString2SubtaskLifeCycleMap .containsKey (attributeSortedString )) {
237- throw new PipeException ( FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString );
244+ throwNoSuchSubtaskException ( attributeSortedString );
238245 }
239246
240247 for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -245,7 +252,7 @@ public synchronized void start(final String attributeSortedString) {
245252
246253 public synchronized void stop (final String attributeSortedString ) {
247254 if (!attributeSortedString2SubtaskLifeCycleMap .containsKey (attributeSortedString )) {
248- throw new PipeException ( FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + attributeSortedString );
255+ throwNoSuchSubtaskException ( attributeSortedString );
249256 }
250257
251258 for (final PipeSinkSubtaskLifeCycle lifeCycle :
@@ -258,7 +265,8 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
258265 final String attributeSortedString ) {
259266 if (!attributeSortedString2SubtaskLifeCycleMap .containsKey (attributeSortedString )) {
260267 throw new PipeException (
261- DataNodePipeMessages .FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK + attributeSortedString );
268+ DataNodePipeMessages .FAILED_TO_GET_PENDINGQUEUE_NO_SUCH_SUBTASK
269+ + getDisplayStringForException (attributeSortedString ));
262270 }
263271
264272 // All subtasks share the same pending queue
@@ -268,13 +276,35 @@ public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
268276 .getPendingQueue ();
269277 }
270278
271- private String generateAttributeSortedString (final PipeParameters pipeConnectorParameters ) {
279+ private static String generateAttributeSortedString (
280+ final PipeParameters pipeConnectorParameters ) {
272281 final TreeMap <String , String > sortedStringSourceMap =
273282 new TreeMap <>(pipeConnectorParameters .getAttribute ());
274283 sortedStringSourceMap .remove (SystemConstant .RESTART_OR_NEWLY_ADDED_KEY );
275284 return sortedStringSourceMap .toString ();
276285 }
277286
287+ /**
288+ * Attribute string for logs, metrics and exception messages with sensitive attributes removed.
289+ */
290+ static String generateAttributeDisplayString (final PipeParameters pipeConnectorParameters ) {
291+ final TreeMap <String , String > filteredAttributes =
292+ new TreeMap <>(pipeConnectorParameters .getAttribute ());
293+ filteredAttributes .remove (SystemConstant .RESTART_OR_NEWLY_ADDED_KEY );
294+ filteredAttributes .keySet ().removeIf (PipeParameters .ValueHider ::isHiddenKey );
295+ return filteredAttributes .toString ();
296+ }
297+
298+ private void throwNoSuchSubtaskException (final String attributeSortedString ) {
299+ throw new PipeException (
300+ FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE
301+ + getDisplayStringForException (attributeSortedString ));
302+ }
303+
304+ private String getDisplayStringForException (final String attributeSortedString ) {
305+ return attributeSortedString2DisplayString .getOrDefault (attributeSortedString , "unknown" );
306+ }
307+
278308 ///////////////////////// Singleton Instance Holder /////////////////////////
279309
280310 private PipeSinkSubtaskManager () {
0 commit comments