diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetricsDef.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetricsDef.java index f0010063f..eda6028ab 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetricsDef.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetricsDef.java @@ -38,6 +38,7 @@ public enum PCMetricsDef { INCOMPLETE_OFFSETS_TOTAL("incomplete.offsets.total", "Total number of incomplete offsets", PCMetricsSubsystem.SHARD_MANAGER, GAUGE), SHARDS_SIZE("shards.size", "Number of records queued for processing across all shards", PCMetricsSubsystem.SHARD_MANAGER, GAUGE), + SHARDS_MAX_SIZE("shards.max.size", "The number of queued records in the shards with the most queued records", PCMetricsSubsystem.SHARD_MANAGER, GAUGE), //TODO: Not implemented yet - add to Metrics.adoc when implemented diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java index eb9478c61..eb19cef5a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java @@ -83,6 +83,7 @@ public class ShardManager { private Optional iterationResumePoint = Optional.empty(); private Gauge shardsSizeGauge; + private Gauge shardsMaxSizeGauge; private Gauge numberOfShardsGauge; private final PCMetrics pcMetrics; @@ -302,7 +303,10 @@ private void initMetrics() { shardsSizeGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.SHARDS_SIZE, this, shardManager -> shardManager.processingShards.values().stream() .mapToInt(processingShard -> processingShard.getEntries().size()).sum()); + shardsMaxSizeGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.SHARDS_MAX_SIZE, + this, shardManager -> shardManager.processingShards.values().stream() + .mapToInt(processingShard -> processingShard.getEntries().size()).max().orElse(0)); numberOfShardsGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.NUMBER_OF_SHARDS, - this, shardManager -> shardManager.processingShards.keySet().size()); + this, shardManager -> shardManager.processingShards.size()); } }