From aef70bcdbcb090ff648d3f85c00056abd573d4a9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Jun 2025 15:17:20 +0800 Subject: [PATCH 1/5] Update IoTDBDataNodeReceiver.java --- .../db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index b8634f92b6475..75476028cd019 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -230,7 +230,6 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); - } finally { PipeDataNodeReceiverMetrics.getInstance() .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); From a968bb522cbeeb814e41edd814666b35450c5560 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Jun 2025 17:48:42 +0800 Subject: [PATCH 2/5] May completion --- .../receiver/PipeDataNodeReceiverMetrics.java | 427 ++++++++++++++++++ .../thrift/IoTDBDataNodeReceiver.java | 81 ++-- .../commons/service/metric/enums/Metric.java | 1 + 3 files changed, 469 insertions(+), 40 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java index 1e877a7b2a04e..9b76f8540b036 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.type.Timer; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -53,6 +54,27 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Rate handshakeDatanodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate handshakeDatanodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletInsertNodeRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletInsertNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletRawRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletRawV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBinaryRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBinaryV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBatchRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBatchV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFilePieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFileSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFilePieceWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFileSealWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSliceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private static final String RECEIVER = "pipeDataNodeReceiver"; private PipeDataNodeReceiverMetrics() {} @@ -137,6 +159,86 @@ public void recordTransferSliceTimer(final long costTimeInNanos) { transferSliceTimer.updateNanos(costTimeInNanos); } + public void markHandshakeDatanodeV1Size(final long reqBytes) { + handshakeDatanodeV1Rate.mark(reqBytes); + } + + public void markHandshakeDatanodeV2Size(final long reqBytes) { + handshakeDatanodeV2Rate.mark(reqBytes); + } + + public void markTransferTabletInsertNodeSize(final long reqBytes) { + transferTabletInsertNodeRate.mark(reqBytes); + } + + public void markTransferTabletInsertNodeV2Size(final long reqBytes) { + transferTabletInsertNodeV2Rate.mark(reqBytes); + } + + public void markTransferTabletRawSize(final long reqBytes) { + transferTabletRawRate.mark(reqBytes); + } + + public void markTransferTabletRawV2Size(final long reqBytes) { + transferTabletRawV2Rate.mark(reqBytes); + } + + public void markTransferTabletBinarySize(final long reqBytes) { + transferTabletBinaryRate.mark(reqBytes); + } + + public void markTransferTabletBinaryV2Size(final long reqBytes) { + transferTabletBinaryV2Rate.mark(reqBytes); + } + + public void markTransferTabletBatchSize(final long reqBytes) { + transferTabletBatchRate.mark(reqBytes); + } + + public void markTransferTabletBatchV2Size(final long reqBytes) { + transferTabletBatchV2Rate.mark(reqBytes); + } + + public void markTransferTsFilePieceSize(final long reqBytes) { + transferTsFilePieceRate.mark(reqBytes); + } + + public void markTransferTsFileSealSize(final long reqBytes) { + transferTsFileSealRate.mark(reqBytes); + } + + public void markTransferTsFilePieceWithModSize(final long reqBytes) { + transferTsFilePieceWithModRate.mark(reqBytes); + } + + public void markTransferTsFileSealWithModSize(final long reqBytes) { + transferTsFileSealWithModRate.mark(reqBytes); + } + + public void markTransferSchemaPlanSize(final long reqBytes) { + transferSchemaPlanRate.mark(reqBytes); + } + + public void markTransferSchemaSnapshotPieceSize(final long reqBytes) { + transferSchemaSnapshotPieceRate.mark(reqBytes); + } + + public void markTransferSchemaSnapshotSealSize(final long reqBytes) { + transferSchemaSnapshotSealRate.mark(reqBytes); + } + + public void markTransferConfigPlanSize(final long reqBytes) { + transferConfigPlanRate.mark(reqBytes); + } + + public void markTransferCompressedSize(final long reqBytes) { + transferCompressedRate.mark(reqBytes); + } + + public void markTransferSliceSize(final long reqBytes) { + transferSliceRate.mark(reqBytes); + } + @Override public void bindTo(final AbstractMetricService metricService) { bindToTimer(metricService); @@ -303,6 +405,168 @@ private void bindToTimer(final AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferSlice"); + + // Rate + handshakeDatanodeV1Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV1"); + handshakeDatanodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV2"); + transferTabletInsertNodeRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + transferTabletInsertNodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNodeV2"); + transferTabletRawRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + transferTabletRawV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRawV2"); + transferTabletBinaryRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + transferTabletBinaryV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinaryV2"); + transferTabletBatchRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + transferTabletBatchV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatchV2"); + transferTsFilePieceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + transferTsFileSealRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + transferTsFilePieceWithModRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + transferTsFileSealWithModRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + transferSchemaPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + transferSchemaSnapshotPieceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + transferSchemaSnapshotSealRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + transferConfigPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + transferCompressedRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + transferSliceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } @Override @@ -332,6 +596,27 @@ private void unbind(final AbstractMetricService metricService) { transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + handshakeDatanodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + handshakeDatanodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletInsertNodeRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletInsertNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletRawRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletRawV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBinaryRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBinaryV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBatchRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBatchV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFilePieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFileSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFilePieceWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFileSealWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSliceRate = DoNothingMetricManager.DO_NOTHING_RATE; + metricService.remove( MetricType.TIMER, Metric.PIPE_DATANODE_RECEIVER.toString(), @@ -472,6 +757,148 @@ private void unbind(final AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferSlice"); + + // Rate + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDatanodeV1"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDatanodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRawV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinaryV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatchV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } public static PipeDataNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 75476028cd019..104db05e2d3af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -199,6 +199,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { final long startTime = System.nanoTime(); final short rawRequestType = req.getType(); + final PipeDataNodeReceiverMetrics metrics = PipeDataNodeReceiverMetrics.getInstance(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType); if (requestType != PipeRequestType.TRANSFER_SLICE) { @@ -211,8 +212,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferHandshakeV1( PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + metrics.recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV1Size(req.body.limit()); } } case HANDSHAKE_DATANODE_V2: @@ -221,8 +222,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferHandshakeV2( PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + metrics.recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_INSERT_NODE: @@ -231,8 +232,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_INSERT_NODE_V2: @@ -241,8 +242,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_RAW: @@ -250,8 +251,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletRawTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_RAW_V2: @@ -260,8 +261,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletRaw( PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletRawV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_BINARY: @@ -270,8 +271,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBinary( PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_BINARY_V2: @@ -280,8 +281,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBinary( PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_BATCH: @@ -290,8 +291,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBatch( PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletBatchTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_BATCH_V2: @@ -300,8 +301,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBatchV2( PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE: @@ -312,8 +313,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { req instanceof AirGapPseudoTPipeTransferRequest, true); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL: @@ -322,8 +323,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferFileSealV1( PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFileSealTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE_WITH_MOD: @@ -335,8 +336,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { false); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL_WITH_MOD: @@ -345,8 +346,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferFileSealV2( PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_PLAN_NODE: @@ -354,8 +355,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: @@ -367,8 +368,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { false); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: @@ -378,8 +379,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case HANDSHAKE_CONFIGNODE_V1: @@ -393,8 +394,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { // then transferred to ConfigNode receiver to execute. return handleTransferConfigPlan(req); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_SLICE: @@ -402,8 +403,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSliceTimer(System.nanoTime() - startTime); + metrics.recordTransferSliceTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_COMPRESSED: @@ -411,8 +412,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } default: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 17444b3e5ac73..28ee0fc674355 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -151,6 +151,7 @@ public enum Metric { PIPE_TOTAL_COMPRESSED_SIZE("pipe_total_compressed_size"), PIPE_COMPRESSION_TIME("pipe_compression_time"), PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"), + PIPE_DATANODE_RECEIVER_REQ_SIZE("pipe_datanode_receiver_req_size"), PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"), PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"), PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"), From 905cfaddd7c4e6fd673029e35991fe870adadf13 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Jun 2025 18:28:58 +0800 Subject: [PATCH 3/5] Fix --- .../protocol/IoTDBConfigNodeReceiver.java | 63 ++++++++++++------- .../thrift/IoTDBDataNodeReceiver.java | 38 ++++++----- 2 files changed, 57 insertions(+), 44 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index b9d17db5b6acb..e926bc9ace215 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -157,6 +157,7 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { public TPipeTransferResp receive(final TPipeTransferReq req) { try { final short rawRequestType = req.getType(); + final PipeConfigNodeReceiverMetrics metrics = PipeConfigNodeReceiverMetrics.getInstance(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { final PipeRequestType type = PipeRequestType.valueOf(rawRequestType); if (needHandshake(type)) { @@ -169,41 +170,55 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { final long startTime = System.nanoTime(); switch (type) { case HANDSHAKE_CONFIGNODE_V1: - resp = - handleTransferHandshakeV1( + { + try { + return handleTransferHandshakeV1( PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); + } + } case HANDSHAKE_CONFIGNODE_V2: - resp = - handleTransferHandshakeV2( + { + try { + return handleTransferHandshakeV2( PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); + } + } case TRANSFER_CONFIG_PLAN: - resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigPlanTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferConfigPlan( + PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); + } finally { + metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); + } + } case TRANSFER_CONFIG_SNAPSHOT_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); + } + } case TRANSFER_CONFIG_SNAPSHOT_SEAL: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); + } + } case TRANSFER_COMPRESSED: + { + } return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); default: break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 104db05e2d3af..cabfec93176a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -233,7 +233,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletInsertNodeSize(req.body.limit()); } } case TRANSFER_TABLET_INSERT_NODE_V2: @@ -243,7 +243,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletInsertNodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_RAW: @@ -252,7 +252,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletRawTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletRawSize(req.body.limit()); } } case TRANSFER_TABLET_RAW_V2: @@ -262,7 +262,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletRawV2Timer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletRawV2Size(req.body.limit()); } } case TRANSFER_TABLET_BINARY: @@ -272,7 +272,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletBinaryTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletBinarySize(req.body.limit()); } } case TRANSFER_TABLET_BINARY_V2: @@ -282,7 +282,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletBinaryV2Size(req.body.limit()); } } case TRANSFER_TABLET_BATCH: @@ -292,7 +292,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletBatchTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletBatchSize(req.body.limit()); } } case TRANSFER_TABLET_BATCH_V2: @@ -302,7 +302,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTabletBatchV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE: @@ -314,7 +314,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { true); } finally { metrics.recordTransferTsFilePieceTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTsFilePieceSize(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL: @@ -324,7 +324,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTsFileSealTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTsFileSealSize(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE_WITH_MOD: @@ -334,10 +334,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - } finally { metrics.recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTsFilePieceWithModSize(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL_WITH_MOD: @@ -347,7 +346,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferTsFileSealWithModSize(req.body.limit()); } } case TRANSFER_PLAN_NODE: @@ -356,7 +355,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferSchemaPlanTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferSchemaPlanSize(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: @@ -366,10 +365,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - } finally { metrics.recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferSchemaSnapshotPieceSize(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: @@ -380,7 +378,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { } finally { metrics.recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferSchemaSnapshotSealSize(req.body.limit()); } } case HANDSHAKE_CONFIGNODE_V1: @@ -395,7 +393,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferConfigPlan(req); } finally { metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferConfigPlanSize(req.body.limit()); } } case TRANSFER_SLICE: @@ -404,7 +402,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferSliceTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferSliceSize(req.body.limit()); } } case TRANSFER_COMPRESSED: @@ -413,7 +411,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferCompressedTimer(System.nanoTime() - startTime); - metrics.markHandshakeDatanodeV2Size(req.body.limit()); + metrics.markTransferCompressedSize(req.body.limit()); } } default: From 057daa02e83d5e4ce38fc732cf3683b0b25a525b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Jun 2025 22:21:08 +0800 Subject: [PATCH 4/5] partial --- .../PipeConfigNodeReceiverMetrics.java | 115 ++++++++++++++++++ .../thrift/IoTDBDataNodeReceiver.java | 1 - .../commons/service/metric/enums/Metric.java | 1 + 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java index d264abe3c31bc..e46e3707829bb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.type.Timer; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -38,6 +39,12 @@ public class PipeConfigNodeReceiverMetrics implements IMetricSet { private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Rate handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private static final String RECEIVER = "pipeConfigNodeReceiver"; private PipeConfigNodeReceiverMetrics() {} @@ -62,6 +69,26 @@ public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) { transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos); } + public void markHandshakeConfigNodeV1Size(final long reqBytes) { + handshakeConfigNodeV1Rate.mark(reqBytes); + } + + public void markHandshakeConfigNodeV2Size(final long reqBytes) { + handshakeConfigNodeV2Rate.mark(reqBytes); + } + + public void markTransferConfigPlanSize(final long reqBytes) { + transferConfigPlanRate.mark(reqBytes); + } + + public void markTransferConfigSnapshotPieceSize(final long reqBytes) { + transferConfigSnapshotPieceRate.mark(reqBytes); + } + + public void markTransferConfigSnapshotSealSize(final long reqBytes) { + transferConfigSnapshotSealRate.mark(reqBytes); + } + @Override public void bindTo(AbstractMetricService metricService) { bindToTimer(metricService); @@ -112,6 +139,52 @@ private void bindToTimer(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + + // Rate + handshakeConfigNodeV1Rate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + + handshakeConfigNodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + + transferConfigPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + + transferConfigSnapshotPieceRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + + transferConfigSnapshotSealRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); } @Override @@ -126,6 +199,12 @@ private void unbind(AbstractMetricService metricService) { transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + metricService.remove( MetricType.TIMER, Metric.PIPE_CONFIGNODE_RECEIVER.toString(), @@ -161,6 +240,42 @@ private void unbind(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); } public static PipeConfigNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index cabfec93176a9..797054cf31453 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -375,7 +375,6 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferFileSealV2( PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); - } finally { metrics.recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); metrics.markTransferSchemaSnapshotSealSize(req.body.limit()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 28ee0fc674355..471d9ee36ab3b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -153,6 +153,7 @@ public enum Metric { PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"), PIPE_DATANODE_RECEIVER_REQ_SIZE("pipe_datanode_receiver_req_size"), PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"), + PIPE_CONFIGNODE_RECEIVER_REQ_SIZE("pipe_confignode_receiver_req_size"), PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"), PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"), PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"), From e58dcd55b4fcb1c95892869b270fc1ce2056019c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 24 Jun 2025 09:47:41 +0800 Subject: [PATCH 5/5] Partial --- .../PipeConfigNodeReceiverMetrics.java | 44 +++++++++++++++++++ .../protocol/IoTDBConfigNodeReceiver.java | 12 ++++- 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java index e46e3707829bb..7db1be9d8b502 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java @@ -38,12 +38,14 @@ public class PipeConfigNodeReceiverMetrics implements IMetricSet { private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Rate handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; private Rate handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; private Rate transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; private Rate transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; private static final String RECEIVER = "pipeConfigNodeReceiver"; @@ -69,6 +71,10 @@ public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) { transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos); } + public void recordTransferCompressedTimer(long costTimeInNanos) { + transferCompressedTimer.updateNanos(costTimeInNanos); + } + public void markHandshakeConfigNodeV1Size(final long reqBytes) { handshakeConfigNodeV1Rate.mark(reqBytes); } @@ -89,6 +95,10 @@ public void markTransferConfigSnapshotSealSize(final long reqBytes) { transferConfigSnapshotSealRate.mark(reqBytes); } + public void markTransferCompressedSize(final long reqBytes) { + transferCompressedRate.mark(reqBytes); + } + @Override public void bindTo(AbstractMetricService metricService) { bindToTimer(metricService); @@ -140,6 +150,15 @@ private void bindToTimer(AbstractMetricService metricService) { Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + transferCompressedTimer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + // Rate handshakeConfigNodeV1Rate = metricService.getOrCreateRate( @@ -185,6 +204,15 @@ private void bindToTimer(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + + transferCompressedRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); } @Override @@ -198,12 +226,14 @@ private void unbind(AbstractMetricService metricService) { transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; metricService.remove( MetricType.TIMER, @@ -240,6 +270,13 @@ private void unbind(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); metricService.remove( MetricType.RATE, @@ -276,6 +313,13 @@ private void unbind(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); } public static PipeConfigNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index e926bc9ace215..8b7ba587f5f30 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -176,6 +176,7 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); } finally { metrics.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); + metrics.markHandshakeConfigNodeV1Size(req.body.limit()); } } case HANDSHAKE_CONFIGNODE_V2: @@ -185,6 +186,7 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); } finally { metrics.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeConfigNodeV2Size(req.body.limit()); } } case TRANSFER_CONFIG_PLAN: @@ -194,6 +196,7 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.markTransferConfigPlanSize(req.body.limit()); } } case TRANSFER_CONFIG_SNAPSHOT_PIECE: @@ -205,6 +208,7 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { false); } finally { metrics.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.markTransferConfigSnapshotPieceSize(req.body.limit()); } } case TRANSFER_CONFIG_SNAPSHOT_SEAL: @@ -214,12 +218,18 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); } finally { metrics.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); + metrics.markTransferConfigSnapshotSealSize(req.body.limit()); } } case TRANSFER_COMPRESSED: { + try { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } finally { + metrics.recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.markTransferCompressedSize(req.body.limit()); + } } - return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); default: break; }