Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@
}
}

public void setSchemaBatchSizeHistogram(Histogram schemaBatchSizeHistogram) {
if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink) outputPipeSink).setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);
}
}

public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink) outputPipeSink).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
Expand All @@ -355,12 +361,19 @@
}

public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeIntervalHistogram) {
if (outputPipeSink instanceof IoTDBSink) {

Check warning on line 364 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof IoTDBSink iotdbsink'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7tf94XG0KwB-6342hZ&open=AZ7tf94XG0KwB-6342hZ&pullRequest=17665
((IoTDBSink) outputPipeSink)
.setTabletBatchTimeIntervalHistogram(tabletBatchTimeIntervalHistogram);
}
}

public void setSchemaBatchTimeIntervalHistogram(Histogram schemaBatchTimeIntervalHistogram) {
if (outputPipeSink instanceof IoTDBSink) {

Check warning on line 371 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof IoTDBSink iotdbsink'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ7tf94XG0KwB-6342ha&open=AZ7tf94XG0KwB-6342ha&pullRequest=17665
((IoTDBSink) outputPipeSink)
.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);
}
}

public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink) outputPipeSink)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void bindTo(final AbstractMetricService metricService) {

private void createMetrics(final String taskID) {
createRate(taskID);
createHistogram(taskID);
}

private void createRate(final String taskID) {
Expand All @@ -73,6 +75,38 @@ private void createRate(final String taskID) {
String.valueOf(connector.getCreationTime())));
}

private void createHistogram(final String taskID) {
final PipeSinkSubtask connector = connectorMap.get(taskID);

final Histogram schemaBatchSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
connector.setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);

final Histogram schemaBatchTimeIntervalHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
connector.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);

final Histogram schemaBatchEventSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString());
connector.setEventSizeHistogram(schemaBatchEventSizeHistogram);
}

@Override
public void unbindFrom(final AbstractMetricService metricService) {
ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister);
Expand All @@ -83,6 +117,7 @@ public void unbindFrom(final AbstractMetricService metricService) {

private void removeMetrics(final String taskID) {
removeRate(taskID);
removeHistogram(taskID);
}

private void removeRate(final String taskID) {
Expand All @@ -98,6 +133,29 @@ private void removeRate(final String taskID) {
schemaRateMap.remove(taskID);
}

private void removeHistogram(final String taskID) {
final PipeSinkSubtask connector = connectorMap.get(taskID);
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString());
}

//////////////////////////// Register & deregister (pipe integration) ////////////////////////////

public void register(final PipeSinkSubtask pipeSinkSubtask) {
Expand Down
Loading
Loading