diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java index d264abe3c31bc..7db1be9d8b502 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/receiver/PipeConfigNodeReceiverMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.type.Timer; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -37,6 +38,14 @@ public class PipeConfigNodeReceiverMetrics implements IMetricSet { private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + private Rate handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; private static final String RECEIVER = "pipeConfigNodeReceiver"; @@ -62,6 +71,34 @@ public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) { transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos); } + public void recordTransferCompressedTimer(long costTimeInNanos) { + transferCompressedTimer.updateNanos(costTimeInNanos); + } + + public void markHandshakeConfigNodeV1Size(final long reqBytes) { + handshakeConfigNodeV1Rate.mark(reqBytes); + } + + public void markHandshakeConfigNodeV2Size(final long reqBytes) { + handshakeConfigNodeV2Rate.mark(reqBytes); + } + + public void markTransferConfigPlanSize(final long reqBytes) { + transferConfigPlanRate.mark(reqBytes); + } + + public void markTransferConfigSnapshotPieceSize(final long reqBytes) { + transferConfigSnapshotPieceRate.mark(reqBytes); + } + + public void markTransferConfigSnapshotSealSize(final long reqBytes) { + transferConfigSnapshotSealRate.mark(reqBytes); + } + + public void markTransferCompressedSize(final long reqBytes) { + transferCompressedRate.mark(reqBytes); + } + @Override public void bindTo(AbstractMetricService metricService) { bindToTimer(metricService); @@ -112,6 +149,70 @@ private void bindToTimer(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + + transferCompressedTimer = + metricService.getOrCreateTimer( + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + + // Rate + handshakeConfigNodeV1Rate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + + handshakeConfigNodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + + transferConfigPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + + transferConfigSnapshotPieceRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + + transferConfigSnapshotSealRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); + + transferCompressedRate = + metricService.getOrCreateRate( + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); } @Override @@ -125,6 +226,14 @@ private void unbind(AbstractMetricService metricService) { transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + + handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; metricService.remove( MetricType.TIMER, @@ -161,6 +270,56 @@ private void unbind(AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferConfigSnapshotSeal"); + metricService.remove( + MetricType.TIMER, + Metric.PIPE_CONFIGNODE_RECEIVER.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV1"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeConfigNodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotPiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigSnapshotSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); } public static PipeConfigNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 5f414f2cd69fd..6449b08d16add 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -177,6 +177,7 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver { public TPipeTransferResp receive(final TPipeTransferReq req) { try { final short rawRequestType = req.getType(); + final PipeConfigNodeReceiverMetrics metrics = PipeConfigNodeReceiverMetrics.getInstance(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { final PipeRequestType type = PipeRequestType.valueOf(rawRequestType); if (needHandshake(type)) { @@ -189,43 +190,69 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { final long startTime = System.nanoTime(); switch (type) { case HANDSHAKE_CONFIGNODE_V1: - resp = - handleTransferHandshakeV1( + { + try { + return handleTransferHandshakeV1( PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); + metrics.markHandshakeConfigNodeV1Size(req.body.limit()); + } + } case HANDSHAKE_CONFIGNODE_V2: - resp = - handleTransferHandshakeV2( - PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); - userEntity.setAuditLogOperation(AuditLogOperation.DDL); - PipeConfigNodeReceiverMetrics.getInstance() - .recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); - return resp; + { + try { + resp = + handleTransferHandshakeV2( + PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); + userEntity.setAuditLogOperation(AuditLogOperation.DDL); + return resp; + } finally { + metrics.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeConfigNodeV2Size(req.body.limit()); + } + } case TRANSFER_CONFIG_PLAN: - resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigPlanTimer(System.nanoTime() - startTime); - return resp; + { + try { + return handleTransferConfigPlan( + PipeTransferConfigPlanReq.fromTPipeTransferReq(req)); + } finally { + metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.markTransferConfigPlanSize(req.body.limit()); + } + } case TRANSFER_CONFIG_SNAPSHOT_PIECE: - resp = - handleTransferFilePiece( + { + try { + return handleTransferFilePiece( PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.markTransferConfigSnapshotPieceSize(req.body.limit()); + } + } case TRANSFER_CONFIG_SNAPSHOT_SEAL: - resp = - handleTransferFileSealV2( + { + try { + return handleTransferFileSealV2( PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req)); - PipeConfigNodeReceiverMetrics.getInstance() - .recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); - return resp; + } finally { + metrics.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime); + metrics.markTransferConfigSnapshotSealSize(req.body.limit()); + } + } case TRANSFER_COMPRESSED: - return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + { + try { + return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); + } finally { + metrics.recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.markTransferCompressedSize(req.body.limit()); + } + } default: break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java index 82defa546bb6c..5f46ac504ebb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java @@ -24,6 +24,7 @@ import org.apache.iotdb.metrics.AbstractMetricService; import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Rate; import org.apache.iotdb.metrics.type.Timer; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -53,6 +54,27 @@ public class PipeDataNodeReceiverMetrics implements IMetricSet { private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + private Rate handshakeDatanodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate handshakeDatanodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletInsertNodeRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletInsertNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletRawRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletRawV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBinaryRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBinaryV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBatchRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTabletBatchV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFilePieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFileSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFilePieceWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferTsFileSealWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSchemaSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; + private Rate transferSliceRate = DoNothingMetricManager.DO_NOTHING_RATE; + private static final String RECEIVER = "pipeDataNodeReceiver"; private PipeDataNodeReceiverMetrics() {} @@ -137,6 +159,86 @@ public void recordTransferSliceTimer(final long costTimeInNanos) { transferSliceTimer.updateNanos(costTimeInNanos); } + public void markHandshakeDatanodeV1Size(final long reqBytes) { + handshakeDatanodeV1Rate.mark(reqBytes); + } + + public void markHandshakeDatanodeV2Size(final long reqBytes) { + handshakeDatanodeV2Rate.mark(reqBytes); + } + + public void markTransferTabletInsertNodeSize(final long reqBytes) { + transferTabletInsertNodeRate.mark(reqBytes); + } + + public void markTransferTabletInsertNodeV2Size(final long reqBytes) { + transferTabletInsertNodeV2Rate.mark(reqBytes); + } + + public void markTransferTabletRawSize(final long reqBytes) { + transferTabletRawRate.mark(reqBytes); + } + + public void markTransferTabletRawV2Size(final long reqBytes) { + transferTabletRawV2Rate.mark(reqBytes); + } + + public void markTransferTabletBinarySize(final long reqBytes) { + transferTabletBinaryRate.mark(reqBytes); + } + + public void markTransferTabletBinaryV2Size(final long reqBytes) { + transferTabletBinaryV2Rate.mark(reqBytes); + } + + public void markTransferTabletBatchSize(final long reqBytes) { + transferTabletBatchRate.mark(reqBytes); + } + + public void markTransferTabletBatchV2Size(final long reqBytes) { + transferTabletBatchV2Rate.mark(reqBytes); + } + + public void markTransferTsFilePieceSize(final long reqBytes) { + transferTsFilePieceRate.mark(reqBytes); + } + + public void markTransferTsFileSealSize(final long reqBytes) { + transferTsFileSealRate.mark(reqBytes); + } + + public void markTransferTsFilePieceWithModSize(final long reqBytes) { + transferTsFilePieceWithModRate.mark(reqBytes); + } + + public void markTransferTsFileSealWithModSize(final long reqBytes) { + transferTsFileSealWithModRate.mark(reqBytes); + } + + public void markTransferSchemaPlanSize(final long reqBytes) { + transferSchemaPlanRate.mark(reqBytes); + } + + public void markTransferSchemaSnapshotPieceSize(final long reqBytes) { + transferSchemaSnapshotPieceRate.mark(reqBytes); + } + + public void markTransferSchemaSnapshotSealSize(final long reqBytes) { + transferSchemaSnapshotSealRate.mark(reqBytes); + } + + public void markTransferConfigPlanSize(final long reqBytes) { + transferConfigPlanRate.mark(reqBytes); + } + + public void markTransferCompressedSize(final long reqBytes) { + transferCompressedRate.mark(reqBytes); + } + + public void markTransferSliceSize(final long reqBytes) { + transferSliceRate.mark(reqBytes); + } + @Override public void bindTo(final AbstractMetricService metricService) { bindToTimer(metricService); @@ -303,6 +405,168 @@ private void bindToTimer(final AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferSlice"); + + // Rate + handshakeDatanodeV1Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV1"); + handshakeDatanodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDataNodeV2"); + transferTabletInsertNodeRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + transferTabletInsertNodeV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNodeV2"); + transferTabletRawRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + transferTabletRawV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRawV2"); + transferTabletBinaryRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + transferTabletBinaryV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinaryV2"); + transferTabletBatchRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + transferTabletBatchV2Rate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatchV2"); + transferTsFilePieceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + transferTsFileSealRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + transferTsFilePieceWithModRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + transferTsFileSealWithModRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + transferSchemaPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + transferSchemaSnapshotPieceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + transferSchemaSnapshotSealRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + transferConfigPlanRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + transferCompressedRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + transferSliceRate = + metricService.getOrCreateRate( + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } @Override @@ -332,6 +596,27 @@ private void unbind(final AbstractMetricService metricService) { transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER; transferSliceTimer = DoNothingMetricManager.DO_NOTHING_TIMER; + handshakeDatanodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE; + handshakeDatanodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletInsertNodeRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletInsertNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletRawRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletRawV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBinaryRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBinaryV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBatchRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTabletBatchV2Rate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFilePieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFileSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFilePieceWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferTsFileSealWithModRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSchemaSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE; + transferSliceRate = DoNothingMetricManager.DO_NOTHING_RATE; + metricService.remove( MetricType.TIMER, Metric.PIPE_DATANODE_RECEIVER.toString(), @@ -472,6 +757,148 @@ private void unbind(final AbstractMetricService metricService) { RECEIVER, Tag.TYPE.toString(), "transferSlice"); + + // Rate + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDatanodeV1"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "handshakeDatanodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNode"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletInsertNodeV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRaw"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletRawV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinary"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBinaryV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatch"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTabletBatchV2"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFilePieceWithMod"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferTsFileSealWithMod"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotPiece"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSchemaSnapshotSeal"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferConfigPlan"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferCompressed"); + metricService.remove( + MetricType.RATE, + Metric.PIPE_DATANODE_RECEIVER_REQ_SIZE.toString(), + Tag.NAME.toString(), + RECEIVER, + Tag.TYPE.toString(), + "transferSlice"); } public static PipeDataNodeReceiverMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 978d924c05651..1c7212f886ce3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -216,6 +216,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { final long startTime = System.nanoTime(); final short rawRequestType = req.getType(); + final PipeDataNodeReceiverMetrics metrics = PipeDataNodeReceiverMetrics.getInstance(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType); if (requestType != PipeRequestType.TRANSFER_SLICE) { @@ -236,8 +237,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferHandshakeV1( PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + metrics.recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV1Size(req.body.limit()); } } case HANDSHAKE_DATANODE_V2: @@ -254,8 +255,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferHandshakeV2( PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + metrics.recordHandshakeDatanodeV2Timer(System.nanoTime() - startTime); + metrics.markHandshakeDatanodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_INSERT_NODE: @@ -263,10 +264,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletInsertNodeTimer(System.nanoTime() - startTime); + metrics.markTransferTabletInsertNodeSize(req.body.limit()); } } case TRANSFER_TABLET_INSERT_NODE_V2: @@ -275,8 +275,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletInsertNodeV2Timer(System.nanoTime() - startTime); + metrics.markTransferTabletInsertNodeV2Size(req.body.limit()); } } case TRANSFER_TABLET_RAW: @@ -284,8 +284,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletRawTimer(System.nanoTime() - startTime); + metrics.markTransferTabletRawSize(req.body.limit()); } } case TRANSFER_TABLET_RAW_V2: @@ -294,8 +294,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletRaw( PipeTransferTabletRawReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletRawV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletRawV2Timer(System.nanoTime() - startTime); + metrics.markTransferTabletRawV2Size(req.body.limit()); } } case TRANSFER_TABLET_BINARY: @@ -304,8 +304,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBinary( PipeTransferTabletBinaryReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletBinaryTimer(System.nanoTime() - startTime); + metrics.markTransferTabletBinarySize(req.body.limit()); } } case TRANSFER_TABLET_BINARY_V2: @@ -314,8 +314,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBinary( PipeTransferTabletBinaryReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletBinaryV2Timer(System.nanoTime() - startTime); + metrics.markTransferTabletBinaryV2Size(req.body.limit()); } } case TRANSFER_TABLET_BATCH: @@ -324,8 +324,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBatch( PipeTransferTabletBatchReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchTimer(System.nanoTime() - startTime); + metrics.recordTransferTabletBatchTimer(System.nanoTime() - startTime); + metrics.markTransferTabletBatchSize(req.body.limit()); } } case TRANSFER_TABLET_BATCH_V2: @@ -334,8 +334,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferTabletBatchV2( PipeTransferTabletBatchReqV2.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); + metrics.recordTransferTabletBatchV2Timer(System.nanoTime() - startTime); + metrics.markTransferTabletBatchV2Size(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE: @@ -346,8 +346,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { req instanceof AirGapPseudoTPipeTransferRequest, true); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFilePieceTimer(System.nanoTime() - startTime); + metrics.markTransferTsFilePieceSize(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL: @@ -356,8 +356,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferFileSealV1( PipeTransferTsFileSealReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFileSealTimer(System.nanoTime() - startTime); + metrics.markTransferTsFileSealSize(req.body.limit()); } } case TRANSFER_TS_FILE_PIECE_WITH_MOD: @@ -367,10 +367,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFilePieceWithModTimer(System.nanoTime() - startTime); + metrics.markTransferTsFilePieceWithModSize(req.body.limit()); } } case TRANSFER_TS_FILE_SEAL_WITH_MOD: @@ -379,8 +378,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { return handleTransferFileSealV2( PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + metrics.recordTransferTsFileSealWithModTimer(System.nanoTime() - startTime); + metrics.markTransferTsFileSealWithModSize(req.body.limit()); } } case TRANSFER_PLAN_NODE: @@ -388,8 +387,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaPlanTimer(System.nanoTime() - startTime); + metrics.markTransferSchemaPlanSize(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_PIECE: @@ -399,10 +398,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaSnapshotPieceTimer(System.nanoTime() - startTime); + metrics.markTransferSchemaSnapshotPieceSize(req.body.limit()); } } case TRANSFER_SCHEMA_SNAPSHOT_SEAL: @@ -410,10 +408,9 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferFileSealV2( PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req)); - } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + metrics.recordTransferSchemaSnapshotSealTimer(System.nanoTime() - startTime); + metrics.markTransferSchemaSnapshotSealSize(req.body.limit()); } } case HANDSHAKE_CONFIGNODE_V1: @@ -427,8 +424,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { // then transferred to ConfigNode receiver to execute. return handleTransferConfigPlan(req); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime); + metrics.markTransferConfigPlanSize(req.body.limit()); } } case TRANSFER_SLICE: @@ -436,8 +433,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return handleTransferSlice(PipeTransferSliceReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferSliceTimer(System.nanoTime() - startTime); + metrics.recordTransferSliceTimer(System.nanoTime() - startTime); + metrics.markTransferSliceSize(req.body.limit()); } } case TRANSFER_COMPRESSED: @@ -445,8 +442,8 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { try { return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req)); } finally { - PipeDataNodeReceiverMetrics.getInstance() - .recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.recordTransferCompressedTimer(System.nanoTime() - startTime); + metrics.markTransferCompressedSize(req.body.limit()); } } default: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index cbeef1f2c7c7e..d1a50817fdde9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -161,7 +161,9 @@ public enum Metric { PIPE_TOTAL_COMPRESSED_SIZE("pipe_total_compressed_size"), PIPE_COMPRESSION_TIME("pipe_compression_time"), PIPE_DATANODE_RECEIVER("pipe_datanode_receiver"), + PIPE_DATANODE_RECEIVER_REQ_SIZE("pipe_datanode_receiver_req_size"), PIPE_CONFIGNODE_RECEIVER("pipe_confignode_receiver"), + PIPE_CONFIGNODE_RECEIVER_REQ_SIZE("pipe_confignode_receiver_req_size"), PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"), PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"), PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),