Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
Expand All @@ -37,6 +38,14 @@ public class PipeConfigNodeReceiverMetrics implements IMetricSet {
private Timer transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

private Rate handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE;
private Rate handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE;
private Rate transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE;
private Rate transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE;
private Rate transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE;
private Rate transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE;

private static final String RECEIVER = "pipeConfigNodeReceiver";

Expand All @@ -62,6 +71,34 @@ public void recordTransferConfigSnapshotSealTimer(long costTimeInNanos) {
transferConfigSnapshotSealTimer.updateNanos(costTimeInNanos);
}

public void recordTransferCompressedTimer(long costTimeInNanos) {
transferCompressedTimer.updateNanos(costTimeInNanos);
}

public void markHandshakeConfigNodeV1Size(final long reqBytes) {
handshakeConfigNodeV1Rate.mark(reqBytes);
}

public void markHandshakeConfigNodeV2Size(final long reqBytes) {
handshakeConfigNodeV2Rate.mark(reqBytes);
}

public void markTransferConfigPlanSize(final long reqBytes) {
transferConfigPlanRate.mark(reqBytes);
}

public void markTransferConfigSnapshotPieceSize(final long reqBytes) {
transferConfigSnapshotPieceRate.mark(reqBytes);
}

public void markTransferConfigSnapshotSealSize(final long reqBytes) {
transferConfigSnapshotSealRate.mark(reqBytes);
}

public void markTransferCompressedSize(final long reqBytes) {
transferCompressedRate.mark(reqBytes);
}

@Override
public void bindTo(AbstractMetricService metricService) {
bindToTimer(metricService);
Expand Down Expand Up @@ -112,6 +149,70 @@ private void bindToTimer(AbstractMetricService metricService) {
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotSeal");

transferCompressedTimer =
metricService.getOrCreateTimer(
Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferCompressed");

// Rate
handshakeConfigNodeV1Rate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"handshakeConfigNodeV1");

handshakeConfigNodeV2Rate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"handshakeConfigNodeV2");

transferConfigPlanRate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigPlan");

transferConfigSnapshotPieceRate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotPiece");

transferConfigSnapshotSealRate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotSeal");

transferCompressedRate =
metricService.getOrCreateRate(
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferCompressed");
}

@Override
Expand All @@ -125,6 +226,14 @@ private void unbind(AbstractMetricService metricService) {
transferConfigPlanTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
transferConfigSnapshotPieceTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
transferConfigSnapshotSealTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
transferCompressedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

handshakeConfigNodeV1Rate = DoNothingMetricManager.DO_NOTHING_RATE;
handshakeConfigNodeV2Rate = DoNothingMetricManager.DO_NOTHING_RATE;
transferConfigPlanRate = DoNothingMetricManager.DO_NOTHING_RATE;
transferConfigSnapshotPieceRate = DoNothingMetricManager.DO_NOTHING_RATE;
transferConfigSnapshotSealRate = DoNothingMetricManager.DO_NOTHING_RATE;
transferCompressedRate = DoNothingMetricManager.DO_NOTHING_RATE;

metricService.remove(
MetricType.TIMER,
Expand Down Expand Up @@ -161,6 +270,56 @@ private void unbind(AbstractMetricService metricService) {
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotSeal");
metricService.remove(
MetricType.TIMER,
Metric.PIPE_CONFIGNODE_RECEIVER.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferCompressed");

metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"handshakeConfigNodeV1");
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"handshakeConfigNodeV2");
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigPlan");
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotPiece");
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferConfigSnapshotSeal");
metricService.remove(
MetricType.RATE,
Metric.PIPE_CONFIGNODE_RECEIVER_REQ_SIZE.toString(),
Tag.NAME.toString(),
RECEIVER,
Tag.TYPE.toString(),
"transferCompressed");
}

public static PipeConfigNodeReceiverMetrics getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
public TPipeTransferResp receive(final TPipeTransferReq req) {
try {
final short rawRequestType = req.getType();
final PipeConfigNodeReceiverMetrics metrics = PipeConfigNodeReceiverMetrics.getInstance();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
final PipeRequestType type = PipeRequestType.valueOf(rawRequestType);
if (needHandshake(type)) {
Expand All @@ -189,43 +190,69 @@ public TPipeTransferResp receive(final TPipeTransferReq req) {
final long startTime = System.nanoTime();
switch (type) {
case HANDSHAKE_CONFIGNODE_V1:
resp =
handleTransferHandshakeV1(
{
try {
return handleTransferHandshakeV1(
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime);
return resp;
} finally {
metrics.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime);
metrics.markHandshakeConfigNodeV1Size(req.body.limit());
}
}
case HANDSHAKE_CONFIGNODE_V2:
resp =
handleTransferHandshakeV2(
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime);
return resp;
{
try {
resp =
handleTransferHandshakeV2(
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
return resp;
} finally {
metrics.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime);
metrics.markHandshakeConfigNodeV2Size(req.body.limit());
}
}
case TRANSFER_CONFIG_PLAN:
resp = handleTransferConfigPlan(PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigPlanTimer(System.nanoTime() - startTime);
return resp;
{
try {
return handleTransferConfigPlan(
PipeTransferConfigPlanReq.fromTPipeTransferReq(req));
} finally {
metrics.recordTransferConfigPlanTimer(System.nanoTime() - startTime);
metrics.markTransferConfigPlanSize(req.body.limit());
}
}
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
resp =
handleTransferFilePiece(
{
try {
return handleTransferFilePiece(
PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req),
req instanceof AirGapPseudoTPipeTransferRequest,
false);
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime);
return resp;
} finally {
metrics.recordTransferConfigSnapshotPieceTimer(System.nanoTime() - startTime);
metrics.markTransferConfigSnapshotPieceSize(req.body.limit());
}
}
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
resp =
handleTransferFileSealV2(
{
try {
return handleTransferFileSealV2(
PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req));
PipeConfigNodeReceiverMetrics.getInstance()
.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime);
return resp;
} finally {
metrics.recordTransferConfigSnapshotSealTimer(System.nanoTime() - startTime);
metrics.markTransferConfigSnapshotSealSize(req.body.limit());
}
}
case TRANSFER_COMPRESSED:
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
{
try {
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
} finally {
metrics.recordTransferCompressedTimer(System.nanoTime() - startTime);
metrics.markTransferCompressedSize(req.body.limit());
}
}
default:
break;
}
Expand Down
Loading
Loading